Found 20 results for "bigdata"
Escape characters in fields when importing data into Hive
The problem

While extracting data from MySQL into Hive, the data came out scrambled: two hundred rows in MySQL turned into more than nine thousand rows in Hive. My first suspicion was the delimiters chosen when creating the table, but changing both the row delimiter and the column delimiter did not help. Looking at the MySQL data, one field turned out to contain large numbers of newlines, tabs, carriage returns and similar characters, which was the real cause.

The fix

When processing the data in Hive, strip the newlines and delimiter characters from the field with the regexp_replace() function:

regexp_replace(column, pattern_to_replace, replacement)

Example. In the MySQL table test, field_B contains newlines and other control characters and has to be cleaned during extraction:

field_A    field_B          field_C
aaa        \t\n\001\bxxx    aaa
bbb        \t\n\001\bxxx    bbb
ccc        \t\n\001\bxxx    bbb

Hive table definition:

create table if not exists test(
  field_A string,
  field_B string,
  field_C string
) partitioned by (date timestamp)
row format delimited fields terminated by ',';

Extraction statement (the target table name, omitted in the original, is the test table defined above):

insert overwrite table test partition (date = current_date())
select
  field_A,
  regexp_replace(field_B, '\\n|\\t|\\r', ''),
  field_C
from test
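When the extraction itself runs through Spark, the same cleanup can be applied before the rows ever reach Hive. The sketch below is only illustrative and not from the original post; the JDBC URL, credentials and the test_cleaned view name are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.regexp_replace

object CleanFieldB {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("mysql-to-hive-clean")
      .enableHiveSupport()
      .getOrCreate()

    // Read the source table over JDBC (host, database and credentials are placeholders).
    val src = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://mysql-host:3306/sourcedb")
      .option("dbtable", "test")
      .option("user", "user")
      .option("password", "password")
      .load()

    // Same idea as the regexp_replace() call in the Hive statement above:
    // remove \n, \t and \r so a value can no longer split a row on the line delimiter.
    val cleaned = src.withColumn("field_B",
      regexp_replace(src("field_B"), "[\\n\\t\\r]", ""))

    // Expose the cleaned rows to SQL; the partitioned insert can then select from this view.
    cleaned.createOrReplaceTempView("test_cleaned")
    spark.sql("select * from test_cleaned").show(truncate = false)
  }
}
```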
June 20, 2022 · 2,326 reads · 0 comments · 0 likes
Insufficient memory when submitting Spark jobs
The error

A Spark job running in YARN mode fails with errors such as:

1. ERROR cluster.YarnScheduler: Lost executor
2. ERROR client.TransportClient: Failed to send RPC
3. WARN storage.BlockManagerMaster: Failed to remove RDD
4. ERROR cluster.YarnScheduler: Lost executor 1 on 192.168.23.105: Slave lost

The job needs more memory than the defaults provide, so the default sizes in spark-env.sh have to be raised.

The fix

Add the following settings to spark-env.sh:

[root@master ~]# vi /usr/local/src/spark/conf/spark-env.sh
export SPARK_EXECUTOR_INSTANCES=4   # maximum number of executor instances running concurrently on the YARN cluster
export SPARK_EXECUTOR_CORES=4       # CPU cores available to each executor
export SPARK_EXECUTOR_MEMORY=4G     # memory allocated to each executor
export SPARK_DRIVER_MEMORY=2G       # memory allocated to the driver
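The same resources can also be requested per application instead of cluster-wide in spark-env.sh. The Scala sketch below is only an illustration (the application name is made up); note that driver memory is best passed on spark-submit, since setting it in code has no effect once the driver JVM has already started.

```scala
import org.apache.spark.sql.SparkSession

object MemoryTunedJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("memory-tuned-job")               // made-up name for the example
      .config("spark.executor.instances", "4")   // executors YARN may start for this application
      .config("spark.executor.cores", "4")       // CPU cores per executor
      .config("spark.executor.memory", "4g")     // heap per executor
      .getOrCreate()

    // ... job logic goes here ...

    spark.stop()
  }
}
```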
June 14, 2022 · 10,042 reads · 0 comments · 0 likes
MySQL driver not found when submitting a Spark job in YARN mode
The error

Submitting a Spark job to YARN fails because the MySQL driver cannot be found:

22/06/14 16:48:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 1] 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 2] 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 3] 22/06/14 16:48:38 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 22/06/14 16:48:38 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 22/06/14 16:48:38 INFO cluster.YarnScheduler: Cancelling stage 0 22/06/14 16:48:38 INFO scheduler.DAGScheduler: ResultStage 0 (sql at A1.scala:43) failed in 0.686 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: 22/06/14 16:48:38 INFO scheduler.DAGScheduler: Job 0 failed: sql at A1.scala:43, took 0.869800 s Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:210) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:310) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592) at com.hive.task_1.A1$.main(A1.scala:43) at com.hive.task_1.A1.main(A1.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

The fix

My submit command was:

spark-submit --master yarn \
  --deploy-mode client \
  --class com.hive.task_1.A1 \
  /opt/Spark-01-1.0.SNAPSHOT.jar

I had already copied the MySQL driver into Spark's conf directory, yet the job failed straight away with Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver. Pointing at the driver with --driver-class-path /opt/mysql-connector-java-5.1.38.jar alone still left it unresolved.

One solution I found is to set the driver's classpath through SPARK_CLASSPATH in conf/spark-env.sh under the Spark installation directory:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/lib/com/mysql-connector-java-5.1.35.jar

Note: do not configure SPARK_CLASSPATH in conf/spark-env.sh and pass --driver-class-path on the same submission, otherwise an exception is thrown.

That still did not solve the problem. After some experimenting, it turned out to be enough to pass both --driver-class-path and --jars with the driver jar when submitting:

spark-submit --driver-class-path /opt/mysql-connector-java-5.1.38.jar \
  --jars /opt/mysql-connector-java-5.1.38.jar \
  --master yarn \
  --deploy-mode client \
  --class com.hive.task_1.A1 \
  /opt/Spark-01-1.0.SNAPSHOT.jar
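--driver-class-path puts the jar on the driver's classpath and --jars ships it to the executors. A related, optional step, shown here only as an illustrative sketch (URL, table and credentials are placeholders), is to name the driver class explicitly in the JDBC options so Spark does not have to infer it from the URL:

```scala
import org.apache.spark.sql.SparkSession

object ReadFromMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-read").getOrCreate()

    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://mysql-host:3306/sourcedb") // placeholder URL
      .option("driver", "com.mysql.jdbc.Driver")              // make the driver class explicit
      .option("dbtable", "some_table")                        // placeholder table
      .option("user", "user")
      .option("password", "password")
      .load()

    df.show()
  }
}
```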
June 14, 2022 · 10,496 reads · 0 comments · 0 likes
Hive fails to start under Hadoop HA
The error

After configuring Hadoop HA, starting Hive fails with Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: hadoopcluster

[root@master hive]# hive
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/local/src/jdk/bin:/usr/local/src/hadoop/bin:/usr/local/src/hadoop/sbin:/usr/local/src/zookeeper/bin:/usr/local/src/hive/bin:/usr/local/src/scala/bin:/usr/local/src/spark/bin:/usr/local/src/sqoop/bin:/usr/local/src/kafka/bin:/usr/local/src/flume/bin:/usr/local/src/flink/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Logging initialized using configuration in jar:file:/usr/local/src/hive/lib/hive-common-2.3.4.jar!/hive-log4j2.properties Async: true
Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: hadoopcluster at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:687) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:628) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2701) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2683) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:372) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:171) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:579) at org.apache.hadoop.hive.ql.session.SessionState.beginStart(SessionState.java:549) at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:750) at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.util.RunJar.run(RunJar.java:226) at org.apache.hadoop.util.RunJar.main(RunJar.java:141)
Caused by: java.net.UnknownHostException: hadoopcluster
... 22 more

The cause

In the Hadoop configuration file hdfs-site.xml the nameservice is named hadoopcluster:

<!-- set the HDFS nameservice name to hadoopcluster -->
<property>
  <name>dfs.nameservices</name>
  <value>hadoopcluster</value>
</property>

but the property that tells HDFS clients which Java class to use to find the active NameNode was written without the client part:

<property>
  <name>dfs.failover.proxy.provider.hadoopcluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

Change it to:

<property>
  <name>dfs.client.failover.proxy.provider.hadoopcluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
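After the change, one quick way to confirm that a client can resolve the logical nameservice is to list the HDFS root through the Hadoop API. This is only a sketch, and it assumes the cluster's core-site.xml and hdfs-site.xml are on the classpath:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckNameservice {
  def main(args: Array[String]): Unit = {
    // Picks up core-site.xml / hdfs-site.xml from the classpath.
    val conf = new Configuration()
    // Resolving hdfs://hadoopcluster only succeeds once
    // dfs.client.failover.proxy.provider.hadoopcluster is configured correctly.
    val fs = FileSystem.get(new URI("hdfs://hadoopcluster"), conf)
    fs.listStatus(new Path("/")).foreach(status => println(status.getPath))
  }
}
```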
June 7, 2022 · 2,802 reads · 0 comments · 1 like
Three ways to submit a Flink job
There are three ways to submit a Flink job:

1. Through the web UI.

2. With the flink command line: upload the jar to the cluster and submit the job:

flink run -c com.flink.Flinktest flinktest.jar

where com.flink.Flinktest is the main class name and flinktest.jar is the jar file.

3. Via RPC, i.e. remote submission, which is used less often. The code has to be packaged into a jar before it is run. One quirk of Flink error reporting: the first few reported causes are usually noise, so read the error stack from the bottom up.

package com.flink

import org.apache.flink.streaming.api.scala._

object Flinktest {
  def main(args: Array[String]): Unit = {
    /**
     * Create a remote environment and submit the Flink job remotely.
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
      // hostname
      "master",
      // port
      8081,
      // path to the jar
      "C:\\Users\\HB\\Desktop\\market_analysis\\flinktest.jar"
    )

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // 1. split each line into words
    val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
    // 2. map to (word, 1) pairs
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    // 3. group by word
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
    // 4. count by summing the value at index 1
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
    // print the result
    countDS.print()

    env.execute("rpc")
  }
}
June 7, 2022 · 2,133 reads · 0 comments · 0 likes
Flink submission parameters explained
Flink command-line submission parameters

1. Required parameter:
   -n,--container <arg>              Number of YARN containers to allocate (= number of TaskManagers)

2. Optional parameters:
   -D <arg>                          Dynamic properties
   -d,--detached                     Run in detached mode
   -jm,--jobManagerMemory <arg>      Memory for the JobManager [in MB]
   -nm,--name                        Set a custom name for the application on YARN
   -q,--query                        Display available YARN resources (memory, cores)
   -qu,--queue <arg>                 Specify the YARN queue
   -s,--slots <arg>                  Number of slots used per TaskManager
   -tm,--taskManagerMemory <arg>     Memory per TaskManager [in MB]
   -z,--zookeeperNamespace <arg>     Namespace to create on ZooKeeper for HA mode
   -id,--applicationId <yarnAppId>   ID of the YARN application; attaches to a yarn session running in the background

3. run [OPTIONS] <jar-file> <arguments>
   Options for the run action:
   -c,--class <classname>            Entry-point class, needed if the jar does not specify it in its manifest
   -m,--jobmanager <host:port>       Address of the JobManager (master) to connect to; overrides the JobManager in the configuration
   -p,--parallelism <parallelism>    Parallelism of the program; overrides the default from the configuration

4. Options for starting a new yarn-session all carry a y or yarn prefix, for example:
   ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

   Connect to a JobManager at a given host and port:
   ./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

   Start a new yarn-session:
   ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

5. Note: the command-line options can also be obtained from the ./bin/flink tool itself.

6. Action "run" compiles and runs a program.
   Syntax: run [OPTIONS] <jar-file> <arguments>
   "run" action options:
   -c,--class <classname>               Class with the program entry point ("main" method or "getPlan()" method). Only needed if the JAR file does not specify the class in its manifest.
   -C,--classpath <url>                 Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
   -d,--detached                        If present, runs the job in detached mode
   -n,--allowNonRestoredState           Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
   -p,--parallelism <parallelism>       The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
   -q,--sysoutLogging                   If present, suppress logging output to standard out.
   -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).

7. Options for yarn-cluster mode:
   -d,--detached                        If present, runs the job in detached mode
   -m,--jobmanager <arg>                Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
   -yD <property=value>                 Use value for given property
   -yd,--yarndetached                   If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
   -yh,--yarnhelp                       Help for the Yarn session CLI.
   -yid,--yarnapplicationId <arg>       Attach to running YARN session
   -yj,--yarnjar <arg>                  Path to Flink jar file
   -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
   -yn,--yarncontainer <arg>            Number of YARN container to allocate (= number of Task Managers)
   -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN application
   -ynm,--yarnname <arg>                Set a custom name for the application on YARN
   -yq,--yarnquery                      Display available YARN resources (memory, cores)
   -yqu,--yarnqueue <arg>               Specify YARN queue.
   -ys,--yarnslots <arg>                Number of slots per TaskManager
   -yst,--yarnstreaming                 Start Flink in streaming mode
   -yt,--yarnship <arg>                 Ship files in the specified directory (t for transfer)
   -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
   -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
   -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper sub-paths for high availability mode

flink run parameters

Template of the flink run command: flink run [option]

-c,--class : main class to run
-C,--classpath : adds a URL to each user code classloader; loaded through a URLClassLoader, so the URL must specify a scheme such as file://
-d,--detached : run in the background
-p,--parallelism : parallelism for the job's environment; this usually needs to be set
-q,--sysoutLogging : suppress logging output to standard out
-s,--fromSavepoint : restore the job from the given savepoint path
-sae,--shutdownOnAttachedExit : when the job is submitted in the foreground (attached), shut it down on the cluster if the client is interrupted

flink run -m yarn-cluster parameters

-m,--jobmanager : yarn-cluster
-yd,--yarndetached : run in the background
-yjm,--yarnjobManager : JobManager memory
-ytm,--yarntaskManager : TaskManager memory
-yn,--yarncontainer : number of TaskManagers
-yid,--yarnapplicationId : applicationId the job attaches to
-ynm,--yarnname : application name
-ys,--yarnslots : number of slots to allocate

Example:
flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ynm -ys 1

flink list

flink list : list Flink jobs.
flink list -r/--running : list running jobs
flink list -s/--scheduled : list scheduled jobs

flink cancel

flink cancel [options] <job_id> : cancel the running job with the given id
flink cancel -s/--withSavepoint <job_id> : cancel the running job and write a savepoint first
Use -m to specify the host and port of the JobManager to address, for example:
bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de

flink stop (streaming jobs only)

flink stop [options] <job_id>
flink stop <job_id> : stop the given job
Use -m to specify the host and port of the JobManager, for example:
bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb

The difference between cancelling and stopping a (streaming) job:
A cancel() call is delivered to the job's operators immediately so they cancel as quickly as possible; if an operator does not stop after the cancel() call, Flink begins to interrupt the operator threads periodically until all of them stop.
A stop() call is the more graceful way to end a running streaming job. It only applies to jobs whose sources implement the StoppableFunction interface. When the user requests a stop, every source of the job receives a stop() call, and the job only finishes once all sources have shut down normally, which lets the job finish processing all of its in-flight data.

flink modify (change a job's parallelism)

flink modify <job_id> [options]
flink modify <job_id> -p/--parallelism p : change the job's parallelism
Example: flink modify -p <parallelism> <job_id>

flink savepoint

flink savepoint [options] <job_id>

Trigger a savepoint:
flink savepoint <job_id> hdfs://xxxx/xx/x : write a snapshot of the job to the HDFS directory

Trigger a savepoint on YARN:
flink savepoint <job_id> <target_directory> -yid <application_id>

Cancel a job with a savepoint:
flink cancel -s <target_directory> <job_id>

Restore from a savepoint:
flink run -s <target_directory> [:runArgs]

If the restored program has been modified, for example an operator was removed, restore with allowNonRestoredState:
flink run -s <target_directory> -n/--allowNonRestoredState [:runArgs]

The difference between savepoints and checkpoints:
Checkpoints are incremental, quick to take and small; once enabled in the program they are triggered automatically and the user does not need to be aware of them. Savepoints are full snapshots, slower and larger, and must be triggered explicitly by the user.
Checkpoints are used automatically when a job fails over and do not need to be specified by the user; savepoints are typically used for version upgrades, bug fixes, A/B tests and similar scenarios and must be specified by the user.
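To make the checkpoint/savepoint distinction concrete: checkpoints are enabled inside the job and taken automatically, while savepoints are requested from the CLI as shown above. A minimal sketch, with the hostname, port and interval purely illustrative:

```scala
import org.apache.flink.streaming.api.scala._

object CheckpointedWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoints: configured in the job, taken automatically every 60 s.
    env.enableCheckpointing(60000)

    env.socketTextStream("master", 8888)
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    // Savepoints, by contrast, are triggered manually from the CLI, e.g.
    //   flink savepoint <job_id> hdfs://.../savepoints
    // and restored with: flink run -s <savepoint_path> ...
    env.execute("checkpointed-word-count")
  }
}
```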
June 7, 2022 · 2,103 reads · 0 comments · 0 likes