Spark SQL Access to Hive and MySQL


    1. Versions

            This article assumes working Hadoop, Hive, and Spark environments. The versions used here are Hadoop-2.6.4, Hive-2.0.0, and spark-1.6.1-bin-hadoop2.6.

    2. Configure spark-env.sh

            Add the following to SPARK_HOME/conf/spark-env.sh:

                export SCALA_HOME=/mysoftware/scala-2.11.8

                export JAVA_HOME=/mysoftware/jdk1.7.0_80

                export SPARK_MASTER_IP=master

                export SPARK_WORKER_MEMORY=512m

                export MASTER=spark://master:7077

            In addition, many online resources also add the following two lines:

                export CLASSPATH=$CLASSPATH:/mysoftware/spark-1.6.1/lib
                export SPARK_CLASSPATH=$SPARK_CLASSPATH:/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar

            These two lines are not added to spark-env.sh here, because SPARK_CLASSPATH has been deprecated since Spark 1.0. If it is set, spark-shell prints the following at startup:

    SPARK_CLASSPATH was detected (set to ':/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar'). This is deprecated in Spark 1.0+.

    Please instead use:
     - ./spark-submit with --driver-class-path to augment the driver classpath
     - spark.executor.extraClassPath to augment the executor classpath
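
            Following the warning's advice, an equivalent startup that puts the MySQL driver on both the driver and executor classpaths would look like the sketch below (adjust the jar path to your own install); section 6.1 uses the --driver-class-path half of this:

                bin/spark-shell --driver-class-path /mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar \
                                --conf spark.executor.extraClassPath=/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar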

    3. Configure spark-defaults.conf

            First copy SPARK_HOME/conf/spark-defaults.conf.template to spark-defaults.conf (with cp). As the file itself notes, every entry has a default value, so nothing is changed here; if your environment requires it, adjust the values accordingly.
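
            For reference, the template ships with commented-out examples roughly like the following (the exact entries may differ by Spark version; uncomment and adapt them only if your environment needs them):

                # spark.master                     spark://master:7077
                # spark.eventLog.enabled           true
                # spark.eventLog.dir               hdfs://namenode:8021/directory
                # spark.serializer                 org.apache.spark.serializer.KryoSerializer
                # spark.driver.memory              5g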

            Also, many online resources add the following to this file:

    spark.executor.extraClassPath    /mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar
    spark.driver.extraClassPath      /mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar

    As a result, starting spark-shell produced WARN messages caused by these two settings (the "Setting 'spark.executor.extraClassPath' ... as a work-around" warnings, like those shown in section 6.2 below).

    4. Add the MySQL driver jar

           Copy mysql-connector-java-5.1.5-bin.jar into the SPARK_HOME/lib/ directory.

    5. Add files to the SPARK_HOME/conf directory

            Copy hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) into the SPARK_HOME/conf directory.

            From the official documentation:

            Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), hdfs-site.xml (for HDFS configuration) file in conf/. Please note when running the query on a YARN cluster (cluster mode), the datanucleus jars under the lib directory and hive-site.xml under conf/ directory need to be available on the driver and all executors launched by the YARN cluster. The convenient way to do this is adding them through the --jars option and --file option of the spark-submit command.
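
            A sketch of such a YARN cluster-mode submission follows (the application class and jar are placeholders, and the datanucleus jar versions should match whatever actually sits under SPARK_HOME/lib in your install):

                bin/spark-submit --master yarn --deploy-mode cluster \
                  --jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar \
                  --files conf/hive-site.xml \
                  --class com.example.MyApp my-app.jar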

    6. Spark SQL Access to Hive

    6.1 First way to start spark-shell:

    bin/spark-shell --driver-class-path /mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar

    hadoop@master:/mysoftware/spark-1.6.1$ bin/spark-shell --driver-class-path /mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar
    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
    To adjust logging level use sc.setLogLevel("INFO")
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
          /_/

    Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
    Type in expressions to have them evaluated.
    Type :help for more information.
    Spark context available as sc.
    16/06/06 18:56:11 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    16/06/06 18:56:12 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    16/06/06 18:56:20 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    16/06/06 18:56:20 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
    16/06/06 18:56:28 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    16/06/06 18:56:28 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    16/06/06 18:56:31 ERROR ObjectStore: Version information found in metastore differs 2.0.0 from expected schema version 1.2.0. Schema verififcation is disabled hive.metastore.schema.verification so setting version.
    SQL context available as sqlContext.

    scala>

    Run the following commands:

    scala> sc
    res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@631a8160

    scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3a957b9e

    scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS sparkhive (key INT, value STRING)") res1: org.apache.spark.sql.DataFrame = [result: string]

    After the third command completes, the newly created table sparkhive can be queried from Hive:

    hive> show tables;
    OK
    hbase_person
    hivehbase
    hivehbase_person
    hivehbase_student
    multiplehive
    sparkhive
    testhive
    testsparkhive
    Time taken: 1.154 seconds, Fetched: 8 row(s)

     Now load some data into the table and query it:

    scala> sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE sparkhive");

    scala>  sqlContext.sql("FROM sparkhive SELECT key, value").collect()  

    'examples/src/main/resources/kv1.txt' is a sample file shipped inside the Spark distribution.

    scala> sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE sparkhive"); res2: org.apache.spark.sql.DataFrame = [result: string] scala> sqlContext.sql("FROM sparkhive SELECT key, value").collect() res3: Array[org.apache.spark.sql.Row] = Array([238,val_238], [86,val_86], [311,val_311], [27,val_27], [165,val_165], [409,val_409], [255,val_255], [278,val_278], [98,val_98], [484,val_484], [265,val_265], [193,val_193], [401,val_401], [150,val_150], [273,val_273], [224,val_224], [369,val_369], [66,val_66], [128,val_128], [213,val_213], [146,val_146], [406,val_406], [429,val_429], [374,val_374], [152,val_152], [469,val_469], [145,val_145], [495,val_495], [37,val_37], [327,val_327], [281,val_281], [277,val_277], [209,val_209], [15,val_15], [82,val_82], [403,val_403], [166,val_166], [417,val_417], [430,val_430], [252,val_252], [292,val_292], [219,val_219], [287,val_287], [153,val_153], [193,val_193], [338,val_338], [446,val_446], [459,val_459], [394,val_394], [237,val_237], [482,val_482], ... scala>

    The data can also be viewed from Hive with select * from sparkhive.
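
    Once the table is registered in the metastore, any Spark SQL statement can be run against it through the HiveContext. A small sketch, run from the same spark-shell session:

        // Aggregate over the Hive table; keys in kv1.txt repeat, so group them.
        val counts = sqlContext.sql("SELECT key, count(*) AS cnt FROM sparkhive GROUP BY key")
        counts.show(5)   // display the first five groups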

    6.2 Second way to start spark-shell:

        SPARK_CLASSPATH=$SPARK_CLASSPATH:/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar bin/spark-shell     

    However, this prints some WARN messages, as shown below (the first way is recommended):

    hadoop@master:/mysoftware/spark-1.6.1$ SPARK_CLASSPATH=$SPARK_CLASSPATH:/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar bin/spark-shell
    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
    To adjust logging level use sc.setLogLevel("INFO")
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
          /_/

    Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
    Type in expressions to have them evaluated.
    Type :help for more information.
    16/06/06 19:14:10 WARN SparkConf: SPARK_CLASSPATH was detected (set to ':/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar').
    This is deprecated in Spark 1.0+.
    Please instead use:
     - ./spark-submit with --driver-class-path to augment the driver classpath
     - spark.executor.extraClassPath to augment the executor classpath
    16/06/06 19:14:10 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar' as a work-around.
    16/06/06 19:14:10 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/mysoftware/spark-1.6.1/lib/mysql-connector-java-5.1.5-bin.jar' as a work-around.
    Spark context available as sc.
    16/06/06 19:14:26 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    16/06/06 19:14:27 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    16/06/06 19:14:35 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    16/06/06 19:14:35 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
    16/06/06 19:14:39 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    16/06/06 19:14:39 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    SQL context available as sqlContext.

    scala>

     

    7. Spark SQL Access to MySQL

        Start spark-shell using the first way described above.

        Pay attention to how the following parameters are written:

            "url" -> "jdbc:mysql://192.168.226.129:3306/hive?user=hive&password=xujun",

            -- (the JDBC URL of the remote MySQL server, with user name and password, connecting to the hive database)

            "dbtable" -> "hive.TBLS",   (这里用的是 hive数据库中原本存在的一张表 TBLS )

            "driver" -> "com.mysql.jdbc.Driver" ( 驱动 )

             7.1 First approach: load the data via sqlContext.read.format("jdbc").options(...).load() (a DataFrameReader is created along the way; see the API docs for details):

     

         val jdbcDF = sqlContext.read.format("jdbc").options(
           Map("url" -> "jdbc:mysql://192.168.226.129:3306/hive?user=hive&password=xujun",
               "dbtable" -> "hive.TBLS",
               "driver" -> "com.mysql.jdbc.Driver")).load()

        The full run looks like this:

    scala> val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:mysql://192.168.226.129:3306/hive?user=hive&password=xujun", "dbtable" -> "hive.TBLS","driver" -> "com.mysql.jdbc.Driver")).load()
    jdbcDF: org.apache.spark.sql.DataFrame = [TBL_ID: bigint, CREATE_TIME: int, DB_ID: bigint, LAST_ACCESS_TIME: int, OWNER: string, RETENTION: int, SD_ID: bigint, TBL_NAME: string, TBL_TYPE: string, VIEW_EXPANDED_TEXT: string, VIEW_ORIGINAL_TEXT: string]

    scala> jdbcDF.show()
    +------+-----------+-----+----------------+------+---------+-----+-----------------+--------------+------------------+------------------+
    |TBL_ID|CREATE_TIME|DB_ID|LAST_ACCESS_TIME| OWNER|RETENTION|SD_ID|         TBL_NAME|      TBL_TYPE|VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|
    +------+-----------+-----+----------------+------+---------+-----+-----------------+--------------+------------------+------------------+
    |    11| 1464510462|    1|               0|  hive|        0|   11|         testhive| MANAGED_TABLE|              null|              null|
    |    22| 1464513715|    1|               0|hadoop|        0|   22|        hivehbase| MANAGED_TABLE|              null|              null|
    |    23| 1464517000|    1|               0|hadoop|        0|   23|     hbase_person|EXTERNAL_TABLE|              null|              null|
    |    24| 1464517563|    1|               0|hadoop|        0|   24|hivehbase_student|EXTERNAL_TABLE|              null|              null|
    |    29| 1464521014|    1|               0|hadoop|        0|   29|     multiplehive| MANAGED_TABLE|              null|              null|
    |    36| 1464522011|    1|               0|hadoop|        0|   36| hivehbase_person| MANAGED_TABLE|              null|              null|
    |    41| 1465227955|    1|               0|hadoop|        0|   41|    testsparkhive| MANAGED_TABLE|              null|              null|
    |    46| 1465264720|    1|               0|hadoop|        0|   46|        sparkhive| MANAGED_TABLE|              null|              null|
    +------+-----------+-----+----------------+------+---------+-----+-----------------+--------------+------------------+------------------+

    7.2 Second approach: load the data via sqlContext.load("jdbc", ...) (note that this load method is deprecated since Spark 1.4 in favor of read.format(...).load()):

        val jdbcDF = sqlContext.load("jdbc",
          Map("url" -> "jdbc:mysql://192.168.226.129:3306/hive?user=hive&password=xujun",
              "dbtable" -> "hive.TBLS",
              "driver" -> "com.mysql.jdbc.Driver"))

     

        Display the data:

        jdbcDF.show()

        The output is identical to that of the first approach.
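
    Since jdbcDF is an ordinary DataFrame, it can also be registered as a temporary table and queried with SQL; and the JDBC source accepts a subquery in place of a table name, which pushes the filtering down to MySQL. A sketch (the temp-table name tbls and the subquery alias t are arbitrary):

        // Query the loaded DataFrame with SQL.
        jdbcDF.registerTempTable("tbls")
        sqlContext.sql("SELECT TBL_NAME, TBL_TYPE FROM tbls WHERE TBL_TYPE = 'MANAGED_TABLE'").show()

        // Alternatively, let MySQL do the filtering by using a subquery as dbtable.
        val managed = sqlContext.read.format("jdbc").options(Map(
          "url"     -> "jdbc:mysql://192.168.226.129:3306/hive?user=hive&password=xujun",
          "dbtable" -> "(SELECT TBL_ID, TBL_NAME FROM TBLS WHERE TBL_TYPE = 'MANAGED_TABLE') AS t",
          "driver"  -> "com.mysql.jdbc.Driver")).load()
        managed.show()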

