当前位置: 首页 >数据库 > 大数据学习之 Spark基本编程案例 48

大数据学习之 Spark基本编程案例 48

案例一:计算网页访问量前三名

源数据大致预览:

大数据学习之 Spark基本编程案例  48 _ JavaClub全栈架构师技术笔记

 

编写Scala代码:

 

package day02import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/**  * @author dawn  * @version 1.0, 2019年6月21日11:40:16  *  *  需求:计算网页访问量前三名  *  用户:喜欢视频 直播  *  帮助企业做经营和决策  *  *  看数据  */object UrlCount {  def main(args: Array[String]): Unit = {//1.加载数据val conf: SparkConf = new SparkConf().setAppName("UrlCount").setMaster("local[2]")//Spark程序入口val sc: SparkContext = new SparkContext(conf)//加载数据val rdd1: RDD[String] = sc.textFile("itstar.log")//2.对数据进行计算val rdd2:RDD[(String,Int)]=rdd1.map(line => {  val s = line.split("\t")  //标注出现1次  (s(1), 1)})//3.将相同的网址进行累加求和  网页,201val rdd3:RDD[(String,Int)] = rdd2.reduceByKey(_+_)//4.排序 取出前三val rdd4:Array[(String,Int)] = rdd3.sortBy(_._2,false).take(3)//5.遍历打印rdd3.foreach(x => {  println("网址为:"+x._1+"访问量为:"+x._2)})//6.转换 toString toBufferprintln(rdd4.toBuffer)sc.stop()  }}

 

 

运行结果:

大数据学习之 Spark基本编程案例  48 _ JavaClub全栈架构师技术笔记

 

案例二:求出每个学院 访问第一位的网址,分组

编写Scala代码:

 

package day02import java.net.URLimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/**  * @author dawn  * @version 1.0, 2019年6月21日12:25:44  *  需求:求出每个学院 访问第一位的网址,分组  *  bigdata:video(直播)  *  java:video  *  python:teacher  */object UrlGoupCount {  def main(args: Array[String]): Unit = {//1.创建SparkConTextval conf:SparkConf=new SparkConf().setAppName("UrlGoupCount").setMaster("local[2]")val sc:SparkContext=new SparkContext(conf)//2.加载数据val rdd1:RDD[String]=sc.textFile("itstar.log")//3.切分val rdd2:RDD[(String,Int)] = rdd1.map(line =>{  val s=line.split("\t")  (s(1),1)})//4.求出总的访问量 网址,总的访问量val rdd3:RDD[(String,Int)] = rdd2.reduceByKey((x,y)  => x+y )//5.取出学院val rdd4:RDD[(String,String,Int)] = rdd3.map(x =>{  //拿到url  val url:String=x._1  //用java的方式拿到主机名  val host:String =new URL(url).getHost.split("[.]")(0)  //元组输出  (host,url,x._2)})//6.按照学院进行分组 groupBy返回的结果是:RDD[(k,iterator[(String,Stig,Int)])]val rdd5:RDD[(String,List[(String,String,Int)])]=rdd4.groupBy(_._1).mapValues(it => {  //倒序,记得要将iterator[(String,Stig,Int)]转成List再排序  it.toList.sortBy(_._3).reverse.take(1)})//7.遍历打印rdd5.foreach(x =>{  println("学院为:"+x._1+"||"+"访问量第一的是:"+x._2)})sc.stop()  }}

  

运行结果:

大数据学习之 Spark基本编程案例  48 _ JavaClub全栈架构师技术笔记

 

 

案例三:加入自定义分区 按照学院分区,相同的学院分为一个结果文件

编写Scala代码:

 

package day02import java.net.URLimport org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.mutable/**  * @author Dawn  * @version 1.0, 2019年6月21日12:25:49  *  需求:加入自定义分区  *  按照学院分区,相同的学院分为一个结果文件  */object UrlParCount {  def main(args: Array[String]): Unit = {//1.创建SparkContext对象val conf:SparkConf=new SparkConf().setAppName("").setMaster("local[2]")val sc:SparkContext=new SparkContext(conf)//2.加载数据val rdd1:RDD[(String,Int)] = sc.textFile("itstar.log").map(line => {  val s=line.split("\t")  (s(1),1)})//3.聚合val rdd2:RDD[(String,Int)] = rdd1.reduceByKey(_+_)//4.自定义格式val rdd3:RDD[(String,(String,Int))]=rdd2.map(t =>{  val url=t._1  val host=new URL(url).getHost  val XHost=host.split("[.]")(0)  //元组输出  (XHost,(url,t._2))})//5.加入自定义分区val xueyuan:Array[String] = rdd3.map(_._1).distinct().collect()//去重只剩下net bigdata javaval xueYuanPartitioner:XueYuanParititioner=new XueYuanParititioner(xueyuan)//6.加入分区规则val rdd4:RDD[(String,(String,Int))]=rdd3.partitionBy(xueYuanPartitioner).mapPartitions(it =>{  //将rdd3的结果进行自定义分区,再遍历分区中的元素,并将元素进行toList,再按照访问量排序,  //在倒叙,再取出第一个元素,返回类型是(String,(String,Int)),不是(String,List[(String,String,Int)])  it.toList.sortBy(_._2._2).reverse.take(1).iterator})//7.把结果存储rdd4.saveAsTextFile("f:/temp/sparkPV案例/partition")sc.stop()  }}class XueYuanParititioner(xy:Array[String]) extends Partitioner{  //自定义规则 学院 分区号  val rules: mutable.HashMap[String,Int]=new mutable.HashMap[String,Int]()  var number=0  //遍历学院  for(i <- xy){//学院与分区号对应,rules是一个HashMap,加一个元素rules += (i -> number)//分区号递增number += 1  }  //总的分区个数=学院中的长度为分区个数  override def numPartitions = xy.length  //拿到分区  override def getPartition(key: Any):Int = {  rules.getOrElse(key.toString,0)  }}

 

  

运行结果:

大数据学习之 Spark基本编程案例  48 _ JavaClub全栈架构师技术笔记

 

 

案例四:Spark访问数据库

分组排名第一的学院结果存储在mysql

 

编写代码如下:

package day03import java.net.URLimport java.sql.{Connection, DriverManager}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/**  * @author Hunter  * @version 1.0, 2019年6月21日21:10:30  *  把最终的结果存储在mysql中  */object UrlGroupCount1 {  def main(args: Array[String]): Unit = {val conf:SparkConf=new SparkConf().setAppName("UrlGroupCount1").setMaster("local[2]")val sc:SparkContext=new SparkContext(conf)//加载数据val rdd1:RDD[String] =sc.textFile("itstar.log")val rdd2:RDD[(String,Int)]=rdd1.map(line =>{  val s:Array[String]=line.split("\t")  (s(1),1)})//累加求和val rdd3:RDD[(String,Int)]=rdd2.reduceByKey(_+_)//取出分组的学院val rdd4:RDD[(String,Int)]=rdd3.map(x =>{  val url=x._1  val host=new URL(url).getHost.split("[.]")(0)  //元组输出  (host,x._2)})//6.根据学院分组val rdd5:RDD[(String,List[(String,Int)])]=rdd4.groupBy(_._1).mapValues(it => {  //根据访问量排序 倒序  it.toList.sortBy(_._2).take(1)})//7.把计算结果保存到mysql中rdd5.foreach(x => {  //把数据写到mysql  val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/url_count?charatorEncoding=utf-8","root","199902")  //把spark结果插入到mysql中  val sql="INSERT INTO url_data (xueyuan,number_one) VALUES (?,?)"  //执行Sql  val statement=conn.prepareStatement(sql)  statement.setString(1,x._1)  statement.setString(2,x._2.toString())  statement.executeUpdate()  statement.close()  conn.close()})//8.关闭资源 应用停掉sc.stop()  }}

  

运行结果:

大数据学习之 Spark基本编程案例  48 _ JavaClub全栈架构师技术笔记

 

案例五:Spark提供jdbcRDD,操作MySQL

编写代码如下:

package day03import java.sql.DriverManagerimport org.apache.spark.rdd.JdbcRDDimport org.apache.spark.{SparkConf, SparkContext}/**  * @author Dawn  * @version 1.0, 2019年6月22日11:25:30  *  spark提供的连接mysql的方式  *  jdbcRDD  */object JdbcRDDDemo {  def main(args: Array[String]): Unit = {val conf:SparkConf=new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")val sc:SparkContext=new SparkContext(conf)//匿名函数val connection=() =>{  Class.forName("com.mysql.jdbc.Driver").newInstance()  DriverManager.getConnection("jdbc:mysql://localhost:3306/url_count?charatorEncoding=utf-8","root","199902")}//查询数据val jdbcRdd:JdbcRDD[(Int,String,String)]=new JdbcRDD(  //指定sparkcontext  sc,  connection,  "SELECT * FROM url_data where uid >= ? AND uid <= ?",  //2个任务并行  1,4,2,  r =>{  val uid = r.getInt(1)  val xueyuan = r.getString(2)  val number_one = r.getString(3)  (uid, xueyuan, number_one)})val jrdd = jdbcRdd.collect()println(jrdd.toBuffer)sc.stop()  }}

  

运行结果:

大数据学习之 Spark基本编程案例  48 _ JavaClub全栈架构师技术笔记

 

作者:大魔王阿黎
来源链接:https://www.cnblogs.com/hidamowang/p/11144160.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/database/118411.html

标签:group by
分享给朋友:

“大数据学习之 Spark基本编程案例 48” 的相关文章

一文带你了解MySQL基础 2022年05月15日 09:35:43
MYSQL的存储过程 2022年05月16日 21:54:19
Mybatis中的${}和#{}区别 2022年05月17日 21:41:44
mysql 查询1小时内 2022年06月06日 12:59:30
mysql递归查询 2022年06月06日 18:26:30
mysql 查询表中前10条数据 2022年06月08日 04:35:17
Mysql查询某字段值重复的数据 2022年06月11日 19:39:22
mysql 查询列拼接字段 2022年06月12日 09:17:20