Importing data into Elasticsearch with Spark

xiaoxiao  2025-08-09

The previous post gave a brief introduction to Elasticsearch; now it is time to actually use it. When facing massive amounts of data you obviously cannot index documents one at a time as before, so I use Spark to bulk-load the data into ES.

Third-party dependency: the original post linked to a download for the matching dependency package (the link itself was left as a placeholder), which can be unpacked and used directly.

PS: always keep the Spark / Scala / ES version mapping in mind. I am running Spark 1.6.2 with Scala 2.10.4, and the Spark-ES jar is elasticsearch-spark_2.10-2.3.3, which was compiled against Scala 2.10.5. Not paying attention to this at first produced the following error:

java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;

Now, on to using spark-es. First add the dependency jar to Spark's classpath. When Spark talks to ES, the connection parameters can be configured in two ways:

1) On the SparkConf:

val conf = new SparkConf().setAppName("loadToES")
conf.set("es.nodes", "192.168.0.23")
conf.set("es.port", "9200")
conf.set("es.cluster.name", "elasticsearch")
val sc = new SparkContext(conf)

2) As a configuration map passed to the individual calls:

val configuration = Map("es.nodes" -> "192.168.0.23", "es.port" -> "9200", "es.cluster.name" -> "elasticsearch")

Reading and writing are handled separately, and two entry points are provided: EsSpark and EsSparkSQL, which read and write through Spark RDDs and Spark DataFrames respectively.

EsSpark RDD reads:

1) def esRDD(sc: SparkContext): RDD[(String, Map[String, AnyRef])]
2) def esRDD(sc: SparkContext, resource: String, query: String, cfg: Map[String, String]): RDD[(String, Map[String, AnyRef])]
3) def esJsonRDD(sc: SparkContext, resource: String, query: String, cfg: Map[String, String]): RDD[(String, String)]

For example, sc.esRDD("radio/artists", "?q=me*") creates an RDD streaming all the documents matching me* from the index radio/artists.
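As a minimal end-to-end sketch of the read path (the node address and the radio/artists index are the placeholders used above), this is roughly what a complete shell session might look like; note the org.elasticsearch.spark._ import that brings esRDD into scope:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._   // brings sc.esRDD / rdd.saveToEs into scope as implicit methods

val conf = new SparkConf().setAppName("readFromES")
conf.set("es.nodes", "192.168.0.23")   // placeholder ES node
conf.set("es.port", "9200")
val sc = new SparkContext(conf)

// each element is (document _id, document fields as a Map)
val artists = sc.esRDD("radio/artists", "?q=me*")
artists.take(5).foreach { case (id, doc) => println(s"$id -> $doc") }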

The same read can also go through the Hadoop InputFormat. With the old ("mapred") API:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.{MapWritable, Text}
import org.elasticsearch.hadoop.mr.EsInputFormat

val conf = new JobConf()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")
val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

And with the new ("mapreduce") API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{MapWritable, Text}
import org.elasticsearch.hadoop.mr.EsInputFormat

val conf = new Configuration()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")
val esRDD = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

EsSpark RDD bulk writes:

1) def saveToEs(rdd: RDD[_], resource: String, cfg: Map[String, String])
2) def saveToEsWithMeta[K, V](rdd: RDD[(K, V)], resource: String, cfg: Map[String, String])
3) def saveJsonToEs(rdd: RDD[_], resource: String, cfg: Map[String, String])
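A minimal write sketch, reusing the configuration map defined above; the spark/docs index and the document fields are made-up placeholders:

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// two small documents expressed as Scala maps
val doc1 = Map("title" -> "spark", "tags" -> List("bigdata"))
val doc2 = Map("title" -> "elasticsearch", "tags" -> List("search"))

val docs: RDD[Map[String, Any]] = sc.makeRDD(Seq(doc1, doc2))

// bulk-index the RDD into the (placeholder) index/type spark/docs
EsSpark.saveToEs(docs, "spark/docs", configuration)

// JSON strings can be written as-is with saveJsonToEs
val json = sc.makeRDD(Seq("""{"title":"json doc","tags":["raw"]}"""))
EsSpark.saveJsonToEs(json, "spark/docs", configuration)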

EsSparkSQL DataFrame reads and bulk writes:

1) def esDF(sc: SQLContext, resource: String, query: String)

// Spark 1.3 style
val df = sql.load("spark/index", "org.elasticsearch.spark.sql")

// Spark 1.4 style
val df = sql.read.format("org.elasticsearch.spark.sql").load("spark/index")

// Spark 1.5 style
val df = sql.read.format("es").load("spark/index")

val sql = new SQLContext...

// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index", "pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13)

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql").options(options).load("spark/index")
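Pulling these pieces together, a minimal sketch of reading an index into a DataFrame on Spark 1.4+ could look like the following; sc is an existing SparkContext, and the node address and the spark/index resource are placeholders:

import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._   // adds esDF on SQLContext and saveToEs on DataFrame

val sqlCtx = new SQLContext(sc)

// data-source style read; pushdown lets Spark SQL filters run as ES queries
val df = sqlCtx.read
  .format("org.elasticsearch.spark.sql")
  .options(Map("pushdown" -> "true", "es.nodes" -> "192.168.0.23", "es.port" -> "9200"))
  .load("spark/index")

// equivalent shortcut provided by the implicit esDF method
val df2 = sqlCtx.esDF("spark/index", "?q=*")

df.printSchema()
println(df.count())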

The pushdown option is specific to the Spark data source API: when enabled, Spark SQL filters are translated into Elasticsearch queries and executed on the ES side.

The ES index can also be registered as a temporary table through the data source:

sqlContext.sql(
  "CREATE TEMPORARY TABLE myIndex " +
  "USING org.elasticsearch.spark.sql " +
  "OPTIONS (resource 'spark/index', nodes 'someNode')")
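Once the temporary table exists, ordinary SQL can be run against it; with pushdown enabled the WHERE clause is executed by Elasticsearch. A short sketch building on the statement above (the id and name fields are placeholders):

// the WHERE clause is pushed down to ES as a query when pushdown is enabled
val results = sqlContext.sql("SELECT name FROM myIndex WHERE id >= 1 AND id <= 10")
results.show()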

Which fields are read back can be controlled with the include/exclude settings:

include:
es.read.field.include = name, address

exclude:
es.read.field.exclude = *.created
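As a sketch, these settings are passed like any other option, for example in the options map used when loading the DataFrame (the field names and spark/index are placeholders):

val readOptions = Map(
  "es.read.field.include" -> "name, address",   // only map these fields
  "es.read.field.exclude" -> "*.created",       // drop any *.created field
  "es.nodes" -> "192.168.0.23",
  "es.port" -> "9200")

val filteredDF = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .options(readOptions)
  .load("spark/index")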

2) def saveToEs(srdd: DataFrame, resource: String, cfg: Map[String, String]) — the DataFrame bulk-write counterpart.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.elasticsearch.spark.sql._

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // needed for .toDF() on Spark 1.3+

case class Person(name: String, surname: String, age: Int)

val people = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1), p(2).trim.toInt))
  .toDF()

people.saveToEs("spark/people")

Usage examples:

Using a custom _id, method one: hash each input line and use the hash as a key column, then tell the connector to use that column as the document _id (the configuration needs an es.mapping.id entry for this, as shown in the sketch below):

import com.google.common.hash.Hashing
import com.google.common.base.Charsets
import org.elasticsearch.spark.sql.EsSparkSQL

val rdd = sc.textFile("hdfs://master:9000/es.nb")
  .map(x => (Hashing.md5().hashString(x, Charsets.UTF_8).asLong(), x))
val relation = sqlContext.createDataFrame(rdd).toDF("key", "val")
EsSparkSQL.saveToEs(relation, "es/info", configuration)
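A minimal sketch of what that configuration map might contain; the es.mapping.id setting is what makes the key column become the document _id (the node address is a placeholder, as above):

val configuration = Map(
  "es.nodes" -> "192.168.0.23",
  "es.port" -> "9200",
  // use the DataFrame column named "key" as the Elasticsearch document _id
  "es.mapping.id" -> "key")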

import org.elasticsearch.spark._   // provides saveToEsWithMeta on pair RDDs

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...

val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
airportsRDD.saveToEsWithMeta("airports/2015")

In this form, the key of each pair (1, 2, 3) is used as the document _id.

Method two: pass an explicit metadata map for each document.

import org.elasticsearch.spark.rdd.Metadata._          (1)

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")        (2)
val mucMeta = Map(ID -> 2, VERSION -> "23")    (3)
val sfoMeta = Map(ID -> 3)                     (4)

// instance of SparkContext
val sc = ...

val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))   (5)
airportsRDD.saveToEsWithMeta("airports/2015")                                       (6)

(1) Import this package, i.e. the enum of metadata types.
(2) ID with a value of 1 and TTL with a value of 3h.
(3) ID with a value of 2 and VERSION with a value of 23.
(4) ID with a value of 3 only.
(5) Pair each metadata map with its document.
(6) Bulk-write the RDD; the metadata in each pair's key is applied to the corresponding document.
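The same write can also go through the EsSpark object instead of the implicit method, which mirrors the EsSparkSQL.saveToEs call used earlier; a brief sketch, reusing the airportsRDD, the airports/2015 resource and the configuration map from above as placeholders:

import org.elasticsearch.spark.rdd.EsSpark

// equivalent to airportsRDD.saveToEsWithMeta("airports/2015"),
// with the configuration map passed explicitly
EsSpark.saveToEsWithMeta(airportsRDD, "airports/2015", configuration)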

You are welcome to try out the other methods yourself. I (Xiao Yang) have only just started working with Elasticsearch and am still finding my way, so any guidance is appreciated. Feel free to get in touch, QQ: 791279468.

Please credit the original source when reprinting: https://ju.6miu.com/read-1301575.html