当前位置: 首页 >数据库 > Spark SQL实现日志离线批处理

Spark SQL实现日志离线批处理

一、 基本的离线数据处理架构:
 
  1. 数据采集   Flume:Web日志写入到HDFS
  2. 数据清洗   脏数据 Spark、Hive、MR等计算框架来完成。 清洗完之后再放回HDFS
  3. 数据处理   按照需要,进行业务的统计和分析。 也通过计算框架完成
  4. 处理结果入库   存放到RDBMS、NoSQL中
  5. 数据可视化    通过图形化展示出来。  ECharts、HUE、Zeppelin
 
处理框图:
 
1 2 3 4 5 6 7为离线处理,其中5不一定是Hive(还有Spark SQL等) 6不一定是RDBMS(NoSQL)
执行时,可用调度框架Oozie、Azkaban,指定任务执行的时间
 
另外一条线是实时处理
 
 
拟定项目需求:
  1. 统计某时间段最受欢迎的某项的TopN和对应的访问次数
  2. 按地市统计最受欢迎  从IP提取城市信息
  3. 按访问流量统计
 
 
互联网日志一般包括有:
访问时间  访问URL  耗费流量   访问IP地址
从日志里提取以上我们需要的数据
 
假设我们现在仅有一台电脑供学习作为集群使用,为了防止内存溢出,有必要进行剪切日志:
用head -10000命令截取前10000条
数据量太大的话,在IDE中可能会报错
 
 
 
 二、日志处理过程
 
数据清洗:
 
第一步: 从原始日志提取有用信息,本例中就是拿到时间、URL、流量、IP
  1. 读取日志文件,得到RDD,通过map方法,split成一个数组,然后选择数组中有用的几项(用断点的方法分析哪几项有用,并匹配相应的变量)
  2. 获取到的信息有可能因为某些问题,如线程问题而导致生成了带有错误的信息,第一步中一开始用了SimpleDateFormat(线程不安全)来转变时间格式,会导致某些时间转换错误。一般要改成FastDateFormat来做
 
实现代码:
//提取有用信息,转换格式object SparkStatFormatJob {  def main(args: Array[String]) = {val spark = SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate()val access = spark.sparkContext.textFile("/Users/kingheyleung/Downloads/data/10000_access.log")//access.take(10).foreach(println)access.map(line => {  val splits = line.split(" ")  val ip = splits(0)  //用断点的方法,观察splits数组,找出时间、url、流量对应哪一个字段  //创建时间类DateUtils,转换成常用的时间表达方式  //把url多余的""引号清除掉  val time = splits(3) + " " + splits(4)  val url = splits(11).replaceAll("\"", "")  val traffic = splits(9)  //(ip, DateUtils.parse(time), url, traffic)  用来测试输出是否正常  //把裁剪好的数据重新组合,用Tab分割  DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip}).saveAsTextFile("file:///usr/local/mycode/immooclog/")spark.stop()  }}

 

//日期解析object DateUtils {  //输入格式  val ORIGINAL_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:sss Z", Locale.ENGLISH)  //输出格式  val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")def parse(time:String) = {TARGET_TIME_FORMAT.format(new Date(getTime(time)))  }  def getTime(time:String) = {try {  ORIGINAL_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime} catch {case e : Exception => {0l  }}  }

 

一般日志处理需要进行分区

本例中按照日志中的访问时间进行分区
 
 
第二步:解析上一步得到的有用信息,我把它称为解析日志
 
其实就是把较为整洁的数据日志,解析出每个字段的含义,并把RDD转成DF
在此案例中,完成的是:
输入:访问时间  访问URL  耗费流量   访问IP地址  =>转变为输出:url、类型(本例中url的后缀有article还是video)、对应ID号、流量、ip、城市、时间、天(用于分组)
并且创建DataFrame(也就是定义Row和StructType,其中Row要和原日志的每个字段对应,而StructType是根据所需要的输出来定义就行)
 
