一网打尽Flink算子大全(面试问的都在这里)

网友投稿 795 2022-05-30

大家好,我是老兵。

Flink基于流编程模型,内置了很多强大功能的算子,可以帮助我们快速开发应用程序。

作为Flink开发老手,大多算子的写法和场景想来已是了然于胸,但是使用过程常常会有一些小小的问题:

部分算子长时间未用,忘了用法。。

某些场景选择什么算子?如何选择?含糊不清。。

工欲善其事,必先利其器!快速高效的使用合适的算子开发程序,往往可以达到事半功倍的效果。

想着好记性不如烂笔头这个道理,特此整理一份常见的Flink算子开发手册!!也作为自己的工作笔记。欢迎大家~

1 DataStream API

Flink DataStream API让用户灵活且高效编写Flink流式程序。主要分为DataSource模块、Transformation模块以及DataSink模块。

Flink编程模型流程示意

Source模块定义数据接入功能,包括内置数据源和外部数据源。

Transformation模块定义DataStream数据流各种转换操作。

Sink模块定义数据输出功能,存储结果到外部存储介质中。

大家好,我是老兵。

Flink基于流编程模型,内置了很多强大功能的算子,可以帮助我们快速开发应用程序。

作为Flink开发老手,大多算子的写法和场景想来已是了然于胸,但是使用过程常常会有一些小小的问题:

部分算子长时间未用,忘了用法。。

某些场景选择什么算子?如何选择?含糊不清。。

工欲善其事,必先利其器!快速高效的使用合适的算子开发程序,往往可以达到事半功倍的效果。

想着好记性不如烂笔头这个道理,特此整理一份常见的Flink算子开发手册!!也作为自己的工作笔记。欢迎大家~

1 DataStream API

Flink DataStream API让用户灵活且高效编写Flink流式程序。主要分为DataSource模块、Transformation模块以及DataSink模块。

Source模块定义数据接入功能,包括内置数据源和外部数据源。

Transformation模块定义DataStream数据流各种转换操作。

Sink模块定义数据输出功能,存储结果到外部存储介质中。

执行环境: StreamExecutionEnvironment

系统模块 :

DataSouce、Transformation和DataSink

2 DataSource 输入

DataSource输入模块定义了DataStream API中的数据输入操作,Flink输入数据源分为内置数据源和第三方数据源两种类型。

内置数据源包括文件、Socket网络端口以及集合类型数据,不需要引入其他依赖库,在Flink系统内部已经实现。

第三方数据源定义了Flink和外部系统数据交互逻辑,例如Apache Kafka Connector、Elastic Search Connector等。

同时用户可以自定义数据源。

2.1 readTextFile、readFile算子

支持读取文本文件到Flink系统,转换成DataStream数据集。

readTextFile算子直接读取系统文本文件(.log|.txt ...)

readFile算子可以指定InputFormat读取特定数据类型的文件(包括CSV、JSON或者自定义InputFormat)

// 读取文本文件 val textInputStream = env.readTextFile( "/data/example.log") // 指定InputFormat,读取CSV文件 val csvInputStream = env.readFile( // 可以自定义类型(InputFormat) new CsvInputFormat[String] ( new Path("/data/example.csv") ) { override def fillRecord(out: String, onbjects: Array[AnyRef]: String) = { return null } }, "/data/example.csv" )

2.2 Socket算子

支持从Socket端口读取数据,转换成DataStream算子。

算子参数:Ip地址、端口、delimiter字符串切割符、最大重试次数maxRetry

maxRetry主要提供任务失败重连机制。当设定为0时,Flink任务直接停止。

Unix环境下,执行nc -lk [:port] 启动网络服务

// Flink程序读取Socket端口(9999)数据 val socketDataStream = env.socketTextStream("localhost", 9999)

2.3 集合算子

支持操作Flink内置集合类(Collection),转换成DataStream。

支持Java、Scala算子常见集合类

本质是将本地集合数据分发到远程执行;适用于本地测试,注意数据结构类型的一致性

