Extracting MySQL Data into Hive with Spark

清泓
2022-05-06

1. Add the Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.3.2</version>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2. Configure hive-site.xml

<property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
</property>
<property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
</property>
<property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>database username</value>
</property>
<property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>database password</value>
</property>
<property>
        <name>hive.cli.print.header</name>
        <value>true</value>
</property>
<property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
</property>
<property>
        <name>hive.metastore.uris</name>
        <value>thrift://master:9083</value>
</property>
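
When debugging locally it can help to confirm which metastore address hive-site.xml actually carries, since the Scala code in step 3 sets hive.metastore.uris itself. The following is a minimal sketch that uses only the JDK's XML parser. HiveSiteReader is a hypothetical helper, not part of Hive or Spark, and it assumes every property element has both a name and a value child.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical helper for illustration only.
object HiveSiteReader {

  /** Parses the text of a hive-site.xml file and returns its properties as name -> value. */
  def properties(xml: String): Map[String, String] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val nodes = doc.getElementsByTagName("property")
    (0 until nodes.getLength).map { i =>
      val p = nodes.item(i).asInstanceOf[Element]
      // Assumes both children exist; a property without <value> would fail here.
      val name = p.getElementsByTagName("name").item(0).getTextContent.trim
      val value = p.getElementsByTagName("value").item(0).getTextContent.trim
      name -> value
    }.toMap
  }
}
```

Looking up "hive.metastore.uris" in the returned map shows the address the cluster configuration expects, which can then be compared with the value passed to .config(...) in the Spark code.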

3. Write the Scala code to extract the data

import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateCustomerTbl {

  def main(args: Array[String]): Unit = {

    val sqlSession: SparkSession = SparkSession.builder()
      .master("local[*]")     // run mode for local debugging only; remove before packaging and choose the mode at submit time
      .appName("createCustomerTable")
      .config("hive.metastore.uris", "thrift://localhost:9083")        // Hive metastore address; use the host from hive-site.xml (master in step 2) on the cluster
      .enableHiveSupport()        // enable Hive support
      .getOrCreate()

    // read the MySQL table customer over JDBC
    val df: DataFrame = sqlSession.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/test")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "customer")
      .load()

    df.createOrReplaceTempView("mysql_customer")     // register an in-memory temporary view
    df.show()

    sqlSession.sql("use ods") // switch to the ods database in Hive

    // create the partitioned table; "if not exists" lets the job be rerun
    sqlSession.sql(
      """
        |create table if not exists customer(
        |custkey                 string,
        |name                    string,
        |address                 string,
        |nationkey               string,
        |phone                   string,
        |acctbal                 string,
        |mktsegment              string,
        |comment                 string)
        |partitioned  by (date string)
        |""".stripMargin)

    // load the full table into partition 20220503 of customer
    sqlSession.sql(
      """
        |insert overwrite table customer partition (date = "20220503") select * from mysql_customer
        |""".stripMargin)

    sqlSession.close()
  }
}
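
The read above pulls the whole customer table through a single JDBC connection. Spark's JDBC source can split the read across several connections when it is given partitionColumn, lowerBound, upperBound and numPartitions. The helper below is only a sketch of how those options might be assembled: JdbcReadOptions and its parameter names are made up for illustration, and using it with custkey assumes that column is numeric in MySQL, which the post does not state.

```scala
// Hypothetical helper for illustration only.
object JdbcReadOptions {

  /** Builds the option map for a parallel JDBC read.
    * Connection values come from the caller; the option keys are Spark's JDBC source options.
    */
  def partitioned(
      url: String,
      table: String,
      user: String,
      password: String,
      partitionColumn: String, // assumed to be a numeric column
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int
  ): Map[String, String] = {
    // Sanity checks chosen for this sketch.
    require(numPartitions > 0, "numPartitions must be positive")
    require(lowerBound < upperBound, "lowerBound must be below upperBound")
    Map(
      "url" -> url,
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> table,
      "user" -> user,
      "password" -> password,
      "partitionColumn" -> partitionColumn,
      "lowerBound" -> lowerBound.toString,
      "upperBound" -> upperBound.toString,
      "numPartitions" -> numPartitions.toString
    )
  }
}
```

The resulting map could be passed as sqlSession.read.format("jdbc").options(opts).load(). The bounds only decide how the key range is divided among tasks; rows outside them are still read.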

4. Package the code and submit it to the cluster

Submit command:

spark-submit \
    --files /usr/local/src/hive/conf/hive-site.xml \
    --driver-class-path /usr/local/src/hive/lib/mysql-connector-java-5.1.38.jar \
    --master yarn \
    --deploy-mode client \
    --class CreateCustomerTbl \
    /root/Spark-01-1.0-SNAPSHOT.jar 10

Notes:
--files: location of the hive-site.xml configuration file
--driver-class-path: location of the MySQL driver JAR
--master yarn: run on YARN
--deploy-mode client: run in client mode
--class: main class of the application
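
Anything placed after the JAR path in spark-submit is handed to main as args. The program in step 3 never reads args, so the trailing 10 has no effect there. As a sketch of how that slot might be put to use, the hypothetical helper below takes the partition date from the first argument instead of hard-coding 20220503. PartitionArgs and its method names are assumptions for illustration and do not appear in the original code.

```scala
// Hypothetical helper for illustration only.
object PartitionArgs {

  /** Returns the partition value from the first program argument, e.g. "20220503". */
  def partitionDate(args: Array[String]): String = args.headOption match {
    case Some(d) if d.matches("\\d{8}") => d
    case Some(other) =>
      throw new IllegalArgumentException(s"expected a yyyyMMdd date, got '$other'")
    case None =>
      throw new IllegalArgumentException("missing partition date argument")
  }

  /** Builds the insert statement from step 3 for the given partition value. */
  def insertSql(date: String): String =
    s"""insert overwrite table customer partition (date = "$date") select * from mysql_customer"""
}
```

Inside main this could be used as sqlSession.sql(PartitionArgs.insertSql(PartitionArgs.partitionDate(args))), with the date passed in place of 10 on the command line.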

When running on YARN, the following configuration changes are required:

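Edit Hadoop's yarn-site.xml and add the following properties, which turn off the NodeManager's physical and virtual memory checks so that containers are not killed for exceeding those limits: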

<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>

<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

Edit spark-env.sh and add JAVA_HOME and YARN_CONF_DIR:

export JAVA_HOME=<path to the JDK>
export YARN_CONF_DIR=<path to the Hadoop configuration directory>   # e.g. /usr/local/src/hadoop/etc/hadoop