为什么无法显示(sd卡内的照片为什么无法显示)
880
2022-05-30
服务器运行环境:Spark 2.4.4 + scall 2.11.12 + Kafka 2.2.2
由于业务相对简单,kafka只有固定topics,所以一直使用下面脚本执行实时流计算
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 --py-files /data/service/xxx.zip /data/service/xxx.py
代码中使用pyspark.streaming.kafka的KafkaUtils来创建spark streaming与kafka的连接,运行了好长时间都没有出现过问题
随着新业务接入,在新功能中kafka需要使用动态topics方式,要用到正则表达式,查了KafkaUtils源码和相关资料,发现它不支持动态topics方式,需要使用spark-streaming-kafka-0-10才能支持
查看文档http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html 与 http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 后,使用结构化流structured-streaming来实现
实现代码:
import sysfrom pyspark.sql import SparkSessiondef process_row(row): # Write row to storage passif __name__ == "__main__": if len(sys.argv) != 4: print(""" Usage: structured_kafka_wordcount.py
执行提交任务命令
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 /data/service/demo.py master:9092 subscribePattern event.log.*
提交后一直报下面错误
org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value
查了好多资料,都说需要添加参数,配置Kafka分区分配策略,并将readStream修改为:
ds = spark\ .readStream\ .format("kafka")\ .option("kafka.bootstrap.servers", bootstrapServers)\ .option("kafka.partition.assignment.strategy", "range")\ .option(subscribeType, topics)\ .load()
再次运行异常信息改为无法连接kafka了,弄了整整一天人都快崩溃了还没搞定
还好最终查找https://xbuba.com/questions/44959483,大牛提示说有可能是kafka0.8版本的jar与kafka0.10的jar冲突原因造成的
使用命令查找
find / -name 'spark-streaming-kafka*' find / -name 'spark-sql-kafka*'
发现在/root/.ivy2/cache/org.apache.spark/ 目录下面存在spark-streaming-kafka-0-8_2.11 与 spark-sql-kafka-0-10_2.11 文件夹和相关的jar文件
将spark-streaming-kafka-0-8_2.11删除后执行代码就正常运行了
由于老脚本用的还是kafka0.8,为了兼容两个版本能同时运行,需要将/root/.ivy2/cache/org.apache.spark/ 目录下面kafka0.8与kafka0.10两个版本的jar全部清除
然后登录https://repo1.maven.org/maven2/org/apache/spark/ 下载spark-streaming-kafka-0-8与spark-sql-kafka-0-10对应的jar下来,并将提交命令spark-submit的参数改为:
spark-submit --jars /data/service/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar --py-files /data/service/xxx.zip /data/service/xxx.py spark-submit --jars /data/service/spark-sql-kafka-0-10_2.11-2.4.4.jar /data/service/demo.py master:9092 subscribePattern event.log.*
修改后两个脚本运行都没有问题(PS:老脚本原想直接用org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4包来启动,执行后直接暴错,提示说要改为org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4才行)
Spark
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。