准备
Flink ON YARN模式就是使用客户端的方式,直接向Hadoop集群提交任务即可。不需要单独启动Flink进程。
注意:
1。Flink ON YARN 模式依赖Hadoop 2.4.1及以上版本
2.Flink ON YARN支持两种使用方式
yarn-session会话模式:
使用Flink中的yarn-session(yarn客户端),会启动两个必要服务 JobManager 和 TaskManagers
客户端通过yarn-session提交作业
yarn-session会一直启动,不停地接收客户端提交的作用
有大量的小作业,适合使用这种方式
yarn-per-job模式
直接提交任务给YARN,大作业,适合使用这种方式
安装包下载:http://archive.apache.org/dist/flink/flink-1.10.2/
Flink on Yarn集群搭建
yarn-session模式搭建
解压
[root@master ~]# tar -zxvf /data/flink-1.10.2 -C /usr/local/src
[root@master ~]# mv /usr/local/src/flink-1.10.2 /usr/local/src/flink
配置环境变量
[root@master ~]# vi /etc/profile
export FLINK_HOME=/usr/local/src/flink
export PATH=$PATH:$FLINK_HOME/bin
[root@master ~]# source /etc/profile
flink on yarn搭建,资源的调度由yarn完成,所以不需要修改配置文件
HADOOP_CLASSPATH
执行yarn-session.sh脚本需要先设置HADOOP_CLASSPATH这个环境变量,否则,执行yarn-session.sh报错,提示找不到hadoop的一些依赖
方式一(推荐):在/etc/profile中配置HADOOP_CLASSPATH
[root@master ~]# vi /etc/profile
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
方式二:命令行导入HADOOP_CLASSPATH
[root@master ~]# export HADOOP_CLASSPATH=`hadoop classpath`
缺点:只在当前命令行本次运行有效
方式三:修改yarn-session.sh,添加HADOOP_CLASSPATH
启动集群
接下来,使用yarn-session.sh在YARN中创建一个长时间运行的Flink集群
[root@master flink]# bin/yarn-session.sh -jm 1024m -tm 1024m -d
这个表示创建一个Flink集群,-jm是指定主节点的内存,-tm是指定从节点的内存,-d是表示把这个进程放到后台去执行。
启动之后,会看到类似这样的日志信息,这里面会显示flink web界面的地址,以及这个flink集群在yarn中对应的applicationid
此时在YARN的web界面中可以看到这个flink集群。
可以使用屏幕中显示的flink的web地址或者yarn中这个链接都是可以进入这个flink的web界面
提交作业(job)
接下来向这个Flink集群中提交任务,此时使用Flink中的内置案例。
[root@master flink]# bin/flink run ./examples/batch/WordCount.jar
注意:这个时候我们使用flink run的时候,它会默认找这个文件,然后根据这个文件找到刚才我们创建的那个永久的Flink集群,这个文件里面保存的就是刚才启动的那个Flink集群在YARN中对应的applicationid。
[root@master flink]# more /tmp/.yarn-properties-root
#Generated YARN properties file
#Tue Jun 07 01:39:29 CST 2022
dynamicPropertiesString=
applicationID=application_1654423542500_0004
任务提交上去执行完成之后,再来看flink的web界面,发现这里面有一个已经执行结束的任务了。
注意:这个任务在执行的时候,会动态申请一些资源执行任务,任务执行完毕之后,对应的资源会自动释放掉。
最后把这个Flink集群停掉,使用yarn的kill命令。
[root@master flink]# yarn application -kill application_1654423542500_0004
Killing application application_1654423542500_0004
22/06/07 01:46:43 INFO impl.YarnClientImpl: Killed application application_1654423542500_0004
针对yarn-session命令,它后面还支持一些其它参数,可以在后面传一个-help参数
[root@master flink]# bin/yarn-session.sh -help
Usage:
Optional
-at,--applicationType <arg> Set a custom application type for the application on YARN
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
常见命令的中文注释
yarn-per-job模式搭建
flink run -m yarn-cluster (创建Flink集群+提交任务)
使用flink run直接创建一个临时的Flink集群,并且提交任务
此时这里面的参数前面加上了一个y参数
[root@master flink]# bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
针对Flink命令的一些用法汇总