Flink Command-Line Submission Parameters
1 Required parameter:
-n,--container <arg>               Number of YARN containers to allocate (= number of TaskManagers)
2 Optional parameters (a startup sketch follows this list):
-D <arg>                           Dynamic properties
-d,--detached                      Run in detached mode
-jm,--jobManagerMemory <arg>       Memory for the JobManager [in MB]
-nm,--name                         Set a custom name for the application on YARN
-q,--query                         Display available YARN resources (memory, cores)
-qu,--queue <arg>                  Specify the YARN queue.
-s,--slots <arg>                   Number of slots per TaskManager
-tm,--taskManagerMemory <arg>      Memory per TaskManager [in MB]
-z,--zookeeperNamespace <arg>      Namespace to create on ZooKeeper for high-availability mode
-id,--applicationId <yarnAppId>    Id of the YARN application to attach to (a yarn session running in the background)
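For reference, a minimal sketch of starting a session with these options, assuming a Flink 1.x distribution where they belong to ./bin/yarn-session.sh (the session name mySession and the queue default are placeholders):
./bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 2048 -nm mySession -qu default -d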
3 run [OPTIONS] <jar-file> <arguments>
run action parameters (a short sketch follows this list):
-c,--class <classname>             Entry-point class; needed only if the JAR manifest does not name one
-m,--jobmanager <host:port>        Address of the JobManager (master) to connect to; use this flag to target a JobManager other than the one in the configuration
-p,--parallelism <parallelism>     Parallelism of the program; overrides the default value in the configuration.
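A minimal sketch combining -c and -p (the class shown is the entry point of the WordCount example bundled with Flink; substitute your own):
./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount -p 4 ./examples/batch/WordCount.jar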
4 To start a new yarn-session, the corresponding options all carry a y or yarn prefix,
e.g.: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
Connect to the JobManager at a specific host and port:
./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
Start a new yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
5 Note: the command-line options can also be obtained from the ./bin/flink tool itself.
6 Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main" method or "getPlan()" method).
Only needed if the JAR file does not
specify the class in its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by java.net.URLClassLoader.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
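A sketch of restoring from the savepoint path used above, with -n in case operators were dropped (StateMachineExample is a streaming example bundled with Flink; substitute your own JAR):
./bin/flink run -s hdfs:///flink/savepoint-1537 -n ./examples/streaming/StateMachineExample.jar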
7 Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
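Putting several of the yarn-cluster options together, as a sketch (myApp and the queue name default are placeholders):
./bin/flink run -m yarn-cluster -yn 2 -ys 2 -yjm 1024m -ytm 2048m -ynm myApp -yqu default ./examples/batch/WordCount.jar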
flink run parameters
flink run command template: flink run [OPTIONS] <jar-file> <arguments> (a sketch follows this list)
-c,--class : class containing the main method to run
-C,--classpath : adds a URL to each user-code classloader, loaded via URLClassLoader; the URL must carry a scheme such as file://
-d,--detached : run in the background (detached mode)
-p,--parallelism : parallelism of the job's execution environment; usually worth setting explicitly.
-q,--sysoutLogging : suppress logging output to standard out.
-s,--fromSavepoint : restore the job from the given savepoint path.
-sae,--shutdownOnAttachedExit : when the job is submitted in attached (foreground) mode, shut it down on the cluster if the client is interrupted.
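A sketch of an attached run that uses -sae (SocketWindowWordCount is a streaming example bundled with Flink; hostname and port are placeholders for a running socket source):
./bin/flink run -sae -p 2 ./examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 9999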
flink run -m yarn-cluster parameters
-m,--jobmanager : yarn-cluster, i.e. run against a YARN cluster
-yd,--yarndetached : run in the background (detached)
-yjm,--yarnjobManagerMemory : memory for the JobManager
-ytm,--yarntaskManagerMemory : memory per TaskManager
-yn,--yarncontainer : number of TaskManagers
-yid,--yarnapplicationId : applicationId of the YARN session the job attaches to
-ynm,--yarnname : name of the application
-ys,--yarnslots : number of slots to allocate per TaskManager
Example: flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ynm myApp -ys 1 <jar-file>
flink-list
flink list : list Flink jobs.
flink list -r/--running : list running jobs
flink list -s/--scheduled : list scheduled jobs
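Listing can also target a specific JobManager with -m, matching the cancel/stop examples below:
bin/flink list -r -m 127.0.0.1:8081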
flink cancel
flink cancel [OPTIONS] <job_id> : cancel the running job with the given id
flink cancel -s/--withSavepoint <job_id> : cancel the running job and trigger a savepoint while doing so
Use -m to specify the host and port of the JobManager that runs the job
Example: bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de
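The savepoint variant, as a sketch (the target directory is a placeholder):
bin/flink cancel -s hdfs:///flink/savepoints -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de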
flink stop : for streaming jobs only
flink stop [OPTIONS] <job_id>
flink stop <job_id> : stop the given job
Use -m to specify the host and port of the JobManager that runs the job
Example: bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb
The difference between cancelling and stopping a (streaming) job:
A cancel() call invokes the cancel() method of the job's operators immediately, to cancel them as quickly as possible. If the operators do not stop after the cancel() call, Flink starts interrupting the operator threads periodically until all of them have stopped.
A stop() call is the more graceful way to end a running streaming job. stop() is only available for jobs whose sources all implement the StoppableFunction interface. When the user requests to stop a job, every source receives a stop() call, and the job finishes normally only once all sources have shut down cleanly. This way the job processes all in-flight data before ending.
flink modify : change a job's parallelism
flink modify <job_id> [OPTIONS]
flink modify <job_id> -p/--parallelism <parallelism> : change the parallelism of the job
Example: flink modify <job_id> -p <parallelism>
flink savepoint
flink savepoint [OPTIONS] <job_id>
e.g.: # trigger a savepoint
flink savepoint <job_id> hdfs://xxxx/xx/x : write a snapshot of the job to the given HDFS directory
Trigger a savepoint on YARN:
flink savepoint <job_id> <target_directory> -yid <application_id>
Cancel a job with a savepoint:
flink cancel -s <target_directory> <job_id>
Restore from a savepoint:
flink run -s <target_directory> [:runArgs]
If the restored program's logic has changed, e.g. an operator was removed, pass allowNonRestoredState to restore anyway (an end-to-end sketch follows):
flink run -s <target_directory> -n/--allowNonRestoredState [:runArgs]
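An end-to-end sketch tying these steps together (the job id and the savepoint path are hypothetical; the real path is printed when the savepoint command completes):
flink savepoint 5e20cb6b0f357591171dfcca2eea09de hdfs:///flink/savepoints
flink run -s hdfs:///flink/savepoints/savepoint-5e20cb-1234567890ab -n ./examples/streaming/StateMachineExample.jar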
Differences between savepoints and checkpoints
Checkpoints can be taken incrementally, so each one is fast and small; once enabled in the program they are triggered automatically, and the user does not have to be aware of them. Savepoints are full snapshots, slower and larger, and must be triggered explicitly by the user.
Checkpoints are used automatically when a job fails over, without the user specifying one; savepoints are typically used for program upgrades, bug fixes, A/B tests and similar scenarios, and must be specified by the user.