
Spark Student Answer Analysis


1 Process Analysis


Note: parts 2, 3, and 4 are the main focus of this project.

2 Business Module Breakdown


Preparation

3 Create the Module Package Structure


4 Prepare the Kafka Topic

# List topics
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

# Delete the topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic edu

# Create the topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic edu

# Simulate a consumer
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic edu --from-beginning

4.1 Test Sending Data to Kafka

Start the test producer and watch the messages arrive with the console consumer.

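The course project ships a prebuilt simulator for this step. As a substitute, a minimal sketch of an equivalent producer, assuming the plain Kafka Java client and the node1:9092 broker from section 4 (the real simulator generates randomized records continuously):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // one hand-written record; the real simulator randomizes these fields
    val msg = """{"student_id":"学生ID_31","textbook_id":"教材ID_1","grade_id":"年级ID_6","subject_id":"科目ID_2_语文","chapter_id":"章节ID_chapter_3","question_id":"题目ID_1003","score":7,"answer_time":"2021-01-09 14:53:28","ts":"Jan 9, 2021 2:53:28 PM"}"""
    producer.send(new ProducerRecord[String, String]("edu", msg))
    producer.close()
  }
}

A message observed on the edu topic then looks like this: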

{"student_id":"学生ID_31","textbook_id":"教材ID_1","grade_id":"年级ID_6","subject_id":"科目ID_2_语文","chapter_id":"章节ID_chapter_3","question_id":"题目ID_1003","score":7,"answer_time":"2021-01-09 14:53:28","ts":"Jan 9, 2021 2:53:28 PM"}

Real-Time Analysis of Student Answers

5 Requirements


5.1 Code Implementation

package cn.itcast.edu.analysis.streaming

import cn.itcast.edu.bean.Answer
import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author itcast
 * Desc Consume data in real time from the Kafka topic "edu" and run real-time
 *      statistics; results can go straight to the console or to MySQL.
 */
object StreamingAnalysis {
  def main(args: Array[String]): Unit = {
    // TODO 0. Prepare the environment
    val spark: SparkSession = SparkSession.builder().appName("StreamingAnalysis").master("local[*]")
      // keep the shuffle partition count small for local testing (default 200); tune for the cluster
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // TODO 1. Load the data
    val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "edu")
      .load()
    val valueDS: Dataset[String] = kafkaDF.selectExpr("CAST(value AS STRING)").as[String]
    // {"student_id":"学生ID_31","textbook_id":"教材ID_1","grade_id":"年级ID_6","subject_id":"科目ID_2_语文","chapter_id":"章节ID_chapter_3","question_id":"题目ID_1003","score":7,"answer_time":"2021-01-09 14:53:28","ts":"Jan 9, 2021 2:53:28 PM"}

    // TODO 2. Process the data
    // --- Preprocessing
    // Parse the JSON, option 1: extract individual fields
    /*
    valueDS.select(
      get_json_object($"value", "$.student_id").as("student_id"),
      //.....
    )
    */
    // Parse the JSON, option 2: deserialize each JSON string into a case class instance
    val answerDS: Dataset[Answer] = valueDS.map(jsonStr => {
      val gson = new Gson()
      // JSON ---> object
      gson.fromJson(jsonStr, classOf[Answer])
    })

    // --- Real-time analysis
    // The commented-out SQL variants below assume a temp view:
    // answerDS.createOrReplaceTempView("t_answer")

    // TODO == Requirement 1: the top-10 hot questions
    // SQL
    /*
    val result1 = spark.sql(
      """SELECT question_id, COUNT(1) AS frequency
        |FROM t_answer
        |GROUP BY question_id
        |ORDER BY frequency DESC
        |LIMIT 10""".stripMargin)
    */
    // DSL
    val result1: Dataset[Row] = answerDS.groupBy('question_id)
      //.agg(count('question_id) as "count")
      .count()
      .orderBy('count.desc)
      .limit(10)

    // TODO == Requirement 2: the top-10 most active grades
    /*
    val result2 = spark.sql(
      """SELECT grade_id, COUNT(1) AS frequency
        |FROM t_answer
        |GROUP BY grade_id
        |ORDER BY frequency DESC
        |LIMIT 10""".stripMargin)
    */
    val result2: Dataset[Row] = answerDS.groupBy('grade_id)
      .count()
      .orderBy('count.desc)
      .limit(10)

    // TODO == Requirement 3: the top-10 hot questions, with their subject
    // Note: in a SELECT ... GROUP BY, every selected column must be either
    // a grouping column or an aggregate.
    /*
    val result3 = spark.sql(
      """SELECT question_id, FIRST(subject_id), COUNT(1) AS frequency
        |FROM t_answer
        |GROUP BY question_id
        |ORDER BY frequency DESC
        |LIMIT 10""".stripMargin)
    */
    val result3: Dataset[Row] = answerDS.groupBy('question_id)
      .agg(
        first('subject_id) as "subject_id",
        count('question_id) as "count"
      )
      .orderBy('count.desc)
      .limit(10)

    // TODO == Requirement 4: the 10 students with the lowest single-question scores, with the question
    /*
    val result4 = spark.sql(
      """SELECT student_id, FIRST(question_id), MIN(score)
        |FROM t_answer
        |GROUP BY student_id
        |ORDER BY MIN(score)
        |LIMIT 10""".stripMargin)
    */
    // note: first('question_id) is not guaranteed to be the question that produced the minimum score
    val result4: Dataset[Row] = answerDS.groupBy('student_id)
      .agg(
        min('score) as "minscore",
        first('question_id)
      )
      .orderBy('minscore)
      .limit(10)

    // TODO 3. Output the results
    result1.writeStream
      .format("console")
      .outputMode("complete")
      .start()
    result2.writeStream
      .format("console")
      .outputMode("complete")
      .start()
    result3.writeStream
      .format("console")
      .outputMode("complete")
      .start()
    result4.writeStream
      .format("console")
      .outputMode("complete")
      // TODO 4. Start and await termination
      .start()
      .awaitTermination()

    // TODO 5. Release resources
    spark.stop()
  }
}
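The Desc comment says the results can also go to MySQL, but the job above only writes to the console. A minimal sketch of a MySQL sink for result1 via Structured Streaming's foreachBatch; the t_hot_question table name is an assumption, while the JDBC URL and root/root credentials mirror the ones used elsewhere in this project:

import org.apache.spark.sql.{DataFrame, SaveMode}

// continues from the job above: result1 is the streaming top-10 hot questions
val writeBatchToMySql = (batchDF: DataFrame, batchId: Long) => {
  val props = new java.util.Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "root")
  // overwrite so the table always holds the latest snapshot of the top 10
  batchDF.write.mode(SaveMode.Overwrite)
    .jdbc("jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8", "t_hot_question", props)
}

result1.writeStream
  .outputMode("complete")
  .foreachBatch(writeBatchToMySql)
  .start()

Binding the function to a val with explicit parameter types avoids the overload ambiguity that foreachBatch lambdas can hit under Scala 2.12.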

Real-Time Recommendation of Error-Prone Questions

6 Requirements


6.1 Prepare the Model: Train and Use It Directly

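In outline, the training job turns historical answers into numeric (student, question, rating) triples, fits an ALS model, saves it, and publishes the saved path to Redis, where StreamingRecommend (below) picks it up. A hedged sketch of that job: the rating construction (10 - score, so error-prone questions rank high), the hyperparameters, and the HDFS path are assumptions, while the Redis key and field match what the consumer code reads.

// Assumes a SparkSession `spark` with spark.implicits._ and
// org.apache.spark.sql.functions._ imported, a DataFrame `answerDF` of
// historical answers (same schema as the Answer bean), and a Jedis
// connection `jedis`.
import org.apache.spark.ml.recommendation.ALS

// ALS needs numeric ids: "学生ID_31" -> 31, "题目ID_1003" -> 1003.
// Assumption: invert the score so low-scoring (error-prone) questions get high ratings.
val ratings = answerDF.select(
  split('student_id, "_")(1).cast("int") as "student_id",
  split('question_id, "_")(1).cast("int") as "question_id",
  (lit(10) - 'score).cast("float") as "rating"
)

val als = new ALS()
  .setUserCol("student_id")
  .setItemCol("question_id")
  .setRatingCol("rating")
  .setRank(10)   // assumed hyperparameters
  .setMaxIter(10)

val model = als.fit(ratings)

// Save the model and publish its path for the streaming job; the key and
// field names match what StreamingRecommend reads, the path is an assumption.
val path = "hdfs://node1:8020/model/als_model/" + System.currentTimeMillis()
model.save(path)
jedis.hset("als_model", "recommended_question_id", path)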

6.2 Code Implementation

package cn.itcast.edu.analysis.streaming

import cn.itcast.edu.bean.Answer
import cn.itcast.edu.utils.RedisUtil
import com.google.gson.Gson
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
 * Author itcast
 * Desc Consume messages (which carry user ids) from Kafka,
 *      fetch the recommendation model's path from Redis and load the ALSModel from it,
 *      then use the model to recommend error-prone questions to those users.
 */
object StreamingRecommend {
  def main(args: Array[String]): Unit = {
    // TODO 0. Prepare the environment
    val spark: SparkSession = SparkSession.builder().appName("StreamingRecommend").master("local[*]")
      // keep the shuffle partition count small for local testing (default 200); tune for the cluster
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // TODO 1. Load the data
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1:9092",                 // Kafka cluster address
      "key.deserializer" -> classOf[StringDeserializer],   // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> "StreamingRecommend",                  // consumer group name
      "auto.offset.reset" -> "latest",
      "auto.commit.interval.ms" -> "1000",                 // auto-commit interval
      "enable.auto.commit" -> (true: java.lang.Boolean)    // commit offsets automatically
    )
    val topics = Array("edu") // topics to subscribe to
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // TODO 2. Process the data
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
    // {"student_id":"学生ID_47","textbook_id":"教材ID_1","grade_id":"年级ID_3","subject_id":"科目ID_3_英语","chapter_id":"章节ID_chapter_3","question_id":"题目ID_534","score":7,"answer_time":"2021-01-09 15:29:50","ts":"Jan 9, 2021 3:29:50 PM"}

    valueDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) { // this rdd holds one micro-batch of data
        // ==1. Get the model path and load the model
        // get a Redis connection
        val jedis: Jedis = RedisUtil.pool.getResource
        // the training job stored the path via: jedis.hset("als_model", "recommended_question_id", path)
        val path: String = jedis.hget("als_model", "recommended_question_id")
        // load the model from that path
        val model: ALSModel = ALSModel.load(path)

        // ==2. Extract the user ids
        val answerDF: DataFrame = rdd.coalesce(1).map(jsonStr => {
          val gson = new Gson()
          gson.fromJson(jsonStr, classOf[Answer])
        }).toDF
        // convert the string id to a number, since the model needs numeric ids
        val id2int = udf((student_id: String) => {
          student_id.split("_")(1).toInt
        })
        val studentIdDF: DataFrame = answerDF.select(id2int('student_id) as "student_id")

        // ==3. Use the model to recommend questions to these users
        val recommendDF: DataFrame = model.recommendForUserSubset(studentIdDF, 10)
        recommendDF.printSchema()
        /*
        root
         |-- student_id: integer (nullable = false)           -- user id
         |-- recommendations: array (nullable = true)         -- recommendation list
         |    |-- element: struct (containsNull = true)
         |    |    |-- question_id: integer (nullable = true) -- question id
         |    |    |-- rating: float (nullable = true)        -- rating / recommendation strength
        */
        recommendDF.show(false)
        /*
        +----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
        |student_id|recommendations                                                                                                                                                                              |
        +----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
        |12        |[[1707, 2.900552], [641, 2.8934805], [815, 2.8934805], [1583, 2.8934805], [1585, 2.8774242], [1369, 2.8033295], [906, 2.772558], [2129, 2.668791], [1661, 2.585957], [1978, 2.5290453]]     |
        |14        |[[1627, 2.8925943], [446, 2.8925943], [1951, 2.8925943], [1412, 2.8925943], [1812, 2.8925943], [1061, 2.8816805], [1661, 2.874632], [1453, 2.8682063], [1111, 2.8643343], [1797, 2.7966104]]|
        */

        // Post-process the recommendations: join user id and question ids into strings "id1,id2,id3..."
        val recommendResultDF: DataFrame = recommendDF.as[(Int, Array[(Int, Float)])].map(t => {
          val studentIdStr: String = "学生ID_" + t._1
          val questionIdsStr: String = t._2.map("题目ID_" + _._1).mkString(",")
          (studentIdStr, questionIdsStr)
        }).toDF("student_id", "recommendations")

        // join answerDF with recommendResultDF
        val allInfoDF: DataFrame = answerDF.join(recommendResultDF, "student_id")

        // ==4. Write the result to MySQL/HBase
        if (allInfoDF.count() > 0) {
          val properties = new java.util.Properties()
          properties.setProperty("user", "root")
          properties.setProperty("password", "root")
          allInfoDF.write.mode(SaveMode.Append)
            .jdbc("jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8", "t_recommended", properties)
        }

        // return the Redis connection to the pool
        jedis.close()
      }
    })

    // TODO 3. Results are written inside foreachRDD above
    // TODO 4. Start and await termination
    ssc.start()
    ssc.awaitTermination()
    // TODO 5. Release resources
    ssc.stop(stopSparkContext = true, stopGracefully = true) // graceful shutdown
  }
}
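RedisUtil is imported above but not listed in this post. A minimal sketch matching the RedisUtil.pool.getResource usage, assuming a plain Jedis connection pool; the host, port, and pool sizes are assumptions:

package cn.itcast.edu.utils

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// Minimal connection-pool holder for the Jedis client.
object RedisUtil {
  private val config = new JedisPoolConfig()
  config.setMaxTotal(10) // max simultaneous connections
  config.setMaxIdle(5)   // max idle connections kept in the pool
  val pool = new JedisPool(config, "node1", 6379)
}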

7 Offline Analysis of Student Answers


7.1 Code Implementation


package cn.itcast.edu.analysis.batch

import cn.itcast.edu.bean.AnswerWithRecommendations
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author itcast
 * Desc Offline analysis of the students' answer data
 */
object BatchAnalysis {
  def main(args: Array[String]): Unit = {
    // TODO 0. Prepare the environment - SparkSession
    val spark: SparkSession = SparkSession.builder().appName("BatchAnalysis").master("local[*]")
      // keep the shuffle partition count small for local testing (default 200); tune for the cluster
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // TODO 1. Load the data from MySQL
    val properties = new java.util.Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    val allInfoDS: Dataset[AnswerWithRecommendations] = spark.read.jdbc(
      "jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8",
      "t_recommended",
      properties).as[AnswerWithRecommendations]

    // TODO 2. Process/analyze the data
    // The commented-out SQL variants below assume a temp view:
    // allInfoDS.createOrReplaceTempView("t_answer")

    // TODO === SQL
    // TODO Requirement 1: hot questions per subject
    // Find the subjects of the top-50 hot questions, then count how many of
    // those hot questions each subject contains, e.g.:
    /*
      question heat      question heat subject      subject  hot-question count
      1        10        1        10   Chinese      Chinese  1
      2        9         2        9    Math         Math     2
      3        8         3        8    Math
    */
    // 1. Top-50 hot questions -- subquery t1
    // 2. Join t1 with the original t_answer table, then group by subject and count each subject's hot questions
    // ================== Style 1: SQL ================
    /*
    spark.sql(
      """SELECT
        |  subject_id, COUNT(t_answer.question_id) AS hot_question_count
        |FROM
        |  (SELECT question_id, COUNT(*) AS frequency
        |   FROM t_answer
        |   GROUP BY question_id
        |   ORDER BY frequency DESC
        |   LIMIT 50) t1
        |JOIN t_answer ON t1.question_id = t_answer.question_id
        |GROUP BY subject_id
        |ORDER BY hot_question_count DESC
      """.stripMargin).show()
    */

    // TODO Requirement 2: recommended questions per subject
    // Find the questions recommended for the top-20 hot questions, look up each
    // recommended question's subject, and count the recommended questions per subject, e.g.:
    /*
      subject, number of recommended questions
      English, 105
      Chinese, 95
      Math,    89
    */
    // 1. Top-20 hot questions -- subquery t1
    // 2. Join t1 with the original t_answer table to get their recommendation lists -- t2
    // 3. SPLIT the ids in recommendations on "," and EXPLODE the arrays into rows -- t3
    // 4. Deduplicate the recommended questions, then join t3 with t_answer to get each one's subject -- t4
    // 5. Count the (deduplicated) recommended questions per subject, descending
    // ================== Style 1: SQL ================
    /*
    spark.sql(
      """SELECT t4.subject_id, COUNT(*) AS frequency
        |FROM
        |  (SELECT DISTINCT(t3.question_id), t_answer.subject_id
        |   FROM
        |     (SELECT EXPLODE(SPLIT(t2.recommendations, ',')) AS question_id
        |      FROM
        |        (SELECT recommendations
        |         FROM
        |           (SELECT question_id, COUNT(*) AS frequency
        |            FROM t_answer
        |            GROUP BY question_id
        |            ORDER BY frequency DESC
        |            LIMIT 20) t1
        |         JOIN t_answer ON t1.question_id = t_answer.question_id) t2) t3
        |   JOIN t_answer ON t3.question_id = t_answer.question_id) t4
        |GROUP BY t4.subject_id
        |ORDER BY frequency DESC
      """.stripMargin).show
    */

    // TODO === DSL
    // TODO Requirement 1: hot questions per subject
    // 1. Top-50 hot questions -- subquery t1
    val hotTop50: Dataset[Row] = allInfoDS.groupBy('question_id)
      .agg(count("*") as "hot")
      .orderBy('hot.desc)
      .limit(50)
    // 2. Join t1 with the original table to get each hot question's subject
    val joinDF: DataFrame = hotTop50.join(allInfoDS.dropDuplicates("question_id"), "question_id")
    // 3. Group by subject and count each subject's hot questions
    val result1: Dataset[Row] = joinDF.groupBy('subject_id)
      .agg(count("*") as "hotCount")
      .orderBy('hotCount.desc)

    // TODO Requirement 2: recommended questions per subject
    /*
      question heat      question heat recommendations      rec. question subject     subject   count
      1        10        1        10   2,3,4                2             Math        Math      2
      2        9         2        9    3,4,5                3             Math        Physics   1
                                                            4             Physics     Chemistry 1
                                                            5             Chemistry
    */
    // 1. Top-20 hot questions -- subquery t1
    val hotTop20: Dataset[Row] = allInfoDS.groupBy('question_id)
      .agg(count("*") as "hot")
      .orderBy('hot.desc)
      .limit(20)
    // 2. Join t1 with the original table to get the top-20 questions' recommendation lists -- t2
    val ridsDF: DataFrame = hotTop20.join(allInfoDS, "question_id")
      .select("recommendations")
    // 3. SPLIT the ids in recommendations on "," and EXPLODE the arrays into rows -- t3
    val ridsDS: Dataset[Row] = ridsDF.select(explode(split('recommendations, ",")) as "question_id")
      .dropDuplicates("question_id")
    // 4. Join the deduplicated t3 with the original table to get each recommended question's subject -- t4
    //    df1.join(df2, $"df1Key" === $"df2Key")
    //    df1.join(df2).where($"df1Key" === $"df2Key")
    val ridAndSid: DataFrame = ridsDS.join(allInfoDS.dropDuplicates("question_id"), "question_id")
    // 5. Count the (deduplicated) recommended questions per subject, descending
    val result2: Dataset[Row] = ridAndSid.groupBy('subject_id)
      .agg(count("*") as "rcount")
      .orderBy('rcount.desc)

    // TODO 3. Output the results
    //result1.show()
    result2.show()

    // TODO 4. Release resources
    spark.stop()
  }
}

Author: 赵广陆
Source: https://www.cnblogs.com/zhaoguanglu/p/15652912.html
