【Spark】(task2)PySpark数据统计和分组聚合

网友投稿 1089 2022-05-30

学习总结

文章目录

学习总结

一、数据统计

1.1 读取文件

1.2 保存读取的信息

1.3 分析每列的类型,取值个数

1.4 分析每列是否包含缺失值

二、分组聚合

2.1 学习groupby分组聚合的使用

2.2 学习agg分组聚合的使用

2.3 transform的使用

Reference

一、数据统计

1.1 读取文件

步骤1:读取文件https://cdn.coggle.club/Pokemon.csv

import pandas as pd from pyspark.sql import SparkSession # 创建spark应用 spark = SparkSession.builder.appName('mypyspark').getOrCreate() # 用python链接spark环境 from pyspark import SparkFiles spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv') df = spark.read.csv(SparkFiles.get('Pokemon.csv'), header = True, inferSchema = True) df.show(5)

1

2

3

4

5

6

7

8

9

10

11

12

13

并且我们将表头属性有.的部分进行更名:

#重命名 df = df.withColumnRenamed('Sp. Atk', 'Sp Atk') df = df.withColumnRenamed('Sp. Def', 'Sp Def') df.show(5)

1

2

3

4

5

1.2 保存读取的信息

步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv或者json格式文件。

#df.write df.write.csv(path='Pokemon.csv',mode='overwrite', header=True ) df.write.json(path='Pokemon.json',mode='overwrite' )

1

2

3

4

1.3 分析每列的类型,取值个数

步骤3:分析每列的类型,取值个数,这里就可以使用df.describe分析每列的最大值、最小值、平均值、方差等特性了(下面我只贴出HP的结果)。

for column in df.columns: df.describe(column).show()

1

2

1.4 分析每列是否包含缺失值

步骤4:分析每列是否包含缺失值,打印出有缺失值的样本数,占总数的比例。

df.filter(df['Type 2'].isNull()).count() # 386 # 转换成pandas,打印出每一列的缺失值个数 df.toPandas().isnull().sum() # 结果: Name 0 Type 1 0 Type 2 386 Total 0 HP 0 Attack 0 Defense 0 Sp Atk 0 Sp Def 0 Speed 0 Generation 0 Legendary 0 dtype: int64

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

打印出有缺失值的样本数,占总数的比例:

for col in df.columns: print(col, df.filter(df[col].isNull()).count()/df.count()) Name 0.0 Type 1 0.0 Type 2 0.4825 Total 0.0 HP 0.0 Attack 0.0 Defense 0.0 Sp Atk 0.0 Sp Def 0.0 Speed 0.0 Generation 0.0 Legendary 0.0

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

二、分组聚合

读取文件https://cdn.coggle.club/Pokemon.csv

和刚才的步骤一是一样的。

import pandas as pd from pyspark.sql import SparkSession # 创建spark应用 spark = SparkSession.builder.appName('mypyspark').getOrCreate() # 用python链接spark环境 from pyspark import SparkFiles spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv') df = spark.read.csv(SparkFiles.get('Pokemon.csv'), header = True, inferSchema = True) df.show(5)

1

2

3

4

5

6

7

8

9

10

11

12

13

2.1 学习groupby分组聚合的使用

和sql的group by一样意思,分组聚合,下面找到Type 1对应数据进行分组和计数。

# Type 1 各种数量 df.groupby('Type 1').count().show(5)

1

2

还可以分组后(按照Type 1分组)计算每组内的HP平均值:

df.groupBy(['Type 1']).avg('HP').show(5) +------+-----------------+ |Type 1| avg(HP)| +------+-----------------+ | Water| 72.0625| |Poison| 67.25| | Steel|65.22222222222223| | Rock|65.36363636363636| | Ice| 72.0| +------+-----------------+ only showing top 5 rows

1

2

3

4

5

6

7

8

9

10

11

12

而如果是计算每组的个数:

# Type 1 各种数量 df.groupby('Type 1').count().show(5) +------+-----+ |Type 1|count| +------+-----+ | Water| 112| |Poison| 28| | Steel| 27| | Rock| 44| | Ice| 24| +------+-----+ only showing top 5 rows

1

2

3

4

5

6

7

8

9

10

11

12

13

2.2 学习agg分组聚合的使用

和2.1一样实现分组后(按照Type 1分组)计算每组内的HP平均值,我们也可以用agg分组聚合,{'HP': 'mean'}为参数。

df.groupBy('Type 1').agg({'HP': 'mean'}).show(5) +------+-----------------+ |Type 1| avg(HP)| +------+-----------------+ | Water| 72.0625| |Poison| 67.25| | Steel|65.22222222222223| | Rock|65.36363636363636| | Ice| 72.0| +------+-----------------+ only showing top 5 rows

1

2

3

4

5

6

7

8

9

10

11

12

数据量不是很多时,可以使用toPandas转成pandas的dataframe,然后使用pandas的各种函数:

df.toPandas().groupby('Type 1', as_index=False)['HP'].agg({'hp_mean': 'mean'})

1

2.3 transform的使用

transform的参数如下,可以看到需要传入一个函数(用于转换数据)。

DataFrame.transform(func: Callable[[…], Series], axis: Union[int, str] = 0, *args: Any, **kwargs: Any) → DataFrame

1

如果我们要按照Type 1分组后,求组内关于HP的平均值,可以将np.mean函数传入transform,具体介绍可以参考pyspark官方文档。

df.toPandas().groupby('Type 1')['HP'].transform(np.mean) 0 67.271429 1 67.271429 2 67.271429 3 67.271429 4 69.903846 ... 795 65.363636 796 65.363636 797 70.631579 798 70.631579 799 69.903846 Name: HP, Length: 800, dtype: float64

1

2

3

4

【Spark】(task2)PySpark数据统计和分组聚合

5

6

7

8

9

10

11

12

13

14

Reference

pyspark官方文档

https://spark.apache.org/docs/latest/quick-start.html

https://spark.apache.org/docs/latest/sql-programming-guide.html

https://github.com/apache/spark/tree/4f25b3f712/examples/src/main/python

https://sparkbyexamples.com/pyspark-tutorial/

https://www.tutorialspoint.com/pyspark/index.htm

pyspark—agg的用法

spark

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:markdown基本语法整理
下一篇:【docker系列】镜像分层原理及容器层写时复制
相关文章