Spark Streaming 快速入门系列(6) | DStream的几种保存方式

网友投稿 749 2022-05-30

大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/

本片博文为大家带来的是DStream的几种保存方式。

目录

1. 保存到文本文件

2. 保存到Mysql (第一种写法)

3. 保存到Mysql (第二种写法)

关于这部分我们还可以通过查看官方文档实现:

http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。

与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

下列为输出操作的方法与解释

注意:

连接不能写在driver层面(序列化);

如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

增加foreachPartition,在分区创建(获取)。

1. 保存到文本文件

1. 源码

package com.buwenbuhuo.spark.streaming.day02.output import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不温卜火 * @create 2020-08-12 20:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo1").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck1") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\W+")) .map((_,1)) .reduceByKey(_+_) .saveAsTextFiles("world","log") ssc.start() ssc.awaitTermination() } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

2. 打开端口进行测试

nc -lk 9999

1

3. 运行结果

2. 保存到Mysql (第一种写法)

1. 源码

package com.buwenbuhuo.spark.streaming.day02.output import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不温卜火 * @create 2020-08-12 21:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo2 { val props: Properties = new Properties() props.setProperty("user","root") props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo2").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\W+")) .map((_,1)) .reduceByKey(_+_) .foreachRDD(rdd =>{ // 把rdd转成df // 1. 先创建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 转换 val df: DataFrame = rdd.toDF("word","count") // 3. 写 df.write.mode("append").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0812",props) }) ssc.start() ssc.awaitTermination() } }

1

2

3

4

5

6

7

8

9

10

11

12

Spark Streaming 快速入门系列(6) | DStream的几种保存方式

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

2. 运行与写入数据

3. 查看结果

3. 保存到Mysql (第二种写法)

1. 源码

package com.buwenbuhuo.spark.streaming.day02.output import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不温卜火 * @create 2020-08-12 22:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo3 { val props: Properties = new Properties() props.setProperty("user","root") props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo3").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\\W+")) .map((_,1)) .updateStateByKey((seq:Seq[Int],opt:Option[Int]) => Some(seq.sum + opt.getOrElse(0))) .foreachRDD(rdd =>{ // 把rdd转成df // 1. 先创建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 转换 val df: DataFrame = rdd.toDF("word","count") // 3. 写 df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0813",props) }) ssc.start() ssc.awaitTermination() } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

2. 运行

3. 运行结果

本次的分享就到这里了,

好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。

如果我的博客对你有帮助、如果你喜欢我的博客内容,请“” “评论”“”一键三连哦!听说的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。

码字不易,大家的支持就是我坚持下去的动力。后不要忘了关注我哦!

MySQL spark

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Oracle在线重定义之COPY_TABLE_DEPENDENTS
下一篇:基于MindSpore框架wide&deep模型的实战CTR体验
相关文章