spark streaming 实现kafka的createDirectStream方式!!不坑

    xiaoxiao2021-03-25  89

    网上搜了很多spark streaming 用createDirectStream方式消费kafka的,信息是有很多,但是照着做都遇到了坑,最大的坑就是KafkaCluster是private的!根本就new不了,折腾了一会终于搞定了,也不复杂

    1. 新建一个包org.apache.spark.streaming.kafka,就是在你的project建一个这个目录的包,在这个包下面的类里,就可以new出KafkaCluster了!

    2. new出KafkaCluster,后面的就都是小问题了,但是网上给的例子都太复杂,又或者太简单,我们只需要实现重启接着上次的offset消费,消费完保存offset这么简单的功能就行了。

    下面是我根据网上找的精简的代码

    class KafkaManager(val kafkaParams: HashMap[String, String]) extends Serializable { private val kc = new KafkaCluster(kafkaParams) /** * 创建数据流 * * @param ssc * @param kafkaParams * @param topics * @tparam K * @tparam V * @tparam KD * @tparam VD * @return */ def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]( ssc: StreamingContext, kafkaParams: HashMap[String, String], topics: Set[String]): InputDStream[(K, V)] = { val groupId = kafkaParams.get("group.id").get //从zookeeper上读取offset开始消费message // val messages = { val partitionsE = kc.getPartitions(topics) if (partitionsE.isLeft) throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}") val partitions = partitionsE.right.get val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions) if (!consumerOffsetsE.isLeft) { val consumerOffsets = consumerOffsetsE.right.get KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)) } else { val p = kafkaParams + ("auto.offset.reset" -> "largest") KafkaUtils.createDirectStream(ssc, p, topics) } // } // messages } /** * 更新消费offsets * * @param rdd */ def updateZKOffsets(rdd: RDD[(String, String)]): Unit = { val groupId = kafkaParams.get("group.id").get val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges for (offsets <- offsetsList) { val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition) val o = kc.setConsumerOffsets(groupId, HashMap((topicAndPartition, offsets.untilOffset))) if (o.isLeft) { println(s"Error updating the offset to Kafka cluster: ${o.left.get}") } } } }

    只有两个方法 cteate 和 update

    create方法比较简单,当保存的offset在kafka上不存在时会出异常,因为我们集群kafka保留时间是7天,也就是说我程序停了7天再启动才会报错,都停7天了,肯定是使用新的groupid了,也没多大影响,想实现比较完善的话,可以百度下,有复杂的实现。

    转载请注明原文地址: https://ju.6miu.com/read-13646.html

    最新回复(0)