当前位置: 首页 >数据库 > 2020.02.12

2020.02.12

1.Spark SQL 基本操作

将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

为 employee.json 创建 DataFrame,

?
1
2
3
4
scala> import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().getOrCreate()
scala> import spark.implicits. _
scala> val df = spark.read.json( "file:///usr/local/spark/employee.json" )

并写出 Scala 语句完成下列操作:

(1)  查询所有数据;

?
1
scala> df.show()

(2) 查询所有数据,并去除重复的数据;

?
1
scala> df.distinct().show()

(3) 查询所有数据,打印时去除 id 字段;

?
1
scala> df.drop( "id" ).show()

(4) 筛选出 age>30 的记录;

?
1
scala> df.filter(df( "age" ) > 30 ).show()

(5) 将数据按 age 分组;

?
1
scala> df.groupBy( "name" ).count().show()

(6) 将数据按 name 升序排列;

?
1
scala> df.sort(df( "name" ).asc).show()

(7) 取出前 3 行数据;

?
1
scala> df.take( 3 ) 或 scala> df.head( 3 )

(8) 查询所有记录的 name 列,并为其取别名为 useame;

?
1
scala> df.select(df( "name" ).as( "useame" )).show()

(9) 查询年龄 age 的平均值;

?
1
scala> df.agg( "age" -> "avg" )

(10) 查询年龄 age 的最小值。

?
1
scala> df.agg( "age" -> "min" )

2.编程实现将 RDD 转换为 DataFrame

源文件内容如下(包含 id,name,age):

请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代 码。

?
1
2
3
4
5
6
7
8
9
10
11
12
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits. _
object RDDtoDF {    
     def main(args : Array[String]) {
          case class Employee(id : Long,name : String, age : Long)
          val employeeDF = spark.sparkContext.textFile( "file:///usr/local/spark/employee.txt" ).map( _ .split( "," )).map(at tributes = > Employee(attributes( 0 ).trim.toInt,attributes( 1 ), attributes( 2 ).trim.toInt)).toDF()
          employeeDF.createOrReplaceTempView( "employee" )
          val employeeRDD = spark.sql( "select id,name,age from employee" )
          employeeRDD.map(t = > "id:" +t( 0 )+ "," + "name:" +t( 1 )+ "," + "age:" +t( 2 )).show()    
     }
}

3. 编程实现利用 DataFrame 读写 MySQL 的数据

(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的 两行数据。

?
1
2
3
4
5
mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int (4), name char (20), gender char (4), age int (4));
mysql> insert into employee values (1, 'Alice' , 'F' ,22);
mysql> insert into employee values (2, 'John' , 'M' ,25);

(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

 testmysql.scala

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.Properties
import org.apache.spark.sql.types. _
import org.apache.spark.sql.Row
object TestMySQL {    
     def main(args : Array[String]) {
         val employeeRDD = spark.sparkContext.parallelize(Array( "3 Mary F 26" , "4 Tom M 23" )).map( _ .split( " " ))
         val schema = StructType(List(StructField( "id" , IntegerType, true ),StructField( "name" , StringType, true ),StructField( "gender" , StringType, true ),StructField( "age" , IntegerType, true )))
         val rowRDD = employeeRDD.map(p = > Row(p( 0 ).toInt,p( 1 ).trim, p( 2 ).trim,p( 3 ).toInt))
         val employeeDF = spark.createDataFrame(rowRDD, schema)
         val prop = new Properties()
         prop.put( "user" , "root"
         prop.put( "password" , "hadoop"
         prop.put( "driver" , "com.mysql.jdbc.Driver" )
         employeeDF.write.mode( "append" ).jdbc( "jdbc:mysql://localhost:3306/sparktest" , sparktest.employee ", prop)
         val jdbcDF = spark.read.format(" jdbc ").option(" url ", " jdbc : mysql : //localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").optio n("dbtable","employee").option("user","root").option("password", "hadoop").load() jdbcDF.agg("age" -> "max", "age" -> "sum")    
     }
}

作者:自由人zyr
来源链接:https://www.cnblogs.com/zql98/p/12301122.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/database/117586.html

标签:group by
分享给朋友:

“2020.02.12” 的相关文章

MySQL事务和锁 2022年05月16日 21:54:37
sql递归查询 2022年05月17日 21:40:33
MySQL学习(4)︱数据库的查询 2022年06月07日 01:52:58
会mysql不一定会sql 2022年06月07日 04:41:06
mysql数据查询——复杂查询 2022年06月09日 23:08:26
mysql的查询句 2022年06月09日 23:40:52
shell简单处理mysql查询结果 2022年06月10日 23:22:02