Flink提交参数详解

Flink提交参数详解

清泓
2022-06-07 / 0 评论 / 2,436 阅读 / 9358字 / 正在检测是否收录...

Flink命令行提交参数

1 参数必选 : 
     -n,--container <arg>   分配多少个yarn容器 (=taskmanager的数量)  
2 参数可选 : 
     -D <arg>                        动态属性  
     -d,--detached                   独立运行  
     -jm,--jobManagerMemory <arg>    JobManager的内存 [in MB]  
     -nm,--name                      在YARN上为一个自定义的应用设置一个名字  
     -q,--query                      显示yarn中可用的资源 (内存, cpu核数)  
     -qu,--queue <arg>               指定YARN队列.  
     -s,--slots <arg>                每个TaskManager使用的slots数量  
     -tm,--taskManagerMemory <arg>   每个TaskManager的内存 [in MB]  
     -z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace 
     -id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
 
3 run [OPTIONS] <jar-file> <arguments>  
 
    run操作参数:  
    -c,--class <classname>  如果没有在jar包中指定入口类,则需要在这里通过这个参数指定  
    -m,--jobmanager <host:port>  指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager  
    -p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。
 
4 启动一个新的yarn-session,它们都有一个y或者yarn的前缀
 
    例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 
    
    连接指定host和port的jobmanager:
    ./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
 
    启动一个新的yarn-session:
    ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
 
5 注意:命令行的选项也可以使用./bin/flink 工具获得。
 
6 Action "run" compiles and runs a program.
    
      Syntax: run [OPTIONS] <jar-file> <arguments>
      "run" action options:
         -c,--class <classname>               Class with the program entry point
                                              ("main" method or "getPlan()" method.
                                              Only needed if the JAR file does not
                                              specify the class in its manifest.
         -C,--classpath <url>                 Adds a URL to each user code
                                              classloader  on all nodes in the
                                              cluster. The paths must specify a
                                              protocol (e.g. file://) and be
                                              accessible on all nodes (e.g. by means
                                              of a NFS share). You can use this
                                              option multiple times for specifying
                                              more than one URL. The protocol must
                                              be supported by the {@link
                                              java.net.URLClassLoader}.
         -d,--detached                        If present, runs the job in detached
                                              mode
         -n,--allowNonRestoredState           Allow to skip savepoint state that
                                              cannot be restored. You need to allow
                                              this if you removed an operator from
                                              your program that was part of the
                                              program when the savepoint was
                                              triggered.
         -p,--parallelism <parallelism>       The parallelism with which to run the
                                              program. Optional flag to override the
                                              default value specified in the
                                              configuration.
         -q,--sysoutLogging                   If present, suppress logging output to
                                              standard out.
         -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                              from (for example
                                              hdfs:///flink/savepoint-1537).
 
7  Options for yarn-cluster mode:
         -d,--detached                        If present, runs the job in detached
                                              mode
         -m,--jobmanager <arg>                Address of the JobManager (master) to
                                              which to connect. Use this flag to
                                              connect to a different JobManager than
                                              the one specified in the
                                              configuration.
         -yD <property=value>                 use value for given property
         -yd,--yarndetached                   If present, runs the job in detached
                                              mode (deprecated; use non-YARN
                                              specific option instead)
         -yh,--yarnhelp                       Help for the Yarn session CLI.
         -yid,--yarnapplicationId <arg>       Attach to running YARN session
         -yj,--yarnjar <arg>                  Path to Flink jar file
         -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                              optional unit (default: MB)
         -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                              (=Number of Task Managers)
         -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                              application
         -ynm,--yarnname <arg>                Set a custom name for the application
                                              on YARN
         -yq,--yarnquery                      Display available YARN resources
                                              (memory, cores)
         -yqu,--yarnqueue <arg>               Specify YARN queue.
         -ys,--yarnslots <arg>                Number of slots per TaskManager
         -yst,--yarnstreaming                 Start Flink in streaming mode
         -yt,--yarnship <arg>                 Ship files in the specified directory
                                              (t for transfer)
         -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                              optional unit (default: MB)
         -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                              sub-paths for high availability mode
         -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                              sub-paths for high availability mode

flink run参数

flink run命令执行模板:flink run [option]

-c,–class : 需要指定的main方法的类

-C,–classpath : 向每个用户代码添加url,他是通过UrlClassLoader加载。url需要指定文件的schema如(file://)

-d,–detached : 在后台运行

-p,–parallelism : job需要指定env的并行度,这个一般都需要设置。

-q,–sysoutLogging : 禁止logging输出作为标准输出。

-s,–fromSavepoint : 基于savepoint保存下来的路径,进行恢复。

-sae,–shutdownOnAttachedExit : 如果是前台的方式提交,当客户端中断,集群执行的job任务也会shutdown。

flink run -m yarn-cluster参数

-m,–jobmanager : yarn-cluster集群

-yd,–yarndetached : 后台

-yjm,–yarnjobManager : jobmanager的内存

-ytm,–yarntaskManager : taskmanager的内存

-yn,–yarncontainer : TaskManager的个数

-yid,–yarnapplicationId : job依附的applicationId

-ynm,–yarnname : application的名称

-ys,–yarnslots : 分配的slots个数

例:flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ynm -ys 1

flink-list

flink list:列出flink的job列表。

flink list -r/–runing :列出正在运行的job

flink list -s/–scheduled :列出已调度完成的job

flink cancel

flink cancel [options] <job_id> : 取消正在运行的job id

flink cancel -s/–withSavepoint <job_id> : 取消正在运行的job,并保存到相应的保存点

通过 -m 来指定要停止的 JobManager 的主机地址和端口

例: bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de

flink stop :仅仅针对Streaming job

flink stop [options] <job_id>

flink stop <job_id>:停止对应的job

通过 -m 来指定要停止的 JobManager 的主机地址和端口

例: bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb

取消和停止(流作业)的区别如下:

  cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。

  stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 Source 实现了 StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 Source 都将接收 stop() 方法调用。直到所有 Source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。

flink modify 修改任务并行度

flink modify <job_id> [options]

flink modify <job_id> -p /–parallelism p : 修改job的并行度

例: flink modify -p 并行数 <job_pid>

flink savepoint

flink savepoint [options] <job_id>

eg: # 触发保存点

flink savepoint <job_id> hdfs://xxxx/xx/x : 将flink的快照保存到hdfs目录

使用yarn触发保存点
flink savepoint <job_id> <target_directory> -yid <application_id>

使用savepoint取消作业
flink cancel -s <tar_directory> <job_id>

从保存点恢复
flink run -s <target_directoey> [:runArgs]

如果复原的程序,对逻辑做了修改,比如删除了算子可以指定allowNonRestoredState参数复原。
flink run -s <target_directory> -n/–allowNonRestoredState [:runArgs]

savepoint 与 checkpoint 的区别

  checkpoint是增量做的,每次的时间短,数据量小,只要在程序里面启用后会自动触发,用户无需感知;savepoint是全量做的,时间长,数据量大,需要用户主动触发。

  checkpoint 是作业failover 的时候自动使用,不需要用户指定,savepoint 一般用于程序版本更新、bug修复、A/B Test 等场景,需要用户指定。

0

打赏

评论

博主关闭了当前页面的评论