Spark Streaming 进阶实战五个例子

网友投稿 901 2022-05-30

一、带状态的算子:UpdateStateByKey

实现 计算 过去一段时间到当前时间 单词 出现的 频次

object StatefulWordCount {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")

val ssc = new streamingContext(sparkConf, Seconds(5))

//如果使用了 stateful 的算子,必须要设置 checkpoint,

//因为老的值 必须要 存在 某个 目录下面,新的值 才能去更新老的值

//在生产环境中,建议把checkpoint 设置到 hdfs 的某个文件夹中

ssc.checkpoint(".")

val lines = ssc.socketTextStream("localhost", 6789)

val result = lines.flatMap(_.split(" ").map((_, 1)))

val state = result.updateStateByKey[Int](updateFunction _)

state.print()

ssc.start()

ssc.awaitTermination()

}

/**

* 把当前的数据去更新已有的或者老的数据

* @param currentValues 当前的

* @param preValues 老的

* @return

*/

def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {

val current = currentValues.sum

val pre = preValues.getOrElse(0)

Some(current + pre)

}

}

二、实战:计算到目前为止累积出现的单词的个数写入到mysql中

/**

* 使用 spark streaming 完成 词频统计,并输出到 mysql 数据库

* 创建 数据库

*

* 创建数据表

* create table wordcount (

* word varchar(50) default null,

* wordcount int(10) default null

* )

*/

object ForeachRDDApp {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")

val ssc = new StreamingContext(sparkConf, Seconds(5))

val lines = ssc.socketTextStream("localhost", 6789)

val result = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)

//result.print()

//将结果写入到mysql

//1、错误的方式

// result.foreachRDD(rdd =>{

// val connection = createConnection()

// rdd.foreach {

// record =>

// val sql = "insert into wordcount (word,wordcount)" +

// "values ('"+record._1+"','"+record._2+"')"

// connection.createStatement().execute(sql)

// }

// })

//2、正确的方式

result.foreachRDD(rdd => {

rdd.foreachPartition(partitionOfRecords => {

if (partitionOfRecords.size > 0) {

val connection = createConnection()

partitionOfRecords.foreach(pair => {

val sql = "insert into wordcount (word,wordcount)" +

"values ('" + pair._1 + "','" + pair._2 + "')"

connection.createStatement().execute(sql)

})

connection.close()

}

})

})

//3、更好的方式,查阅官方文档,使用 连接池的方式

//存在的问题,这样每次都会插入新的数据,同样的单词频次字段不会去累加更新

//解决方案 :每次 insert 之前,判断一下,该单词是否已经存在数据库中,如果已经存在则update

//或者 存放在 hbase /redis 中,调用相应的api ,直接 插入和更新。

ssc.start()

ssc.awaitTermination()

}

def createConnection() = {

Class.forName("com.mysql.jdbc.Driver")

DriverManager.getConnection("jdbc://mysql://localhost:3306/dzx_spark", "root", "1234")

}

}

三、基于window 的统计

window :定时的进行一个时间段内的数据处理

window length : 窗口的长度

sliding interval : 窗口的间隔

这2个参数和我们的batch size  成倍数关系。如果不是倍数关系运行直接报错

每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc ==>每隔 sliding interval  统计 window length 的值

pair.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))

四、黑名单过滤

/**

* 黑名单过滤

*

* 访问日志 ==> DStream

*

* 20180808,zs

* 20180808,ls

* 20180808,ww

*

* ==> (zs:20180808,zs) (ls:20180808,ls)(ww:20180808,ww)

*

* 黑名单列表 ==》 RDD

* zs ls

* ==>(zs:true) (ls:true)

*

*

* leftjoin

* (zs:[<20180808,zs>,]) pass ...

* (ls:[<20180808,ls>,]) pass ...

* (ww:[<20180808,ww>,]) ==> tuple1 ok...

*

*

*/

object BlackNameListApp {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")

val ssc = new StreamingContext(sparkConf, Seconds(5))

/**

* 构建黑名单

*/

val blacks = List("zs", "ls")

val blacksRDD = ssc.sparkContext.parallelize(blacks)

.map(x => (x, true))

val lines = ssc.socketTextStream("localhost", 6789)

val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {

rdd.leftOuterJoin(blacksRDD).filter(x => x._2._2.getOrElse(false) != true).map(x => x._2._1)

})

clicklog.print()

ssc.start()

ssc.awaitTermination()

}

}

五、 spark  streaming 整合 spark  sql  实战

object SqlNetworkWordCount {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")

val ssc = new StreamingContext(sparkConf, Seconds(5))

val lines = ssc.socketTextStream("192.168.42.85", 6789)

val words = lines.flatMap(_.split(" "))

// Convert RDDs of the words DStream to DataFrame and run SQL query

words.foreachRDD { (rdd: RDD[String], time: Time) =>

// Get the singleton instance of SparkSession

val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

import spark.implicits._

// Convert RDD[String] to RDD[case class] to DataFrame

val wordsDataFrame = rdd.map(w => Record(w)).toDF()

// Creates a temporary view using the DataFrame

wordsDataFrame.createOrReplaceTempView("words")

// Do word count on table using SQL and print it

val wordCountsDataFrame =

Spark Streaming 进阶实战五个例子

spark.sql("select word, count(*) as total from words group by word")

println(s"========= $time =========")

wordCountsDataFrame.show()

}

ssc.start()

ssc.awaitTermination()

}

}

/** Case class for converting RDD to DataFrame */

case class Record(word: String)

/** Lazily instantiated singleton instance of SparkSession */

object SparkSessionSingleton {

@transient private var instance: SparkSession = _

def getInstance(sparkConf: SparkConf): SparkSession = {

if (instance == null) {

instance = SparkSession

.builder

.config(sparkConf)

.getOrCreate()

}

instance

}

}

MySQL spark

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:python初始
下一篇:【七日阅书】1.注重实效《程序员修炼之道—从小工到专家》
相关文章