当前位置: 首页 >数据库 > Spark SQL大数据处理并写入Elasticsearch

Spark SQL大数据处理并写入Elasticsearch

SparkSQL(Spark用于处理结构化数据的模块)

通过SparkSQL导入的数据可以来自MySQL数据库、Json数据、Csv数据等,通过load这些数据可以对其做一系列计算

下面通过程序代码来详细查看SparkSQL导入数据并写入到ES中:

数据集:北京市PM2.5数据

Spark版本:2.3.2

Python版本:3.5.2

mysql-connector-java-8.0.11 下载

ElasticSearch:6.4.1

Kibana:6.4.1

elasticsearch-spark-20_2.11-6.4.1.jar 下载

具体代码:

Spark SQL大数据处理并写入Elasticsearch _ JavaClub全栈架构师技术笔记
 1 # coding: utf-8 2 import sys 3 import os 4  5 pre_current_dir = os.path.diame(os.getcwd()) 6 sys.path.append(pre_current_dir) 7 from pyspark.sql import SparkSession 8 from pyspark.sql.types import * 9 from pyspark.sql.functions import udf10 from settings import ES_CONF11 12 current_dir = os.path.diame(os.path.realpath(__file__))13 14 spark = SparkSession.builder.appName("weather_result").getOrCreate()15 16 17 def get_health_level(value):18 """19 PM2.5对应健康级别20 :param value:21 :retu:22 """23 if 0 <= value <= 50:24 retu "Very Good"25 elif 50 < value <= 100:26 retu "Good"27 elif 100 < value <= 150:28 retu "Unhealthy for Sensi"29 elif value <= 200:30 retu "Unhealthy"31 elif 200 < value <= 300:32 retu "Very Unhealthy"33 elif 300 < value <= 500:34 retu "Hazardous"35 elif value > 500:36 retu "Extreme danger"37 else:38 retu None39 40 41 def get_weather_result():42 """43 获取Spark SQL分析后的数据44 :retu:45 """46 # load所需字段的数据到DF47 df_2017 = spark.read.format("csv") \48 .option("header", "true") \49 .option("inferSchema", "true") \50 .load("file://{}/data/Beijing2017_PM25.csv".format(current_dir)) \51 .select("Year", "Month", "Day", "Hour", "Value", "QC Name")52 53 # 查看Schema54 df_2017.printSchema()55 56 # 通过udf将字符型health_level转换为column57 level_function_udf = udf(get_health_level, StringType())58 59 # 新建列healthy_level 并healthy_level分组60 group_2017 = df_2017.withColumn(61 "healthy_level", level_function_udf(df_2017['Value'])62 ).groupBy("healthy_level").count()63 64 # 新建列days和percentage 并计算它们对应的值65 result_2017 = group_2017.select("healthy_level", "count") \66 .withColumn("days", group_2017['count'] / 24) \67 .withColumn("percentage", group_2017['count'] / df_2017.count())68 result_2017.show()69 70 retu result_201771 72 73 def write_result_es():74 """75 将SparkSQL计算结果写入到ES76 :retu:77 """78 result_2017 = get_weather_result()79 # ES_CONF配置 ES的node和index80 result_2017.write.format("org.elasticsearch.spark.sql") \81 .option("es.nodes", "{}".format(ES_CONF['ELASTIC_HOST'])) \82 .mode("overwrite") \83 .save("{}/pm_value".format(ES_CONF['WEATHER_INDEX_NAME']))84 85 86 write_result_es()87 spark.stop()
View Code

将mysql-connector-java-8.0.11和elasticsearch-spark-20_2.11-6.4.1.jar放到Spark的jars目录下,提交spark任务即可。

 

注意:

(1) 如果提示:ClassNotFoundException Failed to find data source: org.elasticsearch.spark.sql.,则表示spark没有发现jar包,此时需重新编译pyspark:

cd /opt/spark-2.3.2-bin-hadoop2.7/python python3 setup.py sdist pip install dist/*.tar.gz

 (2) 如果提示:Multiple ES-Hadoop versions detected in the classpath; please use only one ,

  则表示ES-Hadoop jar包有多余的,可能既有elasticsearch-hadoop,又有elasticsearch-spark,此时删除多余的jar包,重新编译pyspark 即可

 

执行效果:

Spark SQL大数据处理并写入Elasticsearch _ JavaClub全栈架构师技术笔记

 

更多源码请关注我的githubhttps://github.com/a342058040/Spark-for-Python ,Spark相关技术全程用python实现,持续更新

作者:HarvardFly
来源链接:https://www.cnblogs.com/FG123/p/9748836.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/database/117917.html

标签:group by
分享给朋友:

“Spark SQL大数据处理并写入Elasticsearch” 的相关文章

触发器的定义及优点 2022年05月21日 11:37:15
MySQL 查询指定时间范围内的数据 2022年06月06日 16:59:25
mysql 查询或 2022年06月07日 13:56:22
MySQL查询指定行的记录 2022年06月14日 06:02:58
MySQL 查询结果替换 2022年06月15日 10:48:11