Found 20 posts tagged bigdata
2022-06-20
Escape characters in fields when importing data into Hive

Problem

When extracting data from MySQL into Hive, the data came out scrambled: two hundred rows in MySQL became more than nine thousand rows in Hive. The first suspicion was the delimiters chosen when creating the table, but changing both the row and the column delimiters did not fix it. Inspecting the MySQL data then showed that one field contained large numbers of newlines, tabs, carriage returns and similar characters — that was the cause.

Solution

In Hive, newline and delimiter characters inside a field are removed with the regexp_replace() function:

regexp_replace(field, pattern_to_replace, replacement)

Example: in the MySQL table test, the field field_B contains newlines and other control characters and has to be cleaned during extraction.

field_A    field_B          field_C
aaa        \t\n\001\bxxx    aaa
bbb        \t\n\001\bxxx    bbb
ccc        \t\n\001\bxxx    bbb

Hive table definition:

create table if not exists test(
    field_A string,
    field_B string,
    field_C string)
partitioned by (date timestamp)
row format delimited fields terminated by ',';

Extraction statement:

insert overwrite table partition (date = current_date())
select field_A, regexp_replace(field_B, '\\n|\\t|\\r', ''), field_C
from test
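The same cleanup can also be applied on the Spark side when the extraction is driven by a Spark job rather than plain Hive SQL. A minimal Scala sketch, not from the original post: the JDBC URL, credentials and the target table test_clean are placeholders, and a Spark build with Hive support is assumed.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.regexp_replace

object CleanFieldB {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("clean-field-b")
      .enableHiveSupport()
      .getOrCreate()

    // Read the source table over JDBC; connection details are placeholders.
    val src = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://master:3306/testdb")
      .option("dbtable", "test")
      .option("user", "root")
      .option("password", "******")
      .load()

    // Strip newlines, tabs and carriage returns from field_B before writing.
    val cleaned = src.withColumn("field_B",
      regexp_replace(src("field_B"), "\\n|\\t|\\r", ""))

    // Write the cleaned rows to a (hypothetical) Hive table.
    cleaned.write.mode("overwrite").saveAsTable("test_clean")
    spark.stop()
  }
}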
2,642 reads · 0 comments · 0 likes
2022-06-14
Insufficient memory when submitting a Spark job

The errors

Running a job on Spark in YARN mode fails with errors such as:

1. ERROR cluster.YarnScheduler: Lost executor
2. ERROR client.TransportClient: Failed to send RPC
3. WARN storage.BlockManagerMaster: Failed to remove RDD
4. ERROR cluster.YarnScheduler: Lost executor 1 on 192.168.23.105: Slave lost

The cause is that the job needs more memory than the defaults provide, so the defaults in spark-env.sh have to be raised.

Solution

Add the following settings to spark-env.sh:

[root@master ~]# vi /usr/local/src/spark/conf/spark-env.sh
export SPARK_EXECUTOR_INSTANCES=4   # maximum number of executor instances running concurrently on the YARN cluster
export SPARK_EXECUTOR_CORES=4       # number of CPU cores each executor may use
export SPARK_EXECUTOR_MEMORY=4G     # memory allocated to each executor
export SPARK_DRIVER_MEMORY=2G       # memory allocated to the driver
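The same resources can also be requested per application instead of globally. A minimal Scala sketch under that assumption; the numbers are only illustrative and must fit the cluster's actual capacity.

import org.apache.spark.sql.SparkSession

object MemoryConfigExample {
  def main(args: Array[String]): Unit = {
    // Per-application counterparts of the spark-env.sh variables above.
    val spark = SparkSession.builder()
      .appName("memory-config-example")
      .config("spark.executor.instances", "4")
      .config("spark.executor.cores", "4")
      .config("spark.executor.memory", "4g")
      // spark.driver.memory only takes effect when passed to spark-submit
      // (or set in spark-defaults.conf), because the driver JVM is already
      // running by the time this code executes.
      .config("spark.driver.memory", "2g")
      .getOrCreate()

    // Trivial action just to exercise the executors.
    println(spark.range(1000000L).count())
    spark.stop()
  }
}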
10,296 reads · 0 comments · 0 likes
2022-06-14
Driver not found when submitting Spark in YARN mode
The error

Submitting a job to YARN with Spark fails because the MySQL driver cannot be found:

22/06/14 16:48:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes)
22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 1]
22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes)
22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 2]
22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes)
22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 3]
22/06/14 16:48:38 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
22/06/14 16:48:38 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/06/14 16:48:38 INFO cluster.YarnScheduler: Cancelling stage 0
22/06/14 16:48:38 INFO scheduler.DAGScheduler: ResultStage 0 (sql at A1.scala:43) failed in 0.686 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: 22/06/14 16:48:38 INFO scheduler.DAGScheduler: Job 0 failed: sql at A1.scala:43, took 0.869800 s Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:210) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:310) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592) at com.hive.task_1.A1$.main(A1.scala:43) at com.hive.task_1.A1.main(A1.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Fixing the problem

The submit command was:

spark-submit --master yarn \
    --deploy-mode client \
    --class com.hive.task_1.A1 \
    /opt/Spark-01-1.0.SNAPSHOT.jar

The MySQL driver jar had already been copied into Spark's conf directory, yet the job still failed with:

Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

Even after pointing at the driver with --driver-class-path /opt/mysql-connector-java-5.1.38.jar, the driver could not be found. One solution found while searching is to set SPARK_CLASSPATH in conf/spark-env.sh under the Spark installation directory so that the driver picks up the jar through its environment:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/lib/com/mysql-connector-java-5.1.35.jar

Note: SPARK_CLASSPATH in conf/spark-env.sh and the --driver-class-path submit option must not be used at the same time, otherwise an exception is thrown.

That still did not solve the problem. After some experimenting, it turned out to be enough to pass both --driver-class-path and --jars with the driver jar when submitting:

spark-submit --driver-class-path /opt/mysql-connector-java-5.1.38.jar \
    --jars /opt/mysql-connector-java-5.1.38.jar \
    --master yarn \
    --deploy-mode client \
    --class com.hive.task_1.A1 \
    /opt/Spark-01-1.0.SNAPSHOT.jar
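In addition to shipping the jar with --jars, the driver class can also be named explicitly when building the JDBC reader, so the executors do not rely on driver auto-discovery. A minimal Scala sketch, not from the original post; the connection URL, credentials and table name are placeholders.

import org.apache.spark.sql.SparkSession

object JdbcDriverOption {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-driver-option")
      .getOrCreate()

    // Naming the driver class makes Spark register it explicitly on each
    // executor; the jar itself still has to be distributed (e.g. via --jars).
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://master:3306/testdb")
      .option("dbtable", "some_table")
      .option("user", "root")
      .option("password", "******")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    df.show(10)
    spark.stop()
  }
}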
10,718 reads · 0 comments · 1 like
2022-06-07
Hive fails to start with Hadoop in HA mode

The error

After configuring Hadoop HA, starting Hive fails with Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: hadoopcluster:

[root@master hive]# hive
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/local/src/jdk/bin:/usr/local/src/hadoop/bin:/usr/local/src/hadoop/sbin:/usr/local/src/zookeeper/bin:/usr/local/src/hive/bin:/usr/local/src/scala/bin:/usr/local/src/spark/bin:/usr/local/src/sqoop/bin:/usr/local/src/kafka/bin:/usr/local/src/flume/bin:/usr/local/src/flink/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Logging initialized using configuration in jar:file:/usr/local/src/hive/lib/hive-common-2.3.4.jar!/hive-log4j2.properties Async: true
Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: hadoopcluster
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:687)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:628)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2701)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2683)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:372)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:171)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:579)
    at org.apache.hadoop.hive.ql.session.SessionState.beginStart(SessionState.java:549)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:750)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:226)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:141)
Caused by: java.net.UnknownHostException: hadoopcluster
    ... 22 more

Cause

In the Hadoop configuration file hdfs-site.xml the nameservice is named hadoopcluster:

<!-- name the HDFS nameservice hadoopcluster -->
<property>
    <name>dfs.nameservices</name>
    <value>hadoopcluster</value>
</property>

but the property that tells HDFS clients which Java class to use to reach the active NameNode was written without the "client" part:

<property>
    <name>dfs.failover.proxy.provider.hadoopcluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

It has to be:

<property>
    <name>dfs.client.failover.proxy.provider.hadoopcluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
2,878 reads · 0 comments · 1 like
2022-06-07
Three ways to submit Flink jobs

There are three ways to submit a Flink job:

1. Through the web UI.

2. With the flink command: upload the jar to the cluster and submit the job:

flink run -c com.flink.Flinktest flinktest.jar

com.flink.Flinktest -- the main class
flinktest.jar       -- the jar file

3. RPC submission (remote submission, rarely used). The code has to be packaged into a jar first and then run. One quirk of Flink error reporting: the first few error messages are usually noise, so read the stack trace from the end.

package com.flink

import org.apache.flink.streaming.api.scala._

object Flinktest {
  def main(args: Array[String]): Unit = {
    /**
     * Create a remote environment and submit the Flink job remotely.
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
      // host name
      "master",
      // port
      8081,
      // path to the jar file
      "C:\\Users\\HB\\Desktop\\market_analysis\\flinktest.jar"
    )

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // 1. split each line into words
    val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
    // 2. map to key/value pairs
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    // 3. group by word
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
    // 4. count by summing the value at index 1
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)

    // print the result
    countDS.print()

    env.execute("rpc")
  }
}
2,461 reads · 0 comments · 0 likes
2022-06-07
Flink submission parameters in detail

Flink command-line submission parameters

1. Required parameter:
-n,--container <arg>              number of YARN containers to allocate (= number of TaskManagers)

2. Optional parameters:
-D <arg>                          dynamic property
-d,--detached                     run detached
-jm,--jobManagerMemory <arg>      memory for the JobManager [in MB]
-nm,--name                        set a custom name for the application on YARN
-q,--query                        display available YARN resources (memory, CPU cores)
-qu,--queue <arg>                 specify the YARN queue
-s,--slots <arg>                  number of slots per TaskManager
-tm,--taskManagerMemory <arg>     memory per TaskManager [in MB]
-z,--zookeeperNamespace <arg>     namespace to create on ZooKeeper for HA mode
-id,--applicationId <yarnAppId>   id of the application on the YARN cluster; attach to a yarn session running in the background

3. run [OPTIONS] <jar-file> <arguments>
Options for the run action:
-c,--class <classname>            entry-point class; required if the jar does not specify it in its manifest
-m,--jobmanager <host:port>       address of the JobManager (master) to connect to; overrides the one in the configuration file
-p,--parallelism <parallelism>    parallelism of the program; overrides the default in the configuration file

4. To start a new yarn-session, the corresponding options carry a y or yarn prefix.
Example: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
Connect to a JobManager at a given host and port:
./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
Start a new yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

5. Note: the command-line options can also be listed with the ./bin/flink tool.

6. Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname>            Class with the program entry point ("main" method or "getPlan()" method. Only needed if the JAR file does not specify the class in its manifest.
-C,--classpath <url>              Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
-d,--detached                     If present, runs the job in detached mode
-n,--allowNonRestoredState        Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
-p,--parallelism <parallelism>    The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
-q,--sysoutLogging                If present, suppress logging output to standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).

7. Options for yarn-cluster mode:
-d,--detached                     If present, runs the job in detached mode
-m,--jobmanager <arg>             Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-yD <property=value>              use value for given property
-yd,--yarndetached                If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-yh,--yarnhelp                    Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg>    Attach to running YARN session
-yj,--yarnjar <arg>               Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-yn,--yarncontainer <arg>         Number of YARN container to allocate (=Number of Task Managers)
-ynl,--yarnnodeLabel <arg>        Specify YARN node label for the YARN application
-ynm,--yarnname <arg>             Set a custom name for the application on YARN
-yq,--yarnquery                   Display available YARN resources (memory, cores)
-yqu,--yarnqueue <arg>            Specify YARN queue.
-ys,--yarnslots <arg>             Number of slots per TaskManager
-yst,--yarnstreaming              Start Flink in streaming mode
-yt,--yarnship <arg>              Ship files in the specified directory (t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
-z,--zookeeperNamespace <arg>     Namespace to create the Zookeeper sub-paths for high availability mode

flink run parameters

Template: flink run [OPTIONS]
-c,--class : class whose main method should be run
-C,--classpath : adds a URL to every user-code classloader (loaded through a URLClassLoader); the URL must carry a scheme such as file://
-d,--detached : run in the background
-p,--parallelism : parallelism of the job's environment; normally worth setting explicitly
-q,--sysoutLogging : suppress logging output to standard out
-s,--fromSavepoint : restore the job from the given savepoint path
-sae,--shutdownOnAttachedExit : when submitting in attached (foreground) mode, shut the cluster job down if the client is interrupted

flink run -m yarn-cluster parameters
-m,--jobmanager : yarn-cluster
-yd,--yarndetached : run detached
-yjm,--yarnjobManager : JobManager memory
-ytm,--yarntaskManager : TaskManager memory
-yn,--yarncontainer : number of TaskManagers
-yid,--yarnapplicationId : applicationId the job attaches to
-ynm,--yarnname : application name
-ys,--yarnslots : number of slots
Example: flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ynm -ys 1

flink list
flink list : list Flink jobs
flink list -r/--running : list running jobs
flink list -s/--scheduled : list scheduled jobs

flink cancel
flink cancel [OPTIONS] <job_id> : cancel the running job with the given id
flink cancel -s/--withSavepoint <job_id> : cancel the running job and take a savepoint first
Use -m to give the host and port of the JobManager to contact.
Example: bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de

flink stop (streaming jobs only)
flink stop [OPTIONS] <job_id>
flink stop <job_id> : stop the given job
Use -m to give the host and port of the JobManager to contact.
Example: bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb

The difference between cancelling and stopping a streaming job: cancel() is delivered to the job's operators immediately so that they shut down as quickly as possible; if an operator does not stop after the cancel() call, Flink starts interrupting its thread periodically until every operator has stopped. stop() is the more graceful way to end a running streaming job and only works when every source implements the StoppableFunction interface; when a stop is requested, all sources receive a stop() call, and the job only finishes once every source has shut down cleanly, so all in-flight data is processed normally.

flink modify -- change a job's parallelism
flink modify <job_id> [OPTIONS]
flink modify <job_id> -p/--parallelism <p> : change the job's parallelism
Example: flink modify -p <parallelism> <job_id>

flink savepoint
flink savepoint [OPTIONS] <job_id>
Trigger a savepoint and store the snapshot in an HDFS directory:
flink savepoint <job_id> hdfs://xxxx/xx/x
Trigger a savepoint through YARN:
flink savepoint <job_id> <target_directory> -yid <application_id>
Cancel a job while taking a savepoint:
flink cancel -s <target_directory> <job_id>
Restore from a savepoint:
flink run -s <target_directory> [:runArgs]
If the restored program's logic has changed, for example an operator was removed, restore with the allowNonRestoredState flag:
flink run -s <target_directory> -n/--allowNonRestoredState [:runArgs]

Savepoints versus checkpoints: checkpoints are incremental, quick and small, and are triggered automatically once enabled in the program, invisible to the user; savepoints are full snapshots, slower and larger, and must be triggered explicitly by the user. Checkpoints are used automatically when a job fails over, whereas savepoints are used for version upgrades, bug fixes, A/B tests and similar scenarios and have to be specified by the user.
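Since checkpoints are enabled from inside the program while savepoints are triggered from the command line, a minimal Scala sketch of the in-program side may help; the interval, source host and port are illustrative only.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode

object CheckpointExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoints are triggered automatically once enabled; here every 60 s
    // with exactly-once guarantees. Savepoints, by contrast, are taken
    // manually with "flink savepoint <job_id> <target_directory>".
    env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)

    env.socketTextStream("master", 8888) // hypothetical source host and port
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute("checkpoint-example")
  }
}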
2,408 reads · 0 comments · 0 likes
2022-06-06
Flink cluster setup -- YARN mode

Preparation

Flink ON YARN works as a client: jobs are submitted straight to the Hadoop cluster, and no separate Flink processes need to be started. Notes:
1. Flink ON YARN requires Hadoop 2.4.1 or later.
2. Flink ON YARN supports two modes of use.
yarn-session mode: Flink's yarn-session (YARN client) starts the two required services, a JobManager and TaskManagers; clients submit jobs through the session; the session keeps running and keeps accepting submitted jobs; suited to scenarios with many small jobs.
yarn-per-job mode: each job is submitted to YARN directly; suited to large jobs.

Download the release from: http://archive.apache.org/dist/flink/flink-1.10.2/

Flink on YARN cluster setup

yarn-session mode

Unpack:
[root@master ~]# tar -zxvf /data/flink-1.10.2 -C /usr/local/src
[root@master ~]# mv /usr/local/src/flink-1.10.2 /usr/local/src/flink

Configure environment variables:
[root@master ~]# vi /etc/profile
export FLINK_HOME=/usr/local/src/flink
export PATH=$PATH:$FLINK_HOME/bin
[root@master ~]# source /etc/profile

With Flink on YARN, resource scheduling is handled by YARN, so Flink's own configuration files do not need to be modified.

HADOOP_CLASSPATH
The HADOOP_CLASSPATH environment variable has to be set before running the yarn-session.sh script; otherwise yarn-session.sh fails, complaining that some Hadoop dependencies cannot be found.
Option 1 (recommended): set HADOOP_CLASSPATH in /etc/profile:
[root@master ~]# vi /etc/profile
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
Option 2: export HADOOP_CLASSPATH on the command line:
[root@master ~]# export HADOOP_CLASSPATH=`hadoop classpath`
Drawback: only valid for the current shell session.
Option 3: edit yarn-session.sh and add HADOOP_CLASSPATH there.

Starting the cluster

Next, use yarn-session.sh to create a long-running Flink cluster inside YARN:
[root@master flink]# bin/yarn-session.sh -jm 1024m -tm 1024m -d
This creates a Flink cluster: -jm sets the master's memory, -tm sets the workers' memory, and -d puts the process in the background. The startup log prints the address of the Flink web UI and the application id this Flink cluster has in YARN. The cluster then appears in the YARN web UI, and the Flink web UI can be opened either from the address printed on screen or from the link in YARN.

Submitting a job

Now submit a job to this Flink cluster, using one of Flink's built-in examples:
[root@master flink]# bin/flink run ./examples/batch/WordCount.jar
When flink run is used this way, it looks for a properties file by default and uses it to locate the long-running Flink cluster created earlier; the file stores the application id that cluster has in YARN:
[root@master flink]# more /tmp/.yarn-properties-root
#Generated YARN properties file
#Tue Jun 07 01:39:29 CST 2022
dynamicPropertiesString=
applicationID=application_1654423542500_0004
Once the job has finished, the Flink web UI shows one completed job. Note that the job dynamically requests resources while it runs and releases them automatically when it finishes.
Finally, stop the Flink cluster with YARN's kill command:
[root@master flink]# yarn application -kill application_1654423542500_0004
Killing application application_1654423542500_0004
22/06/07 01:46:43 INFO impl.YarnClientImpl: Killed application application_1654423542500_0004

The yarn-session command supports further options, listed with -help:
[root@master flink]# bin/yarn-session.sh -help
Usage:
Optional
-at,--applicationType <arg>     Set a custom application type for the application on YARN
-D <property=value>             use value for given property
-d,--detached                   If present, runs the job in detached mode
-h,--help                       Help for the Yarn session CLI.
-id,--applicationId <arg>       Attach to running YARN session
-j,--jar <arg>                  Path to Flink jar file
-jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
-nm,--name <arg>                Set a custom name for the application on YARN
-q,--query                      Display available YARN resources (memory, cores)
-qu,--queue <arg>               Specify YARN queue.
-s,--slots <arg>                Number of slots per TaskManager
-t,--ship <arg>                 Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Annotated notes on the common options.

yarn-per-job mode

flink run -m yarn-cluster creates a Flink cluster and submits the job in one go: a temporary Flink cluster is created just for this job. The relevant options carry a y prefix:
[root@master flink]# bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

A summary of common uses of the flink command.
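The built-in WordCount.jar used above can be replaced by your own job jar. A minimal Scala sketch of a Flink batch WordCount that could be packaged and submitted the same way; the package name and in-memory input are illustrative, not from the original post.

package com.example // hypothetical package name

import org.apache.flink.api.scala._

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // Batch execution environment; when submitted with "flink run" this picks
    // up the cluster created by yarn-session.sh (or yarn-per-job mode).
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Illustrative in-memory input instead of a real HDFS path.
    val text = env.fromElements(
      "flink on yarn session mode",
      "flink on yarn per job mode"
    )

    val counts = text
      .flatMap(_.toLowerCase.split("\\s+")) // split lines into words
      .map((_, 1))                          // pair each word with a count of 1
      .groupBy(0)                           // group by the word
      .sum(1)                               // sum the counts

    counts.print() // print() triggers execution for batch jobs
  }
}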
9,806 reads · 0 comments · 2 likes