Apache Flink On Yarn模式高可用(HA)集群部署

网友投稿 1532 2022-05-30

本文介绍如何部署Apache Flink On YARN(也就是如何在YARN上运行Flink作业),采用HDP 2.6.3以及Apache Flink 1.7.1。

Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色,可以更好对集群资源进行调度和控制。

此处不对HDP安装做讲述,需要安装HDP的可以通过HDP官网安装指南进行安装。

官方文档QuickStart中包含两种Flink启动方式:

启动一个YARN session(Start a long-running Flink cluster on YARN)

本文介绍如何部署Apache Flink On YARN(也就是如何在YARN上运行Flink作业),采用HDP 2.6.3以及Apache Flink 1.7.1。

Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色,可以更好对集群资源进行调度和控制。

此处不对HDP安装做讲述,需要安装HDP的可以通过HDP官网安装指南进行安装。

官方文档QuickStart中包含两种Flink启动方式:

启动一个YARN session(Start a long-running Flink cluster on YARN)

直接在YARN上提交运行Flink作业(Run a Flink job on YARN)。

在讲解运行方式之前,我们先来讲解Flink基于HDP之上的On Yarn安装。

安装

从Apache Flink官网下载地址(http://flink.apache.org/downloads.html)下载对应版本的安装包并解压

curl -O 

tar xvzf flink-1.8-SNAPSHOT-bin-hadoop2.tgz

Flink On Yarn模式需要用户配置与Hadoop集群,设置HADOOP_CONF_DIR以及HADOOP_CLASSPATH。

将如下代码添加到~/.bash_profile配置文件中

$ vi ~/.bash_profile

export HADOOP_CONF_DIR="/etc/hadoop/conf"

export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/h配置yarn启动前环境变量

oop-yarn-client/lib/*"

source .bash_profile文件引入环境变量并检查变量是否设置正确

source ~/.bash_profile

echo $HADOOP_CONFIG_DIR

echo $HADOOP_CLASSPATH

配置

由于HDP是运行Hadoop任务以及访问HDFS都是使用hdfs用户,我们需要在yarn启动前指定HADOOP_USER_NAME变量,flink才不会因为权限问题而无法启动。

$ vi /usr/local/flink-1.3.3/bin/yarn-session.sh

#!/usr/bin/env bash

...

bin=`dirname "

bin=`dirname "$0"`

"`

bin=`cd "$bin"; pwd`

# get Flink config

. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then

FLINK_IDENT_STRING="$USER"

fi

export HADOOP_USER_NAME=hdfs

JVM_ARGS="$JVM_ARGS -Xmx512m"

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log

log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

注意:HADOOP_USER_NAME参数必须在JAVA_RUN之前配置,否则程序运行之后无法读取到该环境变量

要启动HA群集,需要在conf/flink-conf.yaml添加以下配置:

高可用性模式(必需):必须在conf/flink-conf.yaml中将高可用模式设置为zookeeper才能启用高可用模式。 或者,此选项可以设置为Flink应该用于创建HighAvailabilityServices实例的工厂类的FQN。

high-availability: zookeeper

ZooKeeper quorum(必需):ZooKeeper quorum是ZooKeeper服务器的复制组,它提供分布式协调服务。

high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181

每个addressX:port指的是一个ZooKeeper服务器,Flink可以在给定的地址和端口访问它。

Zookeeper root(推荐):

ZooKeeper root节点,在该节点下放置所有集群节点。

high-availability.zookeeper.path.root: /flink

Zookeeper Cluster-id(推荐):

cluster-id ZooKeeper节点,在该节点下放置集群的所有必需的协调数据。

high-availability.cluster-id: /default_ns # important: customize per cluster

存储目录(必需):JobManager元数据保存在文件系统storageDir中,只有一个指向该状态的指针存储在ZooKeeper中。

high-availability.storageDir: hdfs:///flink/recovery

storageDir存储JobManager故障恢复所需的所有元数据。

配置主服务器和ZooKeeper quorum后,您可以像往常一样使用提供的集群启动脚本。他们将启动HA群集。请记住,调用脚本时必须运行ZooKeeper quorum,并确保为要启动的每个HA群集配置单独的ZooKeeper根路径。

除HA配置外,还需要配置最大尝试次数conf/flink-conf.yaml:

yarn.application-attempts: 10

这意味着在Yarn应用程序失败之前,应用程序可以重新启动9次(9次重试+ 1次初始尝试)。

由于我们是基于HDP创建的Hadoop集群,已有现成的zookeeper集群,所以这里我们使用现有的zookeeper进行HA配置,配置如下:

high-availability: zookeeper

high-availability.zookeeper.quorum: flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181

high-availability.zookeeper.path.root: /flink

high-availability.storageDir: hdfs://ns1/flink/recovery

yarn.application-attempts: 10

配置Application最大的尝试次数

yarn.resourcemanager.am.max-attempts

4

The maximum number of application master execution attempts.

当前YARN版本的默认值为2(表示允许单个JobManager失败)。

hdp平台需要去掉uber shaded hadoop的包,同时添加mapreduce的包到yarn应用classpath,否则会出现如下问题:

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider

rm -f /root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar

进入ambari界面,service->yarn->config->advanced->Advanced yarn-site->yarn.application.classpath添加

/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*

修改后,需要重启yarn相关组件,ambari界面会有指示如何重启,一键搞定.

Flink默认包含两种配置方式:log4j以及logback

不配置的情况下运行flink集群或者运行flink job会提示建议移除其中一种。

org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/root/flink-1.7.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.

直接移除或者重命名都可行。

例如:

$ mv logback.xml logback.xml_bak

示例配置:

vi /usr/local/flink-1.3.3/conf/log4j.properties

log4j.appender.file.append=true

log4j.appender.file.MaxFileSize=100M  #最大文件大小

log4j.appender.file.MaxBackupIndex=10  # 最大备份索引大小

启动Flink

本节主要介绍Flink的两种启动方式。

启动一个长期运行的flink集群通过yarn-session.sh执行部署。

$ ./bin/yarn-session.sh

Usage:

Required

-n,--container    Number of YARN container to allocate (=Number of Task Managers)

Optional

-D              use value for given property

-d,--detached                   If present, runs the job in detached mode

-h,--help                       Help for the Yarn session CLI.

-id,--applicationId        Attach to running YARN session

-j,--jar                   Path to Flink jar file

-jm,--jobManagerMemory     Memory for JobManager Container with optional unit (default: MB)

-m,--jobmanager            Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.

-n,--container             Number of YARN container to allocate (=Number of Task Managers)

-nl,--nodeLabel            Specify YARN node label for the YARN application

-nm,--name                 Set a custom name for the application on YARN

-q,--query                      Display available YARN resources (memory, cores)

-qu,--queue                Specify YARN queue.

-s,--slots                 Number of slots per TaskManager

-sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such

as typing Ctrl + C.

-st,--streaming                 Start Flink in streaming mode

-t,--ship                  Ship files in the specified directory (t for transfer)

-tm,--taskManagerMemory    Memory per TaskManager Container with optional unit (default: MB)

-yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)

-z,--zookeeperNamespace    Namespace to create the Zookeeper sub-paths for high availability mode

主要参数讲解:

1、-n 指定TaskManager数量

2、-jm 指定JobManager使用内存

3、-m 指定JobManager地址

4、-tm 指定TaskManager使用内存

5、-D 指定动态参数

6、-d 客户端分离,指定后YarnSession部署到yarn之后,客户端会自行关闭。

7、-j 指定执行jar包

bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job

实例说明:

8个TaskManager

每个TaskManager5个slot

每个TaskManager内存4g,

指定application名称为pinpoint-flink-job

注意:部署长期运行的flink on yarn实例后,在flink web上看到的TaskManager以及Slots都为0。只有在提交任务的时候,才会依据分配资源给对应的任务执行。

执行任务提交命令:

$ bin/flink run ./examples/batch/WordCount.jar --input hdfs://xdata2/tmp/LICENSE-2.0.txt --output hdfs://xdata2/tmp/wordcount_result.txt

指定输入文件:hdfs://xdata2/tmp/LICENSE-2.0.txt

指定输出文件:hdfs://xdata2/tmp/wordcount_result.txt

命令运行日志如下:

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

2019-01-24 16:05:26,059 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.

2019-01-24 16:05:26,059 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.

2019-01-24 16:05:26,358 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 40

2019-01-24 16:05:26,358 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 40

YARN properties set default parallelism to 40

2019-01-24 16:05:26,618 INFO  org.apache.hadoop.yarn.client.AHSProxy                        - Connecting to Application History server at vigor-dc-38/192.168.2.38:10200

2019-01-24 16:05:26,628 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2019-01-24 16:05:26,628 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

Apache Flink On Yarn模式高可用(HA)集群部署

2019-01-24 16:05:26,638 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Looking for the active RM in [rm1, rm2]...

2019-01-24 16:05:26,773 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Found active RM [rm1]

2019-01-24 16:05:26,779 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'vigor-dc-41' and port '39925' from supplied application id 'application_1548213441093_0011'

2019-01-24 16:05:27,186 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

Starting execution of program

Program execution finished

Job with JobID 7ab3cf90748c8d05c7aa2e7cbce85730 has finished.

Job Runtime: 8979 ms

提交后可以在Flink web页面上看到提交的任务信息及执行情况。

使用hadoop命令查询执行结果信息

[root@vigor-dc-38 flink-1.7.1]# hadoop fs -cat /tmp/wordcount_result.txt

...

above 1

acceptance 1

accepting 3

act 1

acting 1

acts 1

add 2

addendum 1

additional 5

additions 1

advised 1

against 2

agree 1

agreed 3

agreement 1

all 3

...

若你想在Yarn上启动Flink用于单独任务执行,可以直接通过bin/flink run的方式来实现。

示例:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

Yarn会话的命令行选项也可以用于./bin/flink。使用y或yarn(对于长参数选项)作为前缀。

命令执行后,yarn会为任务单独启动一个flink on yarn实例,用于运行flink任务,在flink web界面上可以看到该任务。

查看后段执行结果:

Printing result to stdout. Use --output to specify output path.

(a,5)

(action,1)

(after,1)

(against,1)

(all,2)

(and,12)

(arms,1)

(arrows,1)

...

总结

Flink on Yarn两种部署方式可以根据自身的需求自行选择。可选择单独一种,也可以两种结合使用。

重要任务建议单独运行一个实例,其他的任务可以使用长时间运行方式,将多个任务部署到上面,不用到时候资源也会得到释放。

Standalone模式在后续的文章补上。

参考链接

https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html

https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html

https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html

关注公众号

Apache Yarn

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:spring secrity ldap
下一篇:【知乎大V体验鲲鹏】如何快速实现鲲鹏弹性云服务器的Node.js部署和高可用性?
相关文章