实现代码:
//解析日志object SparkStatCleanJob {  def main(args: Array[String]) = {val spark = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]").getOrCreate()val accessRDD = spark.sparkContext.textFile("file:///Users/kingheyleung/Downloads/data/access_10000.log")//RDD convert to DF, define Row and StructTypeval accessDF = spark.createDataFrame(accessRDD.map(line => LogConvertUtils.convertToRow(line)), LogConvertUtils.struct)//accessDF.printSchema()//accessDF.show(false)spark.stop()  }}

 

//RDD转换成DF的工具类object LogConvertUtils {  //构建Struct  val struct = StructType(Array(  StructField("url", StringType),  StructField("cmsType", StringType),  StructField("cmsId", LongType),  StructField("traffic", LongType),  StructField("ip", StringType),  StructField("city", StringType),  StructField("time", StringType),  StructField("day", StringType))  )  //提取信息,构建Row  def convertToRow(line:String) = { try {  val splits = line.split("\t")  val url = splits(1)  val traffic = splits(2).toLong  val ip = splits(3)  val domain = "http://www.imooc.com/"  val cms = url.substring(url.indexOf(domain) + domain.length())  val cmsSplits = cms.split("/")var cmsType = ""  var cmsId = 0l  //判断是否存在  if (cmsSplits.length > 1) {cmsType = cmsSplits(0)cmsId = cmsSplits(1).toLong  }  val city = IpUtils.getCity(ip) //通过Ip解析工具传进,具体看下面  val time = splits(0)  val day = time.substring(0, 10).replaceAll("-", "")//定义Row,与Struct一样  Row(url, cmsType, cmsId, traffic, ip, city, time, day)} catch {  case e: Exception => Row(0)}  }}

注意:转换时一定要记得类型转换!!!!

 
进一步解析:对IP地址解析来获得城市信息
 
在这里,为了让IP地址转换成直观的城市信息,我使用了GitHub上的开源项目来实现:
用Maven编译下载的项目
mvn clean package -DskipTests
 
安装jar包到自己的Maven仓库中: 
mvn install:install-file -Dfile=路径.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
 
在IDE里面的pom.xml添加dependency,参照GitHub主页上的pom.xml中的dependency
但是出现报错了:
java.io.FileNotFoundException:
file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
根据提示,我们需要在项目源码中找到相应的文件拷进去IDE中的main/resources中!
 
 存储清洗后的数据:
按day分区来进行存储  partitionBy
存储模式:mode(SaveMode.Overwrite)  覆盖存储
coalesce:据说生产中经常用,是项目的调优点,控制文件的输出大小,个数
 
 
三、统计功能实现
 
功能实现一:统计TopN视频  
 
第一步:读取数据,read.format().load
第二步:
  1. 使用DataFrame API统计分析
  2. SQL API
最后把统计结果保存在MySQL数据库中
 
调优点:
读取parquet文件时,系统会默认解析各字段相应的数据类型,但有时候我们就只需要它是String类型,需要在SparkSession定义时添加:
config("spark.sql.sources.partitionColumnTypeInference.enabled, "false"")
变成只会按照原类型读入
 
两种方法:
若使用DataFrame API来做:
用$号时候需要导入 隐式转换(这里是列名转换成列)!spark.implicits._
用到dataframe的count()函数要导入包:org.apache.spark.sql.functions._
 
若使用SQL API来做:
创建临时表createTempView
小心写SQL语句换行时不注意而忽略空格
 
实现代码:
 
//完成统计操作object TopNStatJob {  def main(args: Array[String]) {val spark = SparkSession.builder().appName("TopNStatJob")  .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")  .master("local[2]").getOrCreate()val accessDF = spark.read.format("parquet").load("/Users/kingheyleung/Downloads/data/clean/")dfCountTopNVideo(spark, accessDF)sqlCountTopNVideo(spark, accessDF)//accessDF.printSchema()  spark.stop()  }  def dfCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {/** DF API* */ //导入隐式转换, 留意$号的使用, 并且导入functions包,使agg聚合函数count能够使用,此处若不用$的话,就无法让times进行desc排序了import spark.implicits._val topNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")  .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)topNDF.show(false)  }  def sqlCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {/** SQL API* */ //创建临时表access_view,注意换行时,很容易忽略掉空格accessDF.createTempView("access_view")val topNDF = spark.sql("select day, cmsId, count(1) as times from access_view " +  "where day == '20170511' and cmsType == 'video' " +  "group by day, cmsId " +  "order by times desc")topNDF.show(false)  }}
 
 
在保存数据之前,需要写连接MySQL数据库的工具类,用到java.sql包
  1. 使用DriverManager,连接到mysql 3306
  2. 释放资源,connection和preparedstatement都要,注意处理异常
 
