
Website Log Analysis Based on Spark

This article only shows the core code; the complete code is available at the link at the end.

Web Log Analysis

  1. Extract the required fields from the logs: time, traffic, IP, and web address (URL).
  2. Further parse the output of step 1: map each IP to its province, extract the content type and content ID from the URL, and finally convert the result to Parquet format.

(1) Group by date and content (video) ID, and sort by access count in descending order.
(2) Group by date, content (video) ID, and province, and keep the top 3 within each group by access count.
Finally, write the results of (1) and (2) into MySQL (a condensed sketch of the two aggregations follows; the full code is in Step Three below).
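Both aggregations map directly onto the DataFrame API. A minimal sketch, assuming spark is the active SparkSession and commonDF is the filtered video-only DataFrame built in Step Three:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, desc, rank}
import spark.implicits._

// (1) access count per day and video ID, sorted descending
val topN = commonDF.groupBy($"day", $"cmsId")
  .agg(count("cmsId").as("times"))
  .orderBy(desc("times"))

// (2) access count per day, province and video ID; keep the top 3 per province via a window rank
val windowSpec = Window.partitionBy($"city").orderBy(desc("times"))
val cityTop3 = commonDF.groupBy($"day", $"city", $"cmsId")
  .agg(count("cmsId").as("times"))
  .withColumn("times_rank", rank().over(windowSpec))
  .filter($"times_rank" <= 3)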

Notes:
(1) Write to the database partition by partition rather than row by row.
(2) Filter out the shared DataFrame first and cache it.
(3) The code below can be optimized further; for example, building the partition list inside the try/catch of videoAccessTopNStat and building the batch inside StatDAO.inserDayVideoAccessTopN(list) could be merged so the records are not traversed twice (see the sketch after this list).
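For note (3), one way to merge the two traversals is to let the DAO consume the partition iterator directly and add each row to the JDBC batch as it is read, instead of collecting rows into a ListBuffer first. A rough sketch, assuming a new method insertDayVideoAccessTopNBatch(records: Iterator[Row]) is added to StatDAO (this method does not exist in the original code):

import java.sql.{Connection, PreparedStatement}
import org.apache.spark.sql.Row

// Hypothetical StatDAO method: traverses the partition's rows only once,
// adding each row straight to the JDBC batch.
def insertDayVideoAccessTopNBatch(records: Iterator[Row]): Unit = {
  var connection: Connection = null
  var pstmt: PreparedStatement = null
  try {
    connection = MySQLUtils.getConnect()
    connection.setAutoCommit(false)
    val sql = "insert into day_video_access_topn_stat(day, cms_id, times) values (?, ?, ?)"
    pstmt = connection.prepareStatement(sql)
    records.foreach { info =>
      pstmt.setString(1, info.getAs[String]("day"))
      pstmt.setLong(2, info.getAs[Long]("cmsId"))
      pstmt.setLong(3, info.getAs[Long]("times"))
      pstmt.addBatch() // no intermediate list
    }
    pstmt.executeBatch()
    connection.commit()
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}

// Caller side, inside videoAccessTopNStat, replacing the ListBuffer loop:
videoAccessTopNStat.foreachPartition(partitionOfRecords =>
  StatDAO.insertDayVideoAccessTopNBatch(partitionOfRecords))

The same single-pass pattern would apply to the per-province top-3 write.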

Design and implementation approach:
1. Define the input arguments args (e.g. inputPath and outputPath).
2. Write the conversion utilities: a StructType (which fields to extract and their types) and parseLog (split the line, pick out each field by index, wrapped in try/catch with a default output). Time parsing gets its own utility class with inputFormat, outputFormat, getTime, and parse; region lookup gets an IpUtils class built on the open-source ipdatabase project. Each utility class is tested in its own main method once written. Finally build the DataFrame.
3. Filter out commonDF.
4. Implement the specific statistics.
5. Write out the results; when writing to MySQL, create a separate StatDAO class that obtains a connection, writes data in batches, and releases the connection (a skeleton of the resulting driver program follows this list).
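Put together, the driver for steps 1 through 5 has roughly the following shape. This is a condensed outline of the Step Three code below; the object name TopNStatJob is assumed, everything else follows the full listings:

import org.apache.spark.sql.SparkSession

object TopNStatJob { // name assumed; the original post shows only the body

  def main(args: Array[String]): Unit = {
    // 1. input arguments
    if (args.length != 2) {
      println("Usage: TopNStatJob <inputPath> <day>")
      System.exit(1)
    }
    val Array(inputPath, day) = args

    // 2. read the Parquet produced by the cleaning job (schema built via AccessConvertUtil)
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .getOrCreate()
    val accessDF = spark.read.format("parquet").load(inputPath)

    // 3. filter out the DataFrame shared by both statistics and cache it
    import spark.implicits._
    val commonDF = accessDF.filter($"day" === day && $"cmsType" === "video")
    commonDF.cache()

    // 4. the two statistics; clear existing rows for this day first
    StatDAO.deleteData(day)
    videoAccessTopNStat(spark, commonDF) // defined as in Step Three (as members of this object)
    cityAccessTopNStat(spark, commonDF)

    // 5. results are written to MySQL inside those methods via StatDAO
    commonDF.unpersist(true)
    spark.stop()
  }
}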

// Step One:
/**
 * Parse the raw log data and return visit time, url, traffic and ip.
 * @param log     example: 183.162.52.7 - - [10/Nov/2016:00:01:02 +0800] "POST /api3/getadv HTTP/1.1" ...
 * @return partitioned files, example: 1970-01-01 08:00:00\t-\t813\t183.162.52.7
 */
import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.SparkSession

if (args.length != 2) {
  println("Usage: logCleanYa <inputPath> <outputPath>")
  System.exit(1)
}
val Array(inputPath, outputPath) = args

val spark = SparkSession.builder().getOrCreate()
val access = spark.sparkContext.textFile(inputPath)
//access.take(10).foreach(println)

val splited = access.map(line => {
  val splits = line.split(" ")
  val ip = splits(0)
  val time = splits(3) + " " + splits(4)
  val url = splits(11).replaceAll("\"", "") // remove quotation marks
  val traffic = splits(9)
  // (ip, DataUtils.parse(time), url, traffic)
  DataUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
})
splited.saveAsTextFile(outputPath)

spark.stop()

/**
 * Utility for parsing log timestamps.
 */
object DataUtils {

  // input format: [10/Nov/2016:00:01:02 +0800]
  val YYYYMMDDHHMM_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // output format: yyyy-MM-dd HH:mm:ss
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def getTime(time: String) = {
    try {
      YYYYMMDDHHMM_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case _: Exception => 0L
    }
  }

  /**
   * example: [10/Nov/2016:00:01:02 +0800] ==> 2016-11-10 00:01:02
   */
  def parse(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

//  def main(args: Array[String]): Unit = {
//    println(parse("[10/Nov/2016:00:01:02 +0800]"))
//  }
}
// Step Two:
/**
 * Convert the data parsed in Step One into a DataFrame and save it as a single Parquet file.
 */
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

if (args.length != 2) {
  println("Usage: logCleanYa <inputPath> <outputPath>")
  System.exit(1)
}
val Array(inputPath, outputPath) = args

val spark = SparkSession.builder().getOrCreate()
val access = spark.sparkContext.textFile(inputPath)
// access.take(10).foreach(println)

val accessDF = spark.createDataFrame(access.map(line => AccessConvertUtil.parseLog(line)), AccessConvertUtil.struct)
// accessDF.printSchema()
// accessDF.show(false)

accessDF.coalesce(1).write.format("parquet").partitionBy("day")
  .save(outputPath)

spark.stop()

/**
 * Utility class defining the schema and the method for further parsing a log line.
 */
object AccessConvertUtil {

  // cmsId and traffic are stored as Long so that Step Three's getAs[Long] calls match the schema
  val struct = StructType(Seq(
    StructField("url", StringType),
    StructField("cmsType", StringType),
    StructField("cmsId", LongType),
    StructField("traffic", LongType),
    StructField("ip", StringType),
    StructField("city", StringType),
    StructField("time", StringType),
    StructField("day", StringType)
  ))

  /**
   * Further parse a log line: convert data types, parse the URL, map the IP to its province,
   * and output the result as a Row.
   */
  def parseLog(log: String) = {
    try {
      val splited = log.split("\t")

      val url = splited(1)
      val traffic = splited(2).toLong
      val ip = splited(3)

      // URL: in "http://www.xxx.com/article/101", "article" is the content type and 101 is the article ID
      val domain = "http://www.xxx.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")

      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)
      val time = splited(0)
      val day = time.substring(0, 10).replaceAll("-", "")

      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case _: Exception =>
        // default row on parse failure
        Row(null, null, null, null, null, null, null, null)
    }
  }
}

/**
 * IP utility class that maps an IP address to a province, using the open-source ipdatabase project:
 * https://github.com/wzhe06/ipdatabase
 */
object IpUtils {

  def getCity(ip: String) = {
    IpHelper.findRegionByIp(ip)
  }

  def main(args: Array[String]): Unit = {
    println(getCity("58.30.15.255"))
  }
}
// Step Three:
/**
 * On the data produced in Step Two, group by date and video ID and sort by access count in
 * descending order; also group by date, video ID and province and rank the top 3 per province.
 * Finally write the results into MySQL.
 */
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer

if (args.length != 2) {
  println("Usage: logCleanYa <inputPath> <day>")
  System.exit(1)
}
val Array(inputPath, day) = args

val spark = SparkSession.builder()
  .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
  .getOrCreate()

val accessDF = spark.read.format("parquet").load(inputPath)
//accessDF.printSchema()
//accessDF.show(false)

// Filter and cache the DataFrame reused by the two functions below
import spark.implicits._
val commonDF = accessDF.filter($"day" === day && $"cmsType" === "video")
commonDF.cache()

// Delete existing data for this day to avoid duplicates
StatDAO.deleteData(day)

// group by video
videoAccessTopNStat(spark, commonDF)

// group by city
cityAccessTopNStat(spark, commonDF)

commonDF.unpersist(true)

//videoAccessTopDF.show(false)
spark.stop()

/**
 * Two case classes storing the two result record types used by the methods below.
 */
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)
case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)

/**
 * Group by content ID, sort by access count, and write the result to MySQL.
 */
def videoAccessTopNStat(spark: SparkSession, comDF: DataFrame): Unit = {
  import spark.implicits._

  val videoAccessTopNStat = comDF.groupBy($"day", $"cmsId")
    .agg(count("cmsId").as("times"))
    .orderBy(desc("times"))

  try {
    videoAccessTopNStat.foreachPartition(partitionOfRecords => {
      val list = new ListBuffer[DayVideoAccessStat]
      partitionOfRecords.foreach(info => {
        val day = info.getAs[String]("day")
        val cmsId = info.getAs[Long]("cmsId")
        val times = info.getAs[Long]("times")
        list.append(DayVideoAccessStat(day, cmsId, times))
      })
      StatDAO.inserDayVideoAccessTopN(list)
    })
  } catch {
    case e: Exception => e.printStackTrace()
  }
}

/**
 * Group by content ID and province, rank the top 3 within each province, and write the result to MySQL.
 */
def cityAccessTopNStat(spark: SparkSession, comDF: DataFrame): Unit = {
  import spark.implicits._

  val videoAccessTopNStat = comDF.groupBy($"day", $"city", $"cmsId")
    .agg(count("cmsId").as("times"))

  val windowSpec = Window.partitionBy($"city").orderBy(desc("times"))
  val videoAccessTopNStatDF = videoAccessTopNStat
    .select(expr("*"), rank().over(windowSpec).as("times_rank"))
    .filter($"times_rank" <= 3)

  try {
    videoAccessTopNStatDF.foreachPartition(partitionOfRecords => {
      val list = new ListBuffer[DayCityVideoAccessStat]
      partitionOfRecords.foreach(info => {
        val day = info.getAs[String]("day")
        val cmsId = info.getAs[Long]("cmsId")
        val city = info.getAs[String]("city")
        val times = info.getAs[Long]("times")
        val timesRank = info.getAs[Int]("times_rank")
        list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
      })
      StatDAO.inserDayCityVideoAccessTopN(list)
    })
  } catch {
    case e: Exception => e.printStackTrace()
  }
}

/**
 * Group-then-sort method (same statistic as videoAccessTopNStat, with a hard-coded day).
 */
def videoAccessSortedStat(spark: SparkSession, accessDF: DataFrame): Unit = {
  import spark.implicits._

  val sortedStat = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
    .groupBy($"day", $"cmsId")
    .agg(count("cmsId").as("times"))
    .orderBy(desc("times"))

  // Per partition, build a list of records and call the DAO to write them to MySQL
  try {
    sortedStat.foreachPartition(partitionOfRecords => {
      val list = new ListBuffer[DayVideoAccessStat]
      partitionOfRecords.foreach(info => {
        val day = info.getAs[String]("day")
        val cmsId = info.getAs[Long]("cmsId")
        val times = info.getAs[Long]("times")
        list.append(DayVideoAccessStat(day, cmsId, times))
      })
      StatDAO.inserDayVideoAccessSortedStat(list)
    })
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
// Step Three (cont.):
/**
 * DAO utility class providing two kinds of methods:
 * 1. obtain a database connection, write data to MySQL in batches, and release the connection;
 * 2. delete rows that already exist in MySQL for the same day.
 */
import java.sql.{Connection, DriverManager, PreparedStatement}
import scala.collection.mutable.ListBuffer

object StatDAO {

  def inserDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnect()

      val sql = "insert into day_video_access_topn_stat(day, cms_id, times) values (?, ?, ?)"
      pstmt = connection.prepareStatement(sql)
      connection.setAutoCommit(false)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  def inserDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnect()

      val sql = "insert into day_video_city_access_topn_stat(day, cms_id, city, times, times_rank) values (?, ?, ?, ?, ?)"
      pstmt = connection.prepareStatement(sql)
      connection.setAutoCommit(false)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  def deleteData(day: String): Unit = {
    val tables = Array("day_video_access_topn_stat", "day_video_city_access_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnect()
      for (table <- tables) {
        val sql = s"delete from $table where day = ?"
        pstmt = connection.prepareStatement(sql)
        pstmt.setString(1, day)
        pstmt.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}

/**
 * Utility class containing methods to obtain and release database connections.
 */
object MySQLUtils {

  def getConnect() = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/log_project", "root", "password")
  }

  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(getConnect())
  }
}

References:
Big Data Spark SQL log analysis course (imooc.com)
GitHub source code

Author: justcodeit
Source: https://www.cnblogs.com/code2one/p/9872597.html
