Flink提交任务的三种方式
web页面提交
flink命令提交
将jar包上传至集群,提交任务
flink run -c com.flink.Flinktest flinktest.jar
com.flink.Flinktest -- 主类名
flinktest.jar -- jar包名
rpc方式提交任务 --- 远程提交
用的较少
代码写完之后需要先打包,再运行
Flink框架报错有一个特点:前几条报错原因都是废话,要从后面看
package com.flink
import org.apache.flink.streaming.api.scala._
object Flinktest {
def main(args: Array[String]): Unit = {
/**
* 创建远程环境,远程提交flink任务
*
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
//主机名
"master",
//端口号
8081,
//指定jar包的路径
"C:\\Users\\HB\\Desktop\\market_analysis\\flinktest.jar"
)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//1、将数据展开
val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
//2、转换成kv格式
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
//3、按照单词进行分组
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
//4、统计数量,对value进行求和, 指定下标进行聚合
val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
//打印结果
countDS.print()
env.execute("rpc")
}
}