// fromElements元素集合转换 val elementDataStream = env.fromElements( Tuple2('aa', 1L),Tuple2('bb', 2L) ) // fromCollection数组转换(Java) String[] collections = new String[] { "aa", "bb" }; DataStream collectionDatastream = env.fromCollection( Arrays.asList(collections) ); // List列表转换(Java) List arrays = new ArrayList<>(); arrays.add("aa") arrays.add("bb") DataStream arrayDataStream = env.fromCollection(arrays)

2.4 外部数据源算子

支持从第三方数据源系统读取数据,转换成DataStreams算子。

常见外部数据源算子: Hadoop FileSystem、ElasticSearch、 Apache Kafka、RabbitMQ等

使用时需要在Maven环境中添加jar包依赖(pom)

// Maven配置 org.apache.flink flink-connector-kafka-1.2_2.12 1.9.1 // 读取Kafka数据源(Java) Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "localhost:9092"); ... DataStream kafkaStream = env.addSource( new FlinkKafkaStream010<> ( "topic-1", new SimpleStringSchema(), properties ) )

2.5 自定义数据源算子

支持实现内置的Function相关接口,自定义数据源。

具体的内置方法包含但不限于:

SourceFunction接口

ParallelSourceFunction接口

RichParallelSourceFunction类

后续再通过env的addSource()方法添加,具体实现不展开。

3 DataStream转换

Flink对若干个DataStream操作生成新的DataStream,该过程被称为Transformation。

Flink程序中大多数逻辑均在Transformation过程中完成,包含转换、过滤、排序、连接、关联、选择和聚合等操作。

注意和Spark中transformation的区别。

Flink中DataStream转换可以分为几种类型:

Single DataStream: 单个DataStream数据集元素处理逻辑

Multi DataStream: 多个DataStream数据集元素处理逻辑

一网打尽Flink算子大全(面试问的都在这里)

物理分区:数据集并行度和数据分区处理

