PySpark的DataFrame处理方法
转:https://blog.csdn.net/weimingyu945/article/details/77981884
感谢!
-------------------------------------------------------------------------------------------------------
基本操作:
运行时获取spark版本号(以spark 2.0.0为例):
sparksn = SparkSession.builder.appName("PythonSQL").getOrCreate() print sparksn.version |
创建和转换格式:
Pandas和Spark的DataFrame两者互相转换:
pandas_df = spark_df.toPandas() | spark_df = sqlContext.createDataFrame(pandas_df) |
与Spark RDD的相互转换:
rdd_df = df.rdd | df = rdd_df.toDF() |
注:rdd转df前提是每个rdd的类型都是Row类型
增:
新增列:
df.withColumn(“xx”, 0).show() 会报错,因为原来没有xx列 from pyspark.sql import functions df = df.withColumn(“xx”, functions.lit(0)).show() |
fillna函数:
df.na.fill() |
以原有列为基础添加列:
df = df.withColumn('count20', df["count"] - 20) # 新列为原有列的数据减去20 |
删:
删除一列:
df.drop('age').collect() df.drop(df.age).collect() |
dropna函数:
df = df.na.drop() # 扔掉任何列包含na的行 df = df.dropna(subset=['col_name1', 'col_name2']) # 扔掉col1或col2中任一一列包含na的行 |
改:
修改原有df[“xx”]列的所有值:
df = df.withColumn(“xx”, 1) |
修改列的类型(类型投射):
df = df.withColumn("year2", df["year1"].cast("Int")) |
合并2个表的join方法:
df_join = df_left.join(df_right, df_left.key == df_right.key, "inner") |
其中,方法可以为:`inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
groupBy方法整合:
GroupedData = df.groupBy(“age”) 应用单个函数(按照A列同名的进行分组,组内对B列进行均值计算来合并): df.groupBy(“A”).avg(“B”).show() 应用多个函数: from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() |
整合后GroupedData类型可用的方法(均返回DataFrame类型):
avg(*cols) —— 计算每组中一列或多列的平均值
count() —— 计算每组中一共有多少行,返回DataFrame有2列,一列为分组的组名,另一列为行总数
max(*cols) —— 计算每组中一列或多列的最大值
mean(*cols) —— 计算每组中一列或多列的平均值
min(*cols) —— 计算每组中一列或多列的最小值
sum(*cols) —— 计算每组中一列或多列的总和
【函数应用】将df的每一列应用函数f:
df.foreach(f) 或者 df.rdd.foreach(f) |
【函数应用】将df的每一块应用函数f:
df.foreachPartition(f) 或者 df.rdd.foreachPartition(f) |
【Map和Reduce应用】返回类型seqRDDs
df.map(func) df.reduce(func) |
解决toDF()跑出First 100 rows类型无法确定的异常,可以采用将Row内每个元素都统一转格式,或者判断格式处理的方法,解决包含None类型时转换成DataFrame出错的问题:
@staticmethod def map_convert_none_to_str(row): dict_row = row.asDict() for key in dict_row: if key != 'some_column_name': value = dict_row[key] if value is None: value_in = str("") else: value_in = str(value) dict_row[key] = value_in columns = dict_row.keys() v = dict_row.values() row = Row(*columns) retu row(*v) |
查:
行元素查询操作:
像SQL那样打印列表前20元素(show函数内可用int类型指定要打印的行数):
df.show() df.show(30) |
以树的形式打印概要
df.printSchema() |
获取头几行到本地:
list = df.head(3) # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...] list = df.take(5) # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...] |
输出list类型,list中每个元素是Row类:
list = df.collect() |
注:此方法将所有数据全部导入到本地
查询总行数:
int_num = df.count() |
查询某列为null的行:
from pyspark.sql.functions import isnull df = df.filter(isnull("col_a")) |
列元素操作:
获取Row元素的所有列名:
r = Row(age=11, name='Alice') print r.__fields__ # ['age', 'name'] |
选择一列或多列:
df.select(“name”) df.select(df[‘name’], df[‘age’]+1) df.select(df.a, df.b, df.c) # 选择a、b、c三列 df.select(df["a"], df["b"], df["c"]) # 选择a、b、c三列 |
排序:
df = df.sort("age", ascending=False) |
过滤数据(filter和where方法相同):
df = df.filter(df['age']>21) df = df.where(df['age']>21) # 对null或nan数据进行过滤: from pyspark.sql.functions import isnan, isnull df = df.filter(isnull("a")) # 把a列里面数据为null的筛选出来(代表python的None类型) df = df.filter(isnan("a")) # 把a列里面数据为nan的筛选出来(Not a Number,非数字数据) |
SQL操作:
DataFrame注册成SQL的表:
df.createOrReplaceTempView("TBL1") |
进行SQL查询(返回DataFrame):
conf = SparkConf() ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate() df = ss.sql(“SELECT name, age FROM TBL1 WHERE age >= 13 AND age <= 19″) |
时间序列操作:
先按某几列分组,再按时间段分组:
from pyspark.sql.functions import window win_monday = window("col1", "1 week", startTime="4 day") GroupedData = df.groupBy([df.col2, df.col3, df.col4, win_monday]) |
参考资料:
传统MySQL查询(执行时间 19 min 16.58 sec):
mysql> SELECT MIN(yearD), MAX(yearD) AS max_year, Carrier, COUNT(*) AS cnt, SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed, ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate FROM ontime_part WHERE DayOfWeek NOT IN (6 , 7) AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI') AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI') GROUP BY carrier HAVING cnt > 1000 AND max_year > '1990' ORDER BY rate DESC , cnt DESC LIMIT 10; |
使用Scala语言摘写的Spark查询(执行时间 2 min 19.628 sec):
scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql", "dbtable" -> "ontime.ontime_sm", "fetchSize" -> "10000", "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48")).load() jdbcDF.createOrReplaceTempView("ontime") val sqlDF = sql("SELECT MIN(yearD), MAX(yearD) AS max_year, Carrier, COUNT(*) AS cnt, SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed, ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate FROM ontime_part WHERE DayOfWeek NOT IN (6 , 7) AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI') AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI') GROUP BY carrier HAVING cnt > 1000 AND max_year > '1990' ORDER BY rate DESC , cnt DESC LIMIT 10; ") sqlDF.show() |
Spark RDD中的map、reduce等操作的概念详解:
map将RDD中的每个元素都经过map内函数处理后返回给原来的RDD,即对每个RDD单独处理且不影响其它和总量。属于一对一的关系(这里一指的是对1个RDD而言)。
flatMap将RDD中的每个元素进行处理,返回一个list,list里面可以是1个或多个RDD,最终RDD总数会不变或变多。属于一变多的关系(这里一指的是对1个RDD而言)。
reduce将RDD中元素前两个传给输入函数,产生一个新的retu值,新产生的retu值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。属于多变一的关系。
val c = sc.parallelize(1 to 10)
c.reduce((x, y) => x + y)//结果55
reduceByKey(binary_function)
reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。属于多变少的关系。
val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))
a.reduceByKey((x,y) => x + y).collect
作者:静悟生慧
来源链接:https://www.cnblogs.com/Allen-rg/p/9626399.html
版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。
2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。