从python编译到运行pyspark样例

网友投稿 1080 2022-05-30

MRS集群默认会带上Python2.7.5和Python3.8.0两个版本的Python。默认使用的是Python2.7.5。但是有时候我们希望使用的是我们指定的Python版本来运行pyspark任务,因此需要自行上传对应的Python版本包。由于Python较依赖环境,不同环境编译出来的Python版本可能并不通用。目前遇到过因为libffi和libffi-devel版本不一致导致pyspark运行的时候报错。因此我们在上传Python版本压缩包之前最好在集群系统相同的Linux机器上编译对应的Python。

本文主要介绍从购买华为云ECS到运行pyspark的wordcount样例的过程。

1      准备环境

测试环境是一套MRS_3.1.0普通集群;

希望使用的Python版本是Python3.6.6;

1.1      购买华为云ECS

MRS使用的ECS系统版本一般都是EulerOS2.2或者EulerOS2.5,因此我们购买ECS的时候可以选择2.2或者2.5的EulerOS系统。

如果这个ECS只用于编译Python,需要的资源并不多,可以选择最低规格的ECS。本次测试使用的规格为:通用计算型-2U4G EulerOS2.2

另外最好给ECS配置上EIP,方便访问公网下载Python源码。

购买ECS具体操作参考:https://support.huaweicloud.com/qs-ecs/zh-cn_topic_0030831985.html

1.2      编译Python

使用以下命令安装三方依赖软件:

yum install zlib zlib-devel zip -y

下载对应Python版本源码:

wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz

解压Python源码:

tar -zxvf Python-3.6.6.tgz

创建安装目录:

mkdir /opt/python36

编译Python:

cd Python-3.6.6 ./configure --prefix=/opt/python36

出现以下内容表明上述命令执行成功

make -j8

出现以下内容表明上述命令执行成功

make install

出现以下内容表明上述命令执行成功

至此,Python已经安装完成。

1.3      安装任务依赖模块

系统默认已经带上2.7版本Python,需要修改环境变量:

export PYTHON_HOME=/opt/python36 export PATH=$PYTHON_HOME/bin:$PATH

安装三方模块:

pip3 install helloword

本地测试是否安装成功:

使用python3进入python交互界面,执行以下代码:

import helloworld helloworld.say_hello("test")

如果出现以下内容则说明安装成功:

1.4      打包Python.zip

cd /opt/python36/ zip -r python36.zip ./*

将压缩包发送到需要使用的MRS客户端节点上,我们以客户端节点的/opt目录为存放位置。

2      测试运行

解压文件,配置环境变量:

cd /opt unzip python36.zip -d python36 export PYSPARK_PYTHON=/opt/python36/bin/python3

上传压缩包到HDFS上

hdfs dfs -mkdir /user/python hdfs dfs -put python36.zip /user/python

2.1      本地运行pyspark

使用pyspark启动local模式的交互界面,执行以下代码测试三方包是否生效:

>>> import helloworld Hello, Sara! >>> helloworld.say_hello("test") 'Hello, test!'

测试执行sql是否正常:

spark.sql("show tables").show() spark.sql("select count(*) from test1").show()

2.2      pyspark on yarn client模式

cd /opt pyspark --master yarn --deploy-mode client \ --conf spark.pyspark.python=./python36.zip/bin/python3 \ --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip

因为client模式下driver是在客户端侧运行,因此需要对driver的python环境单独指定:

spark.pyspark.driver.python=/opt/python36/bin/python3

同样使用上一步的代码测试功能是否正常

增加测试executor是否拿到三方模块检查(/tmp/log1.txt是一个存放在hdfs上面的文本文件,内容不限定):

from pyspark import SparkContext sc = SparkContext.getOrCreate() inputPath = "/tmp/log1.txt" lines = sc.textFile(name = inputPath) words = lines.flatMap(lambda line:line.split(" "),True) pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True) result = pairWords.reduceByKey(lambda v1,v2:v1+v2) result.foreach(lambda t :print(t))

可以看到executor日志里面打印了相关信息:

将最终结果打印到控制台:

output=result.collect() print(output) for (word, count) in output: print(word,count)

2.3      spark-submit on yarn client模式

将上面的测试命令写到test.py脚本里面:

从python编译到运行pyspark样例

# -*- coding: utf-8 -* import helloworld from pyspark import SparkConf, SparkContext if __name__ == "__main__": helloworld.say_hello("test") #创建SparkConf conf = SparkConf().setAppName("wordcount") #创建SparkContext 注意参数要传递conf=conf sc = SparkContext(conf=conf) inputPath = "/tmp/log1.txt" lines = sc.textFile(name = inputPath) #每一行数据按照空格拆分 得到一个个单词 words = lines.flatMap(lambda line:line.split(" "),True) #将每个单词 组装成一个tuple 计数1 pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True) #reduceByKey进行汇总 result = pairWords.reduceByKey(lambda v1,v2:v1+v2) #executor上打印结果 result.foreach(lambda t :print(t)) #搜集所有结果 output = result.collect() #打印汇总结果 ptint(output) #分开打印结果 for (word, count) in output: print(word,count) #退出任务 sc.stop()

启动命令:

spark-submit --master yarn --deploy-mode client \ --conf spark.pyspark.python=./Python/bin/python3 \ --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \ test.py

查看executor日志和控制台打印内容,确认结果有被打印。

2.4      spark-submit on yarn cluster模式

依旧使用上面的test.py脚本运行任务。

启动命令:

spark-submit --master yarn --deploy-mode cluster \ --conf spark.pyspark.python=./Python/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \ test.py

查看executor日志和driver日志打印内容,确认结果有被打印。

3      结论

至此所有操作步骤都执行完成。关键操作就是编译Python和spark on yarn的client与cluster模式下driver的Python环境配置。

参考文档:

https://bbs.huaweicloud.com/blogs/168935

MapReduce服务 spark

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Hadoop搭建伪分布式
下一篇:圆教育梦想 华为软件开发云驱动大连盈科实现教育云服务转型
相关文章