注意:若测试时拿不到连接,出现以下报错,那就是没有在dependency中添加或者选对mysql-connetor包
java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/imooc_project?user=root&password=666
Error:scalac: error while loading <root>, Error accessing /Users/kingheyleung/.m2/repository/mysql/mysql-connector-java/5.0.8/mysql-connector-java-5.0.8.jar
 
我最终选的是5.1.40版本才对了
 
 
实现代码:
/** 连接MySQL数据库* 操作工具类* */object MySQLUtils {  //获得连接  def getConnection(): Unit = {DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=666")  }  //释放资源  def release(connection: Connection, pstmt: PreparedStatement): Unit = {try {  if (pstmt != null) {pstmt.close()  }} catch {  case e: Exception => e.printStackTrace()} finally {  connection.close()}  }}

 

 
把统计数据保存到MySQL
  1. 在mysql中创建一张表,包含day,cms_Id,times三个字段(注意各自的数据类型,以及定义不允许为NULL,并把day和cms_Id作为PRI KEY)
  2. 创建模型类case class,三个输入参数,day、cms_Id,times
  3. 创建操作数据库DAO类,输入的参数是一个list,list装的是上面的模型类,目的是插入insert记录到数据库中,DAO中分以下几步:
  4. 首先,做jdbc连接的准备,创建connection和prepareStatement,把关闭连接也写好,用try catch finally抛出异常;
  5. 然后写sql语句,preparestatement需要赋值的地方用占位符放着;
  6. 进行对list遍历,把每个对象都放进pstmt中
  7. 调优点!!!遍历前把自动提交关掉,遍历中把pstmt加入批处理中,遍历完后执行批处理操作!最后手工提交连接
 
实现代码:
//课程访问次数实体类case class VideoAccessStat(day: String, cmsId:Long, times: Long)  /** 各个维度统计的DAO操作* */object StatDAO {  /*  * 批量保存VideoAccessStat到数据库  * */  def insertDayAccessTopN(list: ListBuffer[VideoAccessStat]): Unit = { var connection: Connection = null  //jdbc的准备工作, 定义连接var pstmt: PreparedStatement = null try {  connection = MySQLUtils.getConnection() //真正获取连接connection.setAutoCommit(false)//为了实现批处理,要关掉默认的自动提交val sql = "insert into day_topn_video(day, cms_id, times) values (?, ?, ?)"  //占位符  pstmt = connection.prepareStatement(sql)  //把SQL语句生成pstmt对象,后面才可以填充占位符中的数据for (ele <- list) {pstmt.setString(1, ele.day)pstmt.setLong(2, ele.cmsId)pstmt.setLong(3, ele.times) pstmt.addBatch()//加入批处理  }pstmt.execute()//执行批量处理  connection.commit()//手工提交 } catch {  case e: Exception => e.printStackTrace()} finally {  MySQLUtils.release(connection, pstmt)}  }}

 

 
为了对应以上的第3步,要把统计记录的DF生成一个个对象,放进list中:
  1. 创建模型类对应的list
  2. 对记录进行遍历,把记录的每个字段当做参数,创建模型类对象
  3. 把每个对象添加到list中
  4. 把list传进DAO类中 
 
以下代码添加到上面的TopNJob类里面中就可以把之前生成到的topDF的结果记录保存到MySQL当中了:
try {  topNDF.foreachPartition(partitionOfRecords => { //val list = new ListBuffer[VideoAccessStat]  //创建list来装统计记录 //遍历每一条记录,取出来上面对应的三个字段day,cmsId,timespartitionOfRecords.foreach(info => {  val day = info.getAs[String]("day")//后面的就是取出来的记录的每个字段  val cmsId = info.getAs[Long]("cmsId")  val times = info.getAs[Long]("times")//每一次循环创建一个VideoAccessStat对象,添加一次进入list中  list.append(VideoAccessStat(day, cmsId, times))})//把list传进DAO类StatDAO.insertDayAccessTopN(list)  })} catch {  case e: Exception => e.printStackTrace()}

 

 到此为止已经把项目需求一完成。
 
 
 功能实现二:按照城市来找出topN视频
 
在功能一的基础上,运用row_number函数来实现
 
具体的实现代码:
 
  //先计算访问次数,并按照day,cmsId,city分组  val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === "20170511" && accessDF.col("cmsType") === "video").groupBy("day", "cmsId", "city").agg(count("cmsId").as("times"))//进行分地市排序,使用到row_number函数,生成一个排名,定义为time_rank, 并且取排名前3  cityAccessTopNDF.select(cityAccessTopNDF.col("day"),cityAccessTopNDF.col("cmsId"),cityAccessTopNDF.col("times"),cityAccessTopNDF.col("city"),row_number().over(Window.partitionBy(cityAccessTopNDF.col("city"))  .orderBy(cityAccessTopNDF.col("times").desc)).as("times_rank")  ).filter("times_rank <= 3").show(false)}

 

 其他步骤和功能一一样,但是 插入Mysql的时候报错,原因是MySQL不支持插入中文!!!!
首先可以在mysql命令行中用SET character来改:
SET character_set_client = utf8
 
可通过
show variables like 'character_set_%’;
查看当前的character编码设置
 
然后在jdbc连接时,加上:
useUnicode=true&characterEncoding=utf8
 
改了之后,虽然能够导入MySQL了,而且不出现乱码,但只有一部分数据,并且在控制台报错:
 
com.mysql.jdbc.PreparedStatement.fillSendPacket
com.mysql.jdbc.PreparedStatement.execute
 
后来把批处理删掉竟然就可以把所有数据导入了:
 
pstmt.executeUpdate  //不使用批处理的pstmt插入
 
 
功能三:按流量来排序topN视频
和功能一几乎完全一样,只不过计算流量总和时用的不是count函数而是要用sum函数
 
为了代码的复用性,防止生成重复的数据,在StatDAO定义删除的函数:
 
def deleteDayData(day: String) = {var connection: Connection = null  var pstmt: PreparedStatement = null  var tables = Array("day_topn_video","day_city_topn_video","traffic_topn_video"  )try {connection = MySQLUtils.getConnection() for (table <- tables) {  val deleteSql = s"delete from $table where day = ?”  //Scala特殊处理  pstmt = connection.prepareStatement(deleteSql)  pstmt.setString(1, table)  pstmt.setString(2, day)  pstmt.executeUpdate()}  } catch {case e: Exception => e.printStackTrace()  } finally {MySQLUtils.release(connection, pstmt)  }}

 

 
需要注意的是,table在pstmt中的特殊用法!!
 
 
后续会对以上内容进行可视化处理、跑在YARN上的修改、性能调优

作者:KINGHEY
来源链接:https://www.cnblogs.com/kinghey-java-ljx/p/8543552.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/database/117687.html

标签:group by
分享给朋友:

“Spark SQL实现日志离线批处理” 的相关文章

JDBC工具类(DButil) 2022年05月13日 09:13:25
MySQL表的增删改查(进阶) 2022年05月16日 21:54:11
Linux安装MySQL(超详细) 2022年05月16日 21:54:54
mysql 查询1小时内 2022年06月06日 12:59:30
MYSQL查询一个月前的数据 2022年06月12日 09:48:55
mysql 查询所有下级 2022年06月12日 13:42:12
Mysql查询用户最后一次登陆时间 2022年06月12日 13:54:22
MySql查询某一天的数据 2022年06月14日 10:43:20