Extracting MySQL Data into Hive with Spark
1. Add the Maven dependencies
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
  </dependency>
</dependencies>
<build>
  <sourceDirectory>src/main/java</sourceDirectory>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.3.2</version>
      <executions>
        <execution>
          <id>compile-scala</id>
          <phase>compile</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>test-compile-scala</id>
          <phase>test-compile</phase>
          <goals>
            <goal>add-source</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
2. Configure hive-site.xml
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>your database username</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>your database password</value>
</property>
<property>
  <name>hive.cli.print.header</name>
  <value>true</value>
</property>
<property>
  <name>hive.cli.print.current.db</name>
  <value>true</value>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://master:9083</value>
</property>
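Before writing the full job, it can be worth sanity-checking that Spark can reach this metastore. A minimal sketch, passing the metastore URI from above explicitly (the object name MetastoreSmokeTest is just an illustration):

import org.apache.spark.sql.SparkSession

object MetastoreSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("metastoreSmokeTest")
      .config("hive.metastore.uris", "thrift://master:9083") // same URI as in hive-site.xml
      .enableHiveSupport()
      .getOrCreate()

    // If the metastore is reachable, this lists the Hive databases
    spark.sql("show databases").show()
    spark.close()
  }
}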
3. Write the Scala extraction code
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateCustomerTbl {
  def main(args: Array[String]): Unit = {
    val sqlSession: SparkSession = SparkSession.builder()
      .master("local[*]") // local mode for debugging; remove before packaging and set the master on the cluster instead
      .appName("createCustomerTable")
      .config("hive.metastore.uris", "thrift://localhost:9083") // Hive metastore access
      .enableHiveSupport() // enable Hive support
      .getOrCreate()

    // Read the source table from MySQL over JDBC
    val df: DataFrame = sqlSession.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/test")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "customer")
      .load()

    df.createOrReplaceTempView("mysql_customer") // register an in-memory temporary view
    df.show()

    sqlSession.sql("use ods") // switch to the Hive ods database

    // Create the partitioned target table ("if not exists" keeps reruns from failing)
    sqlSession.sql(
      """
        |create table if not exists customer(
        |custkey string,
        |name string,
        |address string,
        |nationkey string,
        |phone string,
        |acctbal string,
        |mktsegment string,
        |comment string)
        |partitioned by (date string)
        |""".stripMargin)

    // Load the full extract into the 20220503 partition of the customer table
    sqlSession.sql(
      """
        |insert overwrite table customer partition (date = "20220503") select * from mysql_customer
        |""".stripMargin)

    sqlSession.close()
  }
}
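The read above pulls the whole table through a single JDBC connection. For larger tables, the same read can be split across connections; a sketch of that variant, assuming custkey is a numeric key in the MySQL table (the bounds below are illustrative and should match your actual key range):

// Parallel JDBC read: Spark opens numPartitions connections, each scanning
// one slice of the custkey range
val dfParallel: DataFrame = sqlSession.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "123456")
  .option("dbtable", "customer")
  .option("partitionColumn", "custkey") // must be a numeric column
  .option("lowerBound", "1")            // assumed smallest key
  .option("upperBound", "150000")       // assumed largest key
  .option("numPartitions", "4")
  .load()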
4. Package the code and submit it to the cluster
Package the project into a jar (e.g. with mvn clean package), then submit it with:
spark-submit \
--files /usr/local/src/hive/conf/hive-site.xml \
--driver-class-path /usr/local/src/hive/lib/mysql-connector-java-5.1.38.jar \
--master yarn \
--deploy-mode client \
--class CreateCustomerTbl \
/root/Spark-01-1.0-SNAPSHOT.jar 10
Notes:
--files specifies the location of the hive-site.xml configuration file
--driver-class-path specifies the location of the MySQL driver jar
--master yarn runs the job on YARN
--deploy-mode client runs the driver in client mode
--class specifies the program's main class
When running on YARN, the following configuration changes are needed.
Add these properties to Hadoop's yarn-site.xml (they disable the physical/virtual memory checks that can otherwise kill Spark containers):
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
Edit spark-env.sh and add JAVA_HOME and YARN_CONF_DIR:
export JAVA_HOME=<path to your JDK>
export YARN_CONF_DIR=<path to the Hadoop configuration directory> (e.g. /usr/local/src/hadoop/etc/hadoop)
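Once the job has run, a quick check that the partition was written, e.g. from spark-shell (where spark is the predefined SparkSession), assuming the ods.customer table from step 3:

// List the partitions of the target table and count the rows just loaded
spark.sql("show partitions ods.customer").show()
spark.sql("select count(*) from ods.customer where date = '20220503'").show()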