首页
动态
时间轴
归档
友链
关于
Search
1
Spark在Yarn模式下提交未找到驱动
10,736 阅读
2
Spark提交任务内存不足
10,320 阅读
3
Flink集群搭建--Yarn模式
9,837 阅读
4
Spark SQL函数总结
8,546 阅读
5
Flume应用--日志采集
7,224 阅读
日常
大数据
经验分享
技术总结
登录
Search
标签搜索
bigdata
Hive
Spark
hadoop
Flume
建站
Flink
linux
Mysql
sqoop
Yarn
摸鱼
羊毛
docker
VMware
图床
sql
function
清泓
累计撰写
39
篇文章
累计收到
---
条评论
首页
动态
后台
栏目
日常
大数据
经验分享
技术总结
页面
时间轴
归档
友链
关于
搜索到
5
篇
Spark
的结果
2022-07-01
Spark SQL函数总结
前言org.apache.spark.sql.functions是一个Object,提供了约两百多个函数大部分函数与Hive的接近除UDF函数,均可在spark-sql中直接使用经过import org.apache.spark.sql.functions._ ,也可以用于Dataframe,Datasetversion 2.3.0大部分支持Column的函数也支持String类型的列名。这些函数的返回类型基本都是Column函数总结聚合函数approx_count_distinct count_distinct近似值 avg 平均值 collect_list 聚合指定字段的值到list collect_set 聚合指定字段的值到set corr 计算两列的Pearson相关系数 count 计数 countDistinct 去重计数 SQL中用法:select count(distinct class) covar_pop 总体协方差(population covariance) covar_samp 样本协方差(sample covariance) first 分组第一个元素 last 分组最后一个元素 grouping grouping_id kurtosis 计算峰态(kurtosis)值 skewness 计算偏度(skewness) max 最大值 min 最小值 mean 平均值 stddev 即stddev_samp stddev_samp 样本标准偏差(sample standard deviation) stddev_pop 总体标准偏差(population standard deviation) sum 求和 sumDistinct 非重复值求和 SQL中用法:select sum(distinct class) var_pop 总体方差(population variance) var_samp 样本无偏方差(unbiased variance) variance 即var_samp 集合函数array_contains(column,value) 检查array类型字段是否包含指定元素 explode 展开array或map为多行 explode_outer 同explode,但当array或map为空或null时,会展开为null。 posexplode 同explode,带位置索引。 posexplode_outer 同explode_outer,带位置索引。 from_json 解析JSON字符串为StructType or ArrayType,有多种参数形式,详见文档。 to_json 转为json字符串,支持StructType, ArrayType of StructTypes, a MapType or ArrayType of MapTypes。 get_json_object(column,path) 获取指定json路径的json对象字符串。 json_tuple(column,fields) 获取json中指定字段值。select json_tuple('{"a":1,"b":2}','a','b'); map_keys 返回map的键组成的array map_values 返回map的值组成的array size array or map的长度 sort_array(e: Column, asc: Boolean) 将array中元素排序(自然排序),默认asc。 时间函数add_months(startDate: Column, numMonths: Int) 指定日期添加n月 date_add(start: Column, days: Int) 指定日期之后n天 e.g. select date_add('2018-01-01',3) date_sub(start: Column, days: Int) 指定日期之前n天 datediff(end: Column, start: Column) 两日期间隔天数 current_date() 当前日期 current_timestamp() 当前时间戳,TimestampType类型 date_format(dateExpr: Column, format: String) 日期格式化 dayofmonth(e: Column) 日期在一月中的天数,支持 date/timestamp/string dayofyear(e: Column) 日期在一年中的天数, 支持 date/timestamp/string weekofyear(e: Column) 日期在一年中的周数, 支持 date/timestamp/string from_unixtime(ut: Column, f: String) 时间戳转字符串格式 from_utc_timestamp(ts: Column, tz: String) 时间戳转指定时区时间戳 to_utc_timestamp(ts: Column, tz: String) 指定时区时间戳转UTF时间戳 hour(e: Column) 提取小时值 minute(e: Column) 提取分钟值 month(e: Column) 提取月份值 quarter(e: Column) 提取季度 second(e: Column) 提取秒 year(e: Column):提取年 last_day(e: Column) 指定日期的月末日期 months_between(date1: Column, date2: Column) 计算两日期差几个月 next_day(date: Column, dayOfWeek: String) 计算指定日期之后的下一个周一、二...,dayOfWeek区分大小写,只接受 "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"。 to_date(e: Column) 字段类型转为DateType trunc(date: Column, format: String) 日期截断 unix_timestamp(s: Column, p: String) 指定格式的时间字符串转时间戳 unix_timestamp(s: Column) 同上,默认格式为 yyyy-MM-dd HH:mm:ss unix_timestamp():当前时间戳(秒),底层实现为unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss) window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String) 时间窗口函数,将指定时间(TimestampType)划分到窗口数学函数cos,sin,tan 计算角度的余弦,正弦。。。 sinh,tanh,cosh 计算双曲正弦,正切,。。 acos,asin,atan,atan2 计算余弦/正弦值对应的角度 bin 将long类型转为对应二进制数值的字符串For example, bin("12") returns "1100". bround 舍入,使用Decimal的HALF_EVEN模式,v>0.5向上舍入,v< 0.5向下舍入,v0.5向最近的偶数舍入。 round(e: Column, scale: Int) HALF_UP模式舍入到scale为小数点。v>=0.5向上舍入,v< 0.5向下舍入,即四舍五入。 ceil 向上舍入 floor 向下舍入 cbrt Computes the cube-root of the given value. conv(num:Column, fromBase: Int, toBase: Int) 转换数值(字符串)的进制 log(base: Double, a: Column):$log_{base}(a)$ log(a: Column):$log_e(a)$ log10(a: Column):$log_{10}(a)$ log2(a: Column):$log_{2}(a)$ log1p(a: Column):$log_{e}(a+1)$ pmod(dividend: Column, divisor: Column):Returns the positive value of dividend mod divisor. pow(l: Double, r: Column):$r^l$ 注意r是列 pow(l: Column, r: Double):$r^l$ 注意l是列 pow(l: Column, r: Column):$r^l$ 注意r,l都是列 radians(e: Column):角度转弧度 rint(e: Column):Returns the double value that is closest in value to the argument and is equal to a mathematical integer. shiftLeft(e: Column, numBits: Int):向左位移 shiftRight(e: Column, numBits: Int):向右位移 shiftRightUnsigned(e: Column, numBits: Int):向右位移(无符号位) signum(e: Column):返回数值正负符号 sqrt(e: Column):平方根 hex(column: Column):转十六进制 unhex(column: Column):逆转十六进制混杂(misc)函数crc32(e: Column):计算CRC32,返回bigint hash(cols: Column*):计算 hash code,返回int md5(e: Column):计算MD5摘要,返回32位,16进制字符串 sha1(e: Column):计算SHA-1摘要,返回40位,16进制字符串 sha2(e: Column, numBits: Int):计算SHA-1摘要,返回numBits位,16进制字符串。numBits支持224, 256, 384, or 512.其他非聚合函数abs(e: Column) 绝对值 array(cols: Column*) 多列合并为array,cols必须为同类型 map(cols: Column*): 将多列组织为map,输入列必须为(key,value)形式,各列的key/value分别为同一类型。 bitwiseNOT(e: Column): Computes bitwise NOT. broadcast[T](df: Dataset[T]): Dataset[T]: 将df变量广播,用于实现broadcast join。如left.join(broadcast(right), "joinKey") coalesce(e: Column*): 返回第一个非空值 col(colName: String): 返回colName对应的Column column(colName: String): col函数的别名 expr(expr: String): 解析expr表达式,将返回值存于Column,并返回这个Column。 greatest(exprs: Column*): 返回多列中的最大值,跳过Null least(exprs: Column*): 返回多列中的最小值,跳过Null input_file_name():返 回当前任务的文件名 ?? isnan(e: Column): 检查是否NaN(非数值) isnull(e: Column): 检查是否为Null lit(literal: Any): 将字面量(literal)创建一个Column typedLit[T](literal: T)(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): 将字面量(literal)创建一个Column,literal支持 scala types e.g.: List, Seq and Map. monotonically_increasing_id(): 返回单调递增唯一ID,但不同分区的ID不连续。ID为64位整型。 nanvl(col1: Column, col2: Column): col1为NaN则返回col2 negate(e: Column): 负数,同df.select( -df("amount") ) not(e: Column): 取反,同df.filter( !df("isActive") ) rand(): 随机数[0.0, 1.0] rand(seed: Long): 随机数[0.0, 1.0],使用seed种子 randn(): 随机数,从正态分布取 randn(seed: Long): 同上 spark_partition_id(): 返回partition ID struct(cols: Column*): 多列组合成新的struct column ?? when(condition: Column, value: Any): 当condition为true返回value,如 people.select(when(people("gender") === "male", 0) .when(people("gender") === "female", 1) .otherwise(2)) 如果没有otherwise且condition全部没命中,则返回null. 排序函数asc(columnName: String):正序 asc_nulls_first(columnName: String):正序,null排最前 asc_nulls_last(columnName: String):正序,null排最后 e.g. df.sort(asc("dept"), desc("age")) 对应有desc函数 desc,desc_nulls_first,desc_nulls_last 字符串函数 ascii(e: Column): 计算第一个字符的ascii码 base64(e: Column): base64转码 unbase64(e: Column): base64解码 concat(exprs: Column*):连接多列字符串 concat_ws(sep: String, exprs: Column*):使用sep作为分隔符连接多列字符串 decode(value: Column, charset: String): 解码 encode(value: Column, charset: String): 转码,charset支持 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'。 format_number(x: Column, d: Int):格式化'#,###,###.##'形式的字符串 format_string(format: String, arguments: Column*): 将arguments按format格式化,格式为printf-style。 initcap(e: Column): 单词首字母大写 lower(e: Column): 转小写 upper(e: Column): 转大写 instr(str: Column, substring: String): substring在str中第一次出现的位置 length(e: Column): 字符串长度 levenshtein(l: Column, r: Column): 计算两个字符串之间的编辑距离(Levenshtein distance) locate(substr: String, str: Column): substring在str中第一次出现的位置,位置编号从1开始,0表示未找到。 locate(substr: String, str: Column, pos: Int): 同上,但从pos位置后查找。 lpad(str: Column, len: Int, pad: String):字符串左填充。用pad字符填充str的字符串至len长度。有对应的rpad,右填充。 ltrim(e: Column):剪掉左边的空格、空白字符,对应有rtrim. ltrim(e: Column, trimString: String):剪掉左边的指定字符,对应有rtrim. trim(e: Column, trimString: String):剪掉左右两边的指定字符 trim(e: Column):剪掉左右两边的空格、空白字符 regexp_extract(e: Column, exp: String, groupIdx: Int): 正则提取匹配的组 regexp_replace(e: Column, pattern: Column, replacement: Column): 正则替换匹配的部分,这里参数为列。 regexp_replace(e: Column, pattern: String, replacement: String): 正则替换匹配的部分 repeat(str: Column, n: Int):将str重复n次返回 reverse(str: Column): 将str反转 soundex(e: Column): 计算桑迪克斯代码(soundex code)PS:用于按英语发音来索引姓名,发音相同但拼写不同的单词,会映射成同一个码。 split(str: Column, pattern: String): 用pattern分割str substring(str: Column, pos: Int, len: Int): 在str上截取从pos位置开始长度为len的子字符串。 substring_index(str: Column, delim: String, count: Int):Returns the substring from string str before count occurrences of the delimiter delim. If count is positive, everything the left of the final delimiter (counting from left) is returned. If count is negative, every to the right of the final delimiter (counting from the right) is returned. substring_index performs a case-sensitive match when searching for delim. translate(src: Column, matchingString: String, replaceString: String):把src中的matchingString全换成replaceString。 UDF函数user-defined function. callUDF(udfName: String, cols: Column*): 调用UDF import org.apache.spark.sql._ val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value") val spark = df.sparkSession spark.udf.register("simpleUDF", (v: Int) => v * v) df.select($"id", callUDF("simpleUDF", $"value")) udf: 定义UDF窗口函数cume_dist(): cumulative distribution of values within a window partition currentRow(): returns the special frame boundary that represents the current row in the window partition. rank():排名,返回数据项在分组中的排名,排名相等会在名次中留下空位 1,2,2,4。 dense_rank(): 排名,返回数据项在分组中的排名,排名相等会在名次中不会留下空位 1,2,2,3。 row_number():行号,为每条记录返回一个数字 1,2,3,4 percent_rank():returns the relative rank (i.e. percentile) of rows within a window partition. lag(e: Column, offset: Int, defaultValue: Any): offset rows before the current row lead(e: Column, offset: Int, defaultValue: Any): returns the value that is offset rows after the current row ntile(n: Int): returns the ntile group id (from 1 to n inclusive) in an ordered window partition. unboundedFollowing():returns the special frame boundary that represents the last row in the window partition.
2022年07月01日
8,546 阅读
0 评论
2 点赞
2022-06-14
Spark提交任务内存不足
出现的异常spark在yarn模式下运行任务报错异常信息: 1 、 ERROR cluster.YarnScheduler: Lost executor 2 、 ERROR client.TransportClient: Failed to send RPC 3 、 WARN storage.BlockManagerMaster: Failed to remove RDD 4 、 ERROR cluster.YarnScheduler: Lost executor 1 on 192.168.23.105: Slave lost出现此问题的原因是任务需要的内存过高,需要修改spark-env.sh调整默认的内存大小解决在spark-env.sh文件内添加如下配置[root@master ~]# vi /usr/local/src/spark/conf/spark-env.sh export SPARK_EXECUTOR_INSTANCES=4 # yarn集群中,最多能够同时启动的EXECUTOR的实例个数 export SPARK_EXECUTOR_CORES=4 # 每个EXECUTOR能够使用的CPU core的数量 export SPARK_EXECUTOR_MEMORY=4G # 每个EXECUTOR分配的内存的大小 export SPARK_DRIVER_MEMORY=2G # DRIVER分配的内存的大小
2022年06月14日
10,320 阅读
0 评论
0 点赞
2022-06-14
Spark在Yarn模式下提交未找到驱动
报错相关使用spark提交任务到yarn上,出现找不到mysql驱动问题22/06/14 16:48:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 1] 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 2] 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, slave1, executor 1, partition 0, PROCESS_LOCAL, 5868 bytes) 22/06/14 16:48:38 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on slave1, executor 1: java.lang.ClassNotFoundException (com.mysql.jdbc.Driver) [duplicate 3] 22/06/14 16:48:38 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 22/06/14 16:48:38 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 22/06/14 16:48:38 INFO cluster.YarnScheduler: Cancelling stage 0 22/06/14 16:48:38 INFO scheduler.DAGScheduler: ResultStage 0 (sql at A1.scala:43) failed in 0.686 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: 22/06/14 16:48:38 INFO scheduler.DAGScheduler: Job 0 failed: sql at A1.scala:43, took 0.869800 s Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, slave1, executor 1): java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:210) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:310) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592) at com.hive.task_1.A1$.main(A1.scala:43) at com.hive.task_1.A1.main(A1.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:51) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:286) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)问题解决我提交任务指令如下:spark-submit --master yarn \ --deploy-mode client \ --class com.hive.task_1.A1 \ /opt/Spark-01-1.0.SNAPSHOT.jar我已经将mysql驱动复制到spark的conf目录下,但直接报错Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver但我使用--driver-class-path /opt/mysql-connector-java-5.1.38.jar指定驱动后还是出现找不到驱动的问题查找资料找到的解决方案是在spark安装目录下的conf/spark-env.sh配置SPARK_CLASSPATH来设置driver的环境变量,如下export SPARK_CLASSPATH=$SPARK_CLASSPATH:/lib/com/mysql-connector-java-5.1.35.jar注意:不能同时在conf/spark-env.sh里面配置SPARK_CLASSPATH和提交作业加上--driver-class-path参数,否则会出现异常但还是无法解决问题,经过尝试后发现只需要在提交参数后添加--driver-class-path和--jars指定驱动即可spark-submit --driver-class-path /opt/mysql-connector-java-5.1.38.jar \ --jars /opt/mysql-connector-java-5.1.38.jar \ --master yarn \ --deploy-mode client \ --class com.hive.task_1.A1 \ /opt/Spark-01-1.0.SNAPSHOT.jar
2022年06月14日
10,736 阅读
0 评论
1 点赞
2022-06-05
Spark报错总结
1.client模式异常 Yarn application has already ended! 解决:hadoop的配置文件yarn-site.xml,添加如下内容:<property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>注意: 修改完配置文件后需要分发到各个节点,并重启Hadoop
2022年06月05日
3,094 阅读
0 评论
1 点赞
2022-05-06
Spark抽取MySQL数据到Hive
Spark抽取MySQL数据到Hive{lamp/}1.添加Maven依赖<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.3.2</version> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <phase>test-compile</phase> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>2.配置Hive-site.xml<property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true&useSSL=false</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>数据库用户名</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>数据库密码</value> </property> <property> <name>hive.cli.print.header</name> <value>true</value> </property> <property> <name>hive.cli.print.current.db</name> <value>true</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://master:9083</value> </property>3.编写scala代码抽取数据<div>import org.apache.spark.sql.{DataFrame, SparkSession} object CreateCustomerTbl { def main(args: Array[String]): Unit = { val sqlSession: SparkSession = SparkSession.builder() .master("local[*]") //指定运行模式,本地调试时使用,打包时应删除,可在集群上指定模式 .appName("createCustpmerTable") .config("hive.metastore.uris", "thrift://localhost:9083") //配置Hive元数据访问 .enableHiveSupport() //开启hive支持 .getOrCreate() val df: DataFrame = sqlSession.read.format("jdbc") .option("url", "jdbc:mysql://localhost:3306/test") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "123456") .option("dbtable", "customer") .load() df.createOrReplaceTempView("mysql_customer") //在内存中建立临时表 df.show() sqlSession.sql("use ods") //使用Hive的ods库 //建立分区表 sqlSession.sql( """ |create table customer( |custkey string, |name string, |address string, |nationkey string, |phone string, |acctbal string, |mktsegment string, |comment string) |partitioned by (date string) |""".stripMargin) //抽取全量数据到customer表20220503分区中 sqlSession.sql( """ |insert overwrite table customer partition (date = "20220503") select * from mysql_customer |""".stripMargin) sqlSession.close() } }</div>4.打包代码提交至集群运行提交指令:spark-submit --files /usr/local/src/hive/conf/hive-site.xml \ --driver-class-path /usr/local/src/hive/lib/mysql-connector-java-5.1.38.jar \ --master yarn \ --deploy-mode client \ --class CreateCustomerTbl \ /root/Spark-01-1.0-SNAPSHOT.jar 10说明:--files 指定hive-site.xml配置文件位置--driver-class-path 指定mysql驱动位置--master yarn 指定以yarn模式运行--mode-client 以客户端模式运行--class 指定程序运行主类以yarn模式运行时需修改如下配置:修改hadoop yarn-site.xml配置文件,添加以下配置<property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>修改spark-env.sh,添加JAVA_HOME和YARN_CONF_DIRexport JAVA_HOME=jdk所在路径 YARN_CONF_DIR=hadoop配置文件路径(eg:/usr/local/src/hadoop/etc/hadoop)
2022年05月06日
1,834 阅读
0 评论
1 点赞
首页
复制
搜索
前进
后退
重载网页