3.1 Map算子(#Single)

对数据集中每个元素进行转换操作,生成新DataStream。

底层为MapFunction算子。通过调用map函数,对每个元素执行操作。

常用于数据清洗、计算和转换等。

val inputStream = env.fromElements( ("aa", 1), ("bb", 2), ("cc", 3) ) // 第一种写法: map操作,完成每个元素 + 1 val mapStream1 = inputStream.map( t => (t._1, t.2 + 1) ) // 第二种写法: 指定MapFunction val mapStream2 = inputStream.map( new MapFunction[(String, Int), (String, Int)] { override def map(t: (String, Int)): (String, Int) = { (t._1, t._2 + 1)} } )

3.2 FlatMap算子(#Single)

支持对数据集中所有元素转换成多个元素,生成新DataStream。

val flatDataStream = env.fromCollections() val resultStream = flatDataStream.flatMap{ line => line.split(",") }

3.3 filter算子(#Single)

支持对数据集进行过滤筛选,生成新的DataStream

// 通配符写法 val filterDataStream = dataStream.fliter { _ % 2 == 0 } // 指定运算符表达式 val filterDS = dataStream.filter( x => x % 2 == 0 )

3.4 keyBy算子(#Single)

根据指定Key对DataStream数据集分区,生成新的KeyedStream

相同Key值的数据归并到同一分区

类似于Spark中的groupByKey

val inputStream = env.fromElements( ("aa", 11), ("aa", 22), ("bb", 33) ) // 根据第一个字段作为key分区 // 转换为KeyedStream[(String, String), Tuple] val keyedStream: inputStream.keyBy(0)

3.5 reduce算子(#Single)

支持对输入KeyedSteam根据reduce()聚合,生成新的DataStream

根据key分区聚合形成KeyedStream

支持运算符和自定义reduceFunc函数

val inputStream = env.fromElements( ("aa", 11), ("bb", 33), ("cc", 22), ("aa", 21) ) // 指定第一个字段分区key val keyedStream = inputStream.keyBy(0) // 对第二个字段进行累加求和 val reduceDataStream = keyedStream.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }

自定义Reduce函数,需要实现匿名类。

val reduceDataStream = keyedStream.reeduce( new ReduceFunction[(String, Int)] { override def reduce(t1: (String,Int), t2: (String, Int)): (String, Int) = { (t1._1, t1._2 + t2._2) } } )

3.6 aggregations算子(#Single)

DataStream基础聚合算子,通过输入KeyedStream进行聚合生成新的DataStream

根据指定字段聚合,可自定义聚合逻辑

底层封装了sum、min、max等函数

val inputStream = env.fromElements( (1, 7), (2, 8), (3, 11), (2, 3) ) // 指定第一个字段分区key val keyedStream: [(Int, Int), Tuple] = inputStream.keyBy(0) // 第二个字段sum统计 val sumStream = keyedStream.sum(1) // 最后输出结果 sumStream.print()

3.7 Connect合并算子(#Multi)

合并多种类型数据集,并保留原数据集的数据类型,生成ConnectedStream

共享状态数据,可互相获取数据集状态

某些场景下可替代join算子,变相实现flink双流join功能

// 创建不同数据类型数据集 val stream1 = env.fromElements( ("aa", 3), ("bb", 4), ("cc", 11), ("dd", 22) ) val stream2 = env.fromElements( (1, 2, 11, 8) ) // 连接数据集 // 返回[(String, Int), Int] // 类似: [("aa", 3),1] val connectedStream = stream1.connect(stream2)

3.8 Connect算子—CoMap(#Multi)

ConnectedStream数据流的Map功能算子,操作合并数据集所有元素

定义CoMapFunction对象,参数为输入数据类型、输出数据类型和mapFunc

子map函数多线程交替执行,生成最终的合并目标数据集

// 上文Connected操作后形成的数据流 // 参数: 第1个为stream1类型;第2个为stream2类型;第3个为stream3类型 val resultStream = connnectedStream.map( new CoMapFunction[(String, Int), Int, (Int, String)] { // 定义第一个数据集处理逻辑,输入值为stream1 override def map1(in1: (String, Int)): (Int, String) = { (in1._2, in1._1) } // 定义第二个数据集处理逻辑,输入值为stream2 override def map2(in2: Int): (Int,String)={ (in2, "default") } )

3.9 Connect算子—CoFlatMap(#Multi)

ConnectedStream数据流的flatmap功能算子

在flatmap()方法中指定CoFlatMapFunction,并分别实现flatmap1()和flatmap2()函数。

val resultStream2 = connectedStream.flatMap( new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] { // 举例: 函数中共享变量,完成两个数据集合并 var value = 0 // 定义第1个数据集处理函数 override def flatMap1(in1: (String, Int), collect: Collector[(String, Int, Int)]): Unit = { collect.collect((in1._1, in1._2, value)) } } // 定义第2个数据集处理函数 override def flatMap2(in2: Int, collect: Collector[(String, Int, Int)]): Unit = { value = in2 } )

3.10 Union算子(#Multi)

将两个或者多个数据集合并,生成与输入数据集类型一致的DataStream

输入数据集的数据类型要求一致

输出数据集的数据类型和输入数据一致

注意和connect算子的区别

val stream1 = env.fromElements( ("aa", 3), ("bb", 22), ("cc", 45) ) val stream2 = env.fromElements( ("dd", 23), ("ff", 21), ("gg", 89) ) val stream3 = .... // 合并数据集 val unionStream = stream1.union(stream2) val unionStream2 = stream1.union( stream2, stream3 )

3.11 Split算子(#Multi)

将DataStream数据集按照条件拆分,转换成两个数据集的DataStream算子

将接入的数据路由到多个输出数据集,在split函数中定义拆分逻辑

可以被看作是union的逆向实现

val stream1 = env.fromElements( ("aa", 3), ("bb", 33), ("cc", 56),("aa", 23), ("cc", 67) ) // 根据第二个字段的奇偶性标记数据(切分) val splitStream = stream1.split( v => if (v._2 % 2 == 1 Seq("even") else Seq("odd")) )

3.12 Select算子(#Multi)

Select筛选算子,通过条件选择数据集中元素,生成新的DataStream

// 筛选偶数数据 val evenStream = splitedStream.select("even") //筛选所有数据 val allStream = splitedStream.select("even", "odd")

3.13 window窗口算子(时间机制)

Flink的窗口算子是实时计算的核心算子,常用于某固定时间内指标统计

1)窗口API

Flink提供了高级窗口API算子,封装底层窗口操作,包括窗口类型、触发器、侧输出等。同时根据上游输入Stream流分为Non-Keyed和Keyed两种类型。

Non-keyed(上游为Non-KeyedStream) 直接调用windowAll(),获取全局统计

val inputStream: DataStream = ... // 当传入为KeyedStream时,调用window()函数 inputStream.keyBy(0).window(new WindowFunc(...)) // 当传入为不做处理的Non-Keyed输入Stream流 // 直接使用windowAll()全局统计 inputStream.windowAll(new WindowFunc(...))

keyed(上游为KeyedStream类型)

调用DataStream的内置window()

stream.keyBy(..//keyed输入流.) .window(..//窗口类型.) .trigger(.//触发器<可选>..) .evictor(.//剔除器<可选>.) .allowdedLateness(.//延迟处理机制.) .sideOutputLateDate(.//侧输出.) .reduce/fold.aggregate/apply(.//计算函数.)

2)窗口类型

根据窗口的分配方式分为: 滚动、滑动、会话和全局等,分别支持不同窗口流动方式和范围。

同时支持事件时间和处理时间数据流。

Tumbling Window Join (滚动窗口)

Tumbling Window Join (滚动窗口)

Sliding Window Join (滑动窗口)

Sliding Window Join (滑动窗口)

Session Widnow Join(会话窗口)

Session Widnow Join(会话窗口)

以十分钟时间滑动窗口统计案例说明:

val tumblingStream = inputStream .keyBy(0) .window( TumblingEventTimeWindows.of( Time.seconds(10)) ).process(...)

4 DataSink输出

Flink读取数据源,经过系列Transform操作后,结果一般转存至外部存储介质或者下游,即Flink的DataSink过程。

Flink将外部存储的连接逻辑封装在Connector连接器中,常见的有:

Apache Kafka

ElasticSearch

Hadoop FileSystem

Redis

文件系统、端口

4.1 文件|端口

支持文件、客户端、Socket网络输出,为Flink内置算子,不需要依赖三方库

常见有writeAsCSV(本地文件)、writeToSocket(Socket网络)

// 本地csv inputStream.writeAsCsv( "file://path/xx.csv", WriteMode.OVERWRITE ) // Socket网络 inputStream.writeToSocket( host, post, new SimpleStringSchema() )

4.2 外部第三方

基于SinkFunction定义,需要引入外部三方依赖库,设置三方系统参数

val dataStream = ... // 定义FlinkKafkaProducer val kafkaProducer = new FlinkKafkaProducer011[Sting] ( "localhost:9092", //kafka broker list连接 "xxx-topic", // kafka topic new SimpleStringSchema() //序列化 ) // 添加SinkFunc dataStream.addSink(kafkaProducer())

5 总结

Flink内置的算子库种类全、功能强大,熟练掌握算子的使用方式和场景应用,是实时计算的必备技能。

后面还会继续更新此系列,欢迎添加我的个人微信: youlong525,一起学习交流~

未完待续。。

》》更多好文,欢迎关注公众号: 大数据兵工厂

Flink Scala 大数据 实时流计算服务 CS

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java岗大厂面试百日冲刺【Day53】— 基础篇4 (日积月累,每日三题)
下一篇:【云驻共创】星光 SaaS 伙伴汉得信息:企业级 SaaS 能力构建,云原生赋能数字化创新
相关文章