Excel常用快捷键命令大全
836
2022-05-30
大家好,我是老兵。
Flink基于流编程模型,内置了很多强大功能的算子,可以帮助我们快速开发应用程序。
作为Flink开发老手,大多算子的写法和场景想来已是了然于胸,但是使用过程常常会有一些小小的问题:
部分算子长时间未用,忘了用法。。
某些场景选择什么算子?如何选择?含糊不清。。
工欲善其事,必先利其器!快速高效的使用合适的算子开发程序,往往可以达到事半功倍的效果。
想着好记性不如烂笔头这个道理,特此整理一份常见的Flink算子开发手册!!也作为自己的工作笔记。欢迎大家~
1 DataStream API
Flink DataStream API让用户灵活且高效编写Flink流式程序。主要分为DataSource模块、Transformation模块以及DataSink模块。
Flink编程模型流程示意
Source模块定义数据接入功能,包括内置数据源和外部数据源。
Transformation模块定义DataStream数据流各种转换操作。
Sink模块定义数据输出功能,存储结果到外部存储介质中。
大家好,我是老兵。
Flink基于流编程模型,内置了很多强大功能的算子,可以帮助我们快速开发应用程序。
作为Flink开发老手,大多算子的写法和场景想来已是了然于胸,但是使用过程常常会有一些小小的问题:
部分算子长时间未用,忘了用法。。
某些场景选择什么算子?如何选择?含糊不清。。
工欲善其事,必先利其器!快速高效的使用合适的算子开发程序,往往可以达到事半功倍的效果。
想着好记性不如烂笔头这个道理,特此整理一份常见的Flink算子开发手册!!也作为自己的工作笔记。欢迎大家~
1 DataStream API
Flink DataStream API让用户灵活且高效编写Flink流式程序。主要分为DataSource模块、Transformation模块以及DataSink模块。
Source模块定义数据接入功能,包括内置数据源和外部数据源。
Transformation模块定义DataStream数据流各种转换操作。
Sink模块定义数据输出功能,存储结果到外部存储介质中。
执行环境: StreamExecutionEnvironment
系统模块 :
DataSouce、Transformation和DataSink
2 DataSource 输入
DataSource输入模块定义了DataStream API中的数据输入操作,Flink输入数据源分为内置数据源和第三方数据源两种类型。
内置数据源包括文件、Socket网络端口以及集合类型数据,不需要引入其他依赖库,在Flink系统内部已经实现。
第三方数据源定义了Flink和外部系统数据交互逻辑,例如Apache Kafka Connector、Elastic Search Connector等。
同时用户可以自定义数据源。
2.1 readTextFile、readFile算子
支持读取文本文件到Flink系统,转换成DataStream数据集。
readTextFile算子直接读取系统文本文件(.log|.txt ...)
readFile算子可以指定InputFormat读取特定数据类型的文件(包括CSV、JSON或者自定义InputFormat)
// 读取文本文件 val textInputStream = env.readTextFile( "/data/example.log") // 指定InputFormat,读取CSV文件 val csvInputStream = env.readFile( // 可以自定义类型(InputFormat) new CsvInputFormat[String] ( new Path("/data/example.csv") ) { override def fillRecord(out: String, onbjects: Array[AnyRef]: String) = { return null } }, "/data/example.csv" )
2.2 Socket算子
支持从Socket端口读取数据,转换成DataStream算子。
算子参数:Ip地址、端口、delimiter字符串切割符、最大重试次数maxRetry
maxRetry主要提供任务失败重连机制。当设定为0时,Flink任务直接停止。
Unix环境下,执行nc -lk [:port] 启动网络服务
// Flink程序读取Socket端口(9999)数据 val socketDataStream = env.socketTextStream("localhost", 9999)
2.3 集合算子
支持操作Flink内置集合类(Collection),转换成DataStream。
支持Java、Scala算子常见集合类
本质是将本地集合数据分发到远程执行;适用于本地测试,注意数据结构类型的一致性
// fromElements元素集合转换 val elementDataStream = env.fromElements( Tuple2('aa', 1L),Tuple2('bb', 2L) ) // fromCollection数组转换(Java) String[] collections = new String[] { "aa", "bb" }; DataStream
2.4 外部数据源算子
支持从第三方数据源系统读取数据,转换成DataStreams算子。
常见外部数据源算子: Hadoop FileSystem、ElasticSearch、 Apache Kafka、RabbitMQ等
使用时需要在Maven环境中添加jar包依赖(pom)
// Maven配置
2.5 自定义数据源算子
支持实现内置的Function相关接口,自定义数据源。
具体的内置方法包含但不限于:
SourceFunction接口
ParallelSourceFunction接口
RichParallelSourceFunction类
后续再通过env的addSource()方法添加,具体实现不展开。
3 DataStream转换
Flink对若干个DataStream操作生成新的DataStream,该过程被称为Transformation。
Flink程序中大多数逻辑均在Transformation过程中完成,包含转换、过滤、排序、连接、关联、选择和聚合等操作。
注意和Spark中transformation的区别。
Flink中DataStream转换可以分为几种类型:
Single DataStream: 单个DataStream数据集元素处理逻辑
Multi DataStream: 多个DataStream数据集元素处理逻辑
物理分区:数据集并行度和数据分区处理
3.1 Map算子(#Single)
对数据集中每个元素进行转换操作,生成新DataStream。
底层为MapFunction算子。通过调用map函数,对每个元素执行操作。
常用于数据清洗、计算和转换等。
val inputStream = env.fromElements( ("aa", 1), ("bb", 2), ("cc", 3) ) // 第一种写法: map操作,完成每个元素 + 1 val mapStream1 = inputStream.map( t => (t._1, t.2 + 1) ) // 第二种写法: 指定MapFunction val mapStream2 = inputStream.map( new MapFunction[(String, Int), (String, Int)] { override def map(t: (String, Int)): (String, Int) = { (t._1, t._2 + 1)} } )
3.2 FlatMap算子(#Single)
支持对数据集中所有元素转换成多个元素,生成新DataStream。
val flatDataStream = env.fromCollections() val resultStream = flatDataStream.flatMap{ line => line.split(",") }
3.3 filter算子(#Single)
支持对数据集进行过滤筛选,生成新的DataStream
// 通配符写法 val filterDataStream = dataStream.fliter { _ % 2 == 0 } // 指定运算符表达式 val filterDS = dataStream.filter( x => x % 2 == 0 )
3.4 keyBy算子(#Single)
根据指定Key对DataStream数据集分区,生成新的KeyedStream
相同Key值的数据归并到同一分区
类似于Spark中的groupByKey
val inputStream = env.fromElements( ("aa", 11), ("aa", 22), ("bb", 33) ) // 根据第一个字段作为key分区 // 转换为KeyedStream[(String, String), Tuple] val keyedStream: inputStream.keyBy(0)
3.5 reduce算子(#Single)
支持对输入KeyedSteam根据reduce()聚合,生成新的DataStream
根据key分区聚合形成KeyedStream
支持运算符和自定义reduceFunc函数
val inputStream = env.fromElements( ("aa", 11), ("bb", 33), ("cc", 22), ("aa", 21) ) // 指定第一个字段分区key val keyedStream = inputStream.keyBy(0) // 对第二个字段进行累加求和 val reduceDataStream = keyedStream.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }
自定义Reduce函数,需要实现匿名类。
val reduceDataStream = keyedStream.reeduce( new ReduceFunction[(String, Int)] { override def reduce(t1: (String,Int), t2: (String, Int)): (String, Int) = { (t1._1, t1._2 + t2._2) } } )
3.6 aggregations算子(#Single)
DataStream基础聚合算子,通过输入KeyedStream进行聚合生成新的DataStream
根据指定字段聚合,可自定义聚合逻辑
底层封装了sum、min、max等函数
val inputStream = env.fromElements( (1, 7), (2, 8), (3, 11), (2, 3) ) // 指定第一个字段分区key val keyedStream: [(Int, Int), Tuple] = inputStream.keyBy(0) // 第二个字段sum统计 val sumStream = keyedStream.sum(1) // 最后输出结果 sumStream.print()
3.7 Connect合并算子(#Multi)
合并多种类型数据集,并保留原数据集的数据类型,生成ConnectedStream
共享状态数据,可互相获取数据集状态
某些场景下可替代join算子,变相实现flink双流join功能
// 创建不同数据类型数据集 val stream1 = env.fromElements( ("aa", 3), ("bb", 4), ("cc", 11), ("dd", 22) ) val stream2 = env.fromElements( (1, 2, 11, 8) ) // 连接数据集 // 返回[(String, Int), Int] // 类似: [("aa", 3),1] val connectedStream = stream1.connect(stream2)
3.8 Connect算子—CoMap(#Multi)
ConnectedStream数据流的Map功能算子,操作合并数据集所有元素
定义CoMapFunction对象,参数为输入数据类型、输出数据类型和mapFunc
子map函数多线程交替执行,生成最终的合并目标数据集
// 上文Connected操作后形成的数据流 // 参数: 第1个为stream1类型;第2个为stream2类型;第3个为stream3类型 val resultStream = connnectedStream.map( new CoMapFunction[(String, Int), Int, (Int, String)] { // 定义第一个数据集处理逻辑,输入值为stream1 override def map1(in1: (String, Int)): (Int, String) = { (in1._2, in1._1) } // 定义第二个数据集处理逻辑,输入值为stream2 override def map2(in2: Int): (Int,String)={ (in2, "default") } )
3.9 Connect算子—CoFlatMap(#Multi)
ConnectedStream数据流的flatmap功能算子
在flatmap()方法中指定CoFlatMapFunction,并分别实现flatmap1()和flatmap2()函数。
val resultStream2 = connectedStream.flatMap( new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] { // 举例: 函数中共享变量,完成两个数据集合并 var value = 0 // 定义第1个数据集处理函数 override def flatMap1(in1: (String, Int), collect: Collector[(String, Int, Int)]): Unit = { collect.collect((in1._1, in1._2, value)) } } // 定义第2个数据集处理函数 override def flatMap2(in2: Int, collect: Collector[(String, Int, Int)]): Unit = { value = in2 } )
3.10 Union算子(#Multi)
将两个或者多个数据集合并,生成与输入数据集类型一致的DataStream
输入数据集的数据类型要求一致
输出数据集的数据类型和输入数据一致
注意和connect算子的区别
val stream1 = env.fromElements( ("aa", 3), ("bb", 22), ("cc", 45) ) val stream2 = env.fromElements( ("dd", 23), ("ff", 21), ("gg", 89) ) val stream3 = .... // 合并数据集 val unionStream = stream1.union(stream2) val unionStream2 = stream1.union( stream2, stream3 )
3.11 Split算子(#Multi)
将DataStream数据集按照条件拆分,转换成两个数据集的DataStream算子
将接入的数据路由到多个输出数据集,在split函数中定义拆分逻辑
可以被看作是union的逆向实现
val stream1 = env.fromElements( ("aa", 3), ("bb", 33), ("cc", 56),("aa", 23), ("cc", 67) ) // 根据第二个字段的奇偶性标记数据(切分) val splitStream = stream1.split( v => if (v._2 % 2 == 1 Seq("even") else Seq("odd")) )
3.12 Select算子(#Multi)
Select筛选算子,通过条件选择数据集中元素,生成新的DataStream
// 筛选偶数数据 val evenStream = splitedStream.select("even") //筛选所有数据 val allStream = splitedStream.select("even", "odd")
3.13 window窗口算子(时间机制)
Flink的窗口算子是实时计算的核心算子,常用于某固定时间内指标统计
1)窗口API
Flink提供了高级窗口API算子,封装底层窗口操作,包括窗口类型、触发器、侧输出等。同时根据上游输入Stream流分为Non-Keyed和Keyed两种类型。
Non-keyed(上游为Non-KeyedStream) 直接调用windowAll(),获取全局统计
val inputStream: DataStream = ... // 当传入为KeyedStream时,调用window()函数 inputStream.keyBy(0).window(new WindowFunc(...)) // 当传入为不做处理的Non-Keyed输入Stream流 // 直接使用windowAll()全局统计 inputStream.windowAll(new WindowFunc(...))
keyed(上游为KeyedStream类型)
调用DataStream的内置window()
stream.keyBy(..//keyed输入流.) .window(..//窗口类型.) .trigger(.//触发器<可选>..) .evictor(.//剔除器<可选>.) .allowdedLateness(.//延迟处理机制.) .sideOutputLateDate(.//侧输出.) .reduce/fold.aggregate/apply(.//计算函数.)
2)窗口类型
根据窗口的分配方式分为: 滚动、滑动、会话和全局等,分别支持不同窗口流动方式和范围。
同时支持事件时间和处理时间数据流。
Tumbling Window Join (滚动窗口)
Tumbling Window Join (滚动窗口)
Sliding Window Join (滑动窗口)
Sliding Window Join (滑动窗口)
Session Widnow Join(会话窗口)
Session Widnow Join(会话窗口)
以十分钟时间滑动窗口统计案例说明:
val tumblingStream = inputStream .keyBy(0) .window( TumblingEventTimeWindows.of( Time.seconds(10)) ).process(...)
4 DataSink输出
Flink读取数据源,经过系列Transform操作后,结果一般转存至外部存储介质或者下游,即Flink的DataSink过程。
Flink将外部存储的连接逻辑封装在Connector连接器中,常见的有:
Apache Kafka
ElasticSearch
Hadoop FileSystem
Redis
文件系统、端口
4.1 文件|端口
支持文件、客户端、Socket网络输出,为Flink内置算子,不需要依赖三方库
常见有writeAsCSV(本地文件)、writeToSocket(Socket网络)
// 本地csv inputStream.writeAsCsv( "file://path/xx.csv", WriteMode.OVERWRITE ) // Socket网络 inputStream.writeToSocket( host, post, new SimpleStringSchema() )
4.2 外部第三方
基于SinkFunction定义,需要引入外部三方依赖库,设置三方系统参数
val dataStream = ... // 定义FlinkKafkaProducer val kafkaProducer = new FlinkKafkaProducer011[Sting] ( "localhost:9092", //kafka broker list连接 "xxx-topic", // kafka topic new SimpleStringSchema() //序列化 ) // 添加SinkFunc dataStream.addSink(kafkaProducer())
5 总结
Flink内置的算子库种类全、功能强大,熟练掌握算子的使用方式和场景应用,是实时计算的必备技能。
后面还会继续更新此系列,欢迎添加我的个人- youlong525,一起学习交流~
未完待续。。
》》更多好文,欢迎关注公众号: 大数据兵工厂
Flink Scala 大数据 实时流计算服务 CS
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。