当前位置: 首页 >数据库 > 用户活跃度分析

用户活跃度分析

package cn.ibeifeng.sparkimport org.apache.spark.sql.SparkSession/** * 用户活跃度分析 *  * 我们这次项目课程的升级,也跟spark从入门到精通的升级采取同步,采用scala+eclipse的方式来开发 *  * 我个人而言,还是觉得应该用java去开发spark作业,因为hadoop是最重要的大数据引擎,hadoop mapreduce、hbase,全都是java * 整个公司的编程语言技术栈越简单越好,降低人员的招聘和培养的成本 *  * 但是由于市面上,现在大部分的公司,做spark都是采取一种,spark用scala开发,所以开发spark作业也用scala * 课程为了跟市场保持同步,后面就随便采取scala来开发了 *  */object UserActiveDegreeAnalyze {case class UserActionLog(logId: Long, userId: Long, actionTime: String, actionType: Long, purchaseMoney: Double)  case class UserActionLogVO(logId: Long, userId: Long, actionValue: Long)  case class UserActionLogWithPurchaseMoneyVO(logId: Long, userId: Long, purchaseMoney: Double)def main(args: Array[String]) {// 如果是按照课程之前的模块,或者整套交互式分析系统的架构,应该先从mysql中提取用户指定的参数(java web系统提供界面供用户选择,然后java web系统将参数写入mysql中)// 但是这里已经讲了,之前的环境已经没有了,所以本次升级从简// 我们就直接定义一个日期范围,来模拟获取了参数val startDate = "2016-09-01";val endDate = "2016-11-01";// 开始写代码// spark 2.0具体开发的细节和讲解,全部在从入门到精通中,这里不多说了,直接写代码// 要不然如果没有看过从入门到精通的话,就自己去上网查spark 2.0的入门资料val spark = SparkSession.builder().appName("UserActiveDegreeAnalyze").master("local") .config("spark.sql.warehouse.dir", "D:\\test\\spark\\mall\\spark-warehouse").getOrCreate()// 导入spark的隐式转换import spark.implicits._// 导入spark sql的functionsimport org.apache.spark.sql.functions._// 获取两份数据集val userBaseInfo = spark.read.json("D:\\test\\spark\\mall\\user_base_info.json")val userActionLog = spark.read.json("D:\\test\\spark\\mall\\user_action_log.json")  // 第一个功能:统计指定时间范围内的访问次数最多的10个用户// 说明:课程,所以数据不会搞的太多,但是一般来说,pm产品经理,都会抽取100个~1000个用户,供他们仔细分析//{"logId": 00,"userId": 0, "actionTime": "2016-10-04 15:42:45", "actionType": 0, "purchaseMoney": 0.0}//{"userId": 0, "useame": "user0", "registTime": "2016-10-11 18:06:25"}//userActionLog//// 第一步:过滤数据,找到指定时间范围内的数据//.filter("actionTime >= '" + startDate + "' and actionTime <= '" + endDate + "' and actionType = 0")//// 第二步:关联对应的用户基本信息数据//.join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId"))//// 第三部:进行分组,按照userid和useame//.groupBy(userBaseInfo("userId"), userBaseInfo("useame"))//// 第四步:进行聚合//.agg(count(userActionLog("logId")).alias("actionCount"))//// 第五步:进行排序//.sort($"actionCount".desc)//// 第六步:抽取指定的条数//.limit(10)//// 第七步:展示结果,因为监简化了,所以说不会再写入mysql//.show()// 第一个功能:统计指定时间范围内的访问次数最多的10个用户/* userActionLog.filter("actionTime >= '"+startDate+"' and actionTime <= '"+endDate+"' and actionType = 0")  .join(userBaseInfo,userActionLog("userId")===userBaseInfo("userId"))  .groupBy(userBaseInfo("userId"),userBaseInfo("useame"))  .agg(count(userActionLog("logId")).alias("actionCount"))  .sort($"actionCount".desc).limit(10)  .show()*/// 第二个功能:获取指定时间范围内购买金额最多的10个用户// 对金额进行处理的函数讲解// feature,技术点的讲解:嵌套函数的使用//userActionLog//.filter("actionTime >= '" + startDate + "' and actionTime <= '" + endDate + "' and actionType = 1")//.join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId"))//.groupBy(userBaseInfo("userId"), userBaseInfo("useame"))//.agg(round(sum(userActionLog("purchaseMoney")),2).alias("totalPurchaseMoney"))//.sort($"totalPurchaseMoney".desc)//.limit(10)//.show()// 第二个功能:获取指定时间范围内购买金额最多的10个用户//{"logId": 00,"userId": 0, "actionTime": "2016-10-04 15:42:45", "actionType": 0, "purchaseMoney": 0.0}//{"userId": 0, "useame": "user0", "registTime": "2016-10-11 18:06:25"}  /*  userActionLog.filter(s"actionTime >= '$startDate' and actionTime <= '$endDate' and actionType=1")  .join(userBaseInfo,userActionLog("userId")===userBaseInfo("userId"))  .groupBy(userBaseInfo("userId"),userBaseInfo("useame"))  .agg(round(sum(userActionLog("purchaseMoney")),2).alias("sumPurchase"))  .sort($"sumPurchase".desc)  .limit(10)  .show()*/// 第三个功能:统计最近一个周期相对上一个周期访问次数增长最多的10个用户// 比如说我们设定一个周期是1个月// 我们有1个用户,叫张三,那么张三在9月份这个周期内总共访问了100次,张三在10月份这个周期内总共访问了200次// 张三这个用户在最近一个周期相比上一个周期,访问次数增长了100次// 每个用户都可以计算出这么一个值// 获取在最近两个周期内,访问次数增长最多的10个用户// 周期,是可以由用户在web界面上填写的,java web系统会写入mysql,我们可以去获取本次执行的周期// 假定1个月,2016-10-01~2016-10-31,上一个周期就是2016-09-01~2016-09-30//val userActionLogInFirstPeriod = userActionLog.as[UserActionLog]//.filter("actionTime >= '2016-10-01' and actionTime <= '2016-10-31' and actionType = 0")//.map{ userActionLogEntry => UserActionLogVO(userActionLogEntry.logId, userActionLogEntry.userId, 1) }////val userActionLogInSecondPeriod = userActionLog.as[UserActionLog]//.filter("actionTime >= '2016-01-01' and actionTime <= '2016-09-30' and actionType = 0")//.map{ userActionLogEntry => UserActionLogVO(userActionLogEntry.logId, userActionLogEntry.userId, -1) }////val userActionLogDS = userActionLogInFirstPeriod.union(userActionLogInSecondPeriod)////userActionLogDS//.join(userBaseInfo, userActionLogDS("userId") === userBaseInfo("userId"))//.groupBy(userBaseInfo("userId"), userBaseInfo("useame"))//.agg(sum(userActionLogDS("actionValue")).alias("actionIncr"))//.sort($"actionIncr".desc)//.limit(10)//.show()////////{"logId": 00,"userId": 0, "actionTime": "2016-10-04 15:42:45", "actionType": 0, "purchaseMoney": 0.0}////{"userId": 0, "useame": "user0", "registTime": "2016-10-11 18:06:25"}//// 第三个功能:统计最近一个周期相对上一个周期访问次数增长最多的10个用户//// 比如说我们设定一个周期是1个月//// 我们有1个用户,叫张三,那么张三在9月份这个周期内总共访问了100次,张三在10月份这个周期内总共访问了200次//// 张三这个用户在最近一个周期相比上一个周期,访问次数增长了100次//// 每个用户都可以计算出这么一个值//// 获取在最近两个周期内,访问次数增长最多的10个用户//val startDate1="2016-01-01"//val startDate2="2016-10-01"//val endDate1="2016-09-30"//val endDate2="2016-10-31"////一个时间段内用户的访问时间//val first = userActionLog.as[UserActionLog]//  .filter(s"actionTime >= '$startDate1' and actionTime <= '$endDate1' and actionType=0")//  .map{ userActionLogEntry => UserActionLogVO(userActionLogEntry.logId, userActionLogEntry.userId, -1) }////  //.join(userBaseInfo,userActionLog("userId")===userBaseInfo("userId"))//////  .groupBy(userBaseInfo("userId"),userBaseInfo("useame"))////  .agg(count(userActionLog("logId")))////val second = userActionLog.as[UserActionLog]//  .filter(s"actionTime >= '$startDate2' and actionTime <= '$endDate2' and actionType=0")//  .map{ userActionLogEntry => UserActionLogVO(userActionLogEntry.logId, userActionLogEntry.userId, 1) }//////val userActionLogDS2 =  second//  .union(first);////  //.as[userActionLogDS]//userActionLogDS2.join(userBaseInfo,userBaseInfo("userId")===userActionLogDS2("userId"))//  .groupBy(userBaseInfo("userId"),userBaseInfo("useame"))//  .agg(sum(userActionLogDS2("actionValue")).alias("sumActionValue"))//.sort($"sumActionValue".desc)//  .limit(10)//  .show()////first.union(second).//// 真实的项目中,大量的情况就是这样的,很多作业和代码都是类似的,就是有些地方不太一样而已//// 向大家展示真实的项目逻辑,业务//// 让大家加强印象,多练习几遍,没什么坏处////val userActionLogWithPurchaseMoneyInFirstPeriod = userActionLog.as[UserActionLog]//.filter("actionTime >= '2016-10-01' and actionTime <= '2016-10-31' and actionType = 1")//.map{ userActionLogEntry =>//  UserActionLogWithPurchaseMoneyVO(userActionLogEntry.logId, userActionLogEntry.userId,//userActionLogEntry.purchaseMoney) }////val userActionLogWithPurchaseMoneyInSecondPeriod = userActionLog.as[UserActionLog]//.filter("actionTime >= '2016-09-01' and actionTime <= '2016-09-30' and actionType = 1")//.map{ userActionLogEntry =>//  UserActionLogWithPurchaseMoneyVO(userActionLogEntry.logId, userActionLogEntry.userId,//-userActionLogEntry.purchaseMoney) }////val userActionLogWithPurchaseMoneyDS = userActionLogWithPurchaseMoneyInFirstPeriod.union(userActionLogWithPurchaseMoneyInSecondPeriod)////userActionLogWithPurchaseMoneyDS//.join(userBaseInfo, userActionLogWithPurchaseMoneyDS("userId") === userBaseInfo("userId"))//.groupBy(userBaseInfo("userId"), userBaseInfo("useame"))//.agg(round(sum(userActionLogWithPurchaseMoneyDS("purchaseMoney")), 2).alias("purchaseMoneyIncr"))//.sort($"purchaseMoneyIncr".desc)//.limit(10)//.show()//// 统计指定注册时间范围内头7天访问次数最高的10个用户// 举例,用户通过web界面指定的注册范围是2016-10-01~2016-10-31userActionLog.join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId")).filter(userBaseInfo("registTime") >= "2016-10-01"&& userBaseInfo("registTime") <= "2016-10-31"&& userActionLog("actionTime") >= userBaseInfo("registTime")&& userActionLog("actionTime") <= date_add(userBaseInfo("registTime"), 7)&& userActionLog("actionType") === 0).groupBy(userBaseInfo("userId"), userBaseInfo("useame")).agg(count(userActionLog("logId")).alias("actionCount")).sort($"actionCount".desc).limit(10).show()userActionLog.join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId")).filter(userBaseInfo("registTime") >= "2016-10-01"&& userBaseInfo("registTime") <= "2016-10-31"&& userActionLog("actionTime") >= userBaseInfo("registTime")&& userActionLog("actionTime") <= date_add(userBaseInfo("registTime"), 7)&& userActionLog("actionType") === 1).groupBy(userBaseInfo("userId"), userBaseInfo("useame")).agg(round(sum(userActionLog("purchaseMoney")),2).alias("purchaseMoneyTotal")).sort($"purchaseMoneyTotal".desc).limit(10).show()// 统计指定注册时间范围内头7天访问次数最高的10个用户// 举例,用户通过web界面指定的注册范围是2016-10-01~2016-10-31//{"logId": 00,"userId": 0, "actionTime": "2016-10-04 15:42:45", "actionType": 0, "purchaseMoney": 0.0}//{"userId": 0, "useame": "user0", "registTime": "2016-10-11 18:06:25"}val startDateReg = "2016-10-01"val endDateReg = "2016-10-31"userBaseInfo.filter(s"registTime >= '$startDateReg' and registTime <= '$endDateReg'")  .join(userActionLog,userActionLog("userId")===userBaseInfo("userId"))  .filter(userActionLog("actionTime") >= userBaseInfo("registTime")  &&  userActionLog("actionTime") <= date_add(userBaseInfo("registTime"),7)&& userActionLog("actionType") === 1)  .groupBy(userBaseInfo("userId"),userBaseInfo("useame"))  .agg(count(userActionLog("logId")).alias("countLogId"))  .sort($"countLogId".desc)  .limit(10)  .show()  }  }

 数据链接:http://pan.baidu.com/s/1cKvqZc 密码:4mcy

作者:牵牛花
来源链接:https://www.cnblogs.com/rocky-AGE-24/p/7527497.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/database/117806.html

标签:group by
分享给朋友:

“用户活跃度分析” 的相关文章

Linux安装MySQL(超详细) 2022年05月16日 21:54:54
Mybatis中的${}和#{}区别 2022年05月17日 21:41:44
连接数据库版本不一致 2022年05月20日 01:07:15
mysql查询最新的一条记录 2022年06月06日 16:04:12
MySQL 查询指定时间范围内的数据 2022年06月06日 16:59:25
MYSQL查询某字段为空的数据 2022年06月08日 21:35:13
mysql数据查询——复杂查询 2022年06月09日 23:08:26