Flink提交任务的三种方式

Flink提交任务的三种方式

清泓
2022-06-07 / 0 评论 / 2,446 阅读 / 1824字 / 正在检测是否收录...

Flink提交任务的三种方式

web页面提交

1

2

3

4

5

6

7

flink命令提交

将jar包上传至集群,提交任务

flink run -c com.flink.Flinktest flinktest.jar 

com.flink.Flinktest -- 主类名
flinktest.jar -- jar包名

rpc方式提交任务 --- 远程提交

用的较少

代码写完之后需要先打包,再运行

Flink框架报错有一个特点:前几条报错原因都是废话,要从后面看

package com.flink

import org.apache.flink.streaming.api.scala._

object Flinktest {
  def main(args: Array[String]): Unit = {

    /**
      * 创建远程环境,远程提交flink任务
      *
      */
      
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
      //主机名
      "master",
      //端口号
      8081,
      //指定jar包的路径
      "C:\\Users\\HB\\Desktop\\market_analysis\\flinktest.jar" 
    )

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //1、将数据展开
    val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
    //2、转换成kv格式
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    //3、按照单词进行分组
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
    //4、统计数量,对value进行求和, 指定下标进行聚合
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
    //打印结果
    countDS.print()

    env.execute("rpc")

  }
}
0

打赏

评论

博主关闭了当前页面的评论