Spark reading from Kafka: dealing with inconsistent ZK and Kafka offsets


    This project uses Spark Streaming to read from Kafka through Kafka's low-level (SimpleConsumer) API, so offsets are stored in ZooKeeper manually: the offset in ZK is updated only after a batch has been processed successfully. However, if Kafka runs into a network problem, or the write to ZK never happens, the offset in ZK and the offset in Kafka drift apart. At that point the offsets in Kafka and in ZK have to be compared and reconciled.

    PS: alternatively, Spark itself can also use checkpointing to save state.

    Using checkpoints means keeping track of the offsets that have been processed, but it takes time for Spark to prepare and store them; in our case each checkpoint took about 3 s on average. Strongly recommended reading: http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
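    For reference, a minimal sketch of driver-side checkpoint recovery with Spark Streaming's Java API; the checkpoint directory, app name and batch interval are placeholders, not values from the original project:

        import org.apache.spark.SparkConf;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;

        public class CheckpointDemo {
            public static void main(String[] args) throws Exception {
                final String checkpointDir = "hdfs:///tmp/spark-checkpoint"; // placeholder path
                // getOrCreate rebuilds the context (including the offsets tracked by the
                // direct stream) from the checkpoint if one exists, else creates it fresh
                JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, () -> {
                    SparkConf conf = new SparkConf().setAppName("checkpoint-demo");
                    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
                    ssc.checkpoint(checkpointDir); // enable metadata checkpointing
                    // ... build the Kafka DStream and the rest of the job here ...
                    return ssc;
                });
                jssc.start();
                jssc.awaitTermination();
            }
        }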

    Logic: if the offset in ZK is less than the EarliestOffset or greater than the LatestOffset, the ZK offset has expired, so update the ZK offset to EarliestOffset; if the ZK offset lies between EarliestOffset and LatestOffset, trust the ZK offset.
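    A minimal sketch of this rule applied per partition (the helper name reconcile is mine, not from the original code):

        /** Clamp an offset read from ZK into Kafka's currently valid [earliest, latest] range. */
        public static long reconcile(long zkOffset, long earliestOffset, long latestOffset) {
            if (zkOffset < earliestOffset || zkOffset > latestOffset) {
                return earliestOffset; // the ZK offset has expired (e.g. log retention), start over from the earliest
            }
            return zkOffset; // the ZK offset is still valid, resume from it
        }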

    KafkaUtil (reads offsets from Kafka via SimpleConsumer)

    import java.io.Serializable;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.consumer.SimpleConsumer;

    // KafkaTopicOffset and Constant are the author's helper classes (not shown here):
    // KafkaTopicOffset holds a partitionId -> leader-host map and a partitionId -> offset map.
    public class KafkaUtil implements Serializable {

        private static final long serialVersionUID = -7708717328840L;
        private static KafkaUtil kafkaUtil = null;

        private KafkaUtil() {
        }

        // Double-checked-locking singleton
        public static KafkaUtil getInstance() {
            if (kafkaUtil == null) {
                synchronized (KafkaUtil.class) {
                    if (kafkaUtil == null) {
                        kafkaUtil = new KafkaUtil();
                    }
                }
            }
            return kafkaUtil;
        }

        /**
         * Extract the hosts from brokerList ("host1:port1,host2:port2,...").
         */
        public String[] getHostFromBrokerList(String brokerList) {
            String[] brokers = brokerList.split(",");
            for (int i = 0; i < brokers.length; i++) {
                brokers[i] = brokers[i].split(":")[0];
            }
            return brokers;
        }

        /**
         * Extract a host -> port map from brokerList.
         */
        public Map<String, Integer> getPortFromBrokerList(String brokerList) {
            Map<String, Integer> portMap = new HashMap<String, Integer>();
            for (String broker : brokerList.split(",")) {
                String host = broker.split(":")[0];
                Integer port = Integer.valueOf(broker.split(":")[1]);
                portMap.put(host, port);
            }
            return portMap;
        }

        /**
         * Ask each broker for the topic's metadata and record the leader host of
         * every partition (offsets are initialised to 0).
         */
        public KafkaTopicOffset topicAndMetadataRequest(String brokerList, String topic) {
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
            KafkaTopicOffset kafkaTopicOffset = new KafkaTopicOffset(topic);
            String[] hosts = getHostFromBrokerList(brokerList);
            Map<String, Integer> portMap = getPortFromBrokerList(brokerList);
            for (String host : hosts) {
                SimpleConsumer simpleConsumer = null;
                try {
                    simpleConsumer = new SimpleConsumer(host, portMap.get(host),
                            Constant.TIME_OUT, Constant.BUFFERSIZE, Constant.groupId);
                    kafka.javaapi.TopicMetadataResponse response =
                            simpleConsumer.send(topicMetadataRequest);
                    List<TopicMetadata> topicMetadatas = response.topicsMetadata();
                    for (TopicMetadata metadata : topicMetadatas) {
                        for (PartitionMetadata partitionMetadata : metadata.partitionsMetadata()) {
                            kafkaTopicOffset.getLeaderList().put(
                                    partitionMetadata.partitionId(),
                                    partitionMetadata.leader().host());
                            kafkaTopicOffset.getOffsetList().put(
                                    partitionMetadata.partitionId(), 0L);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                }
            }
            return kafkaTopicOffset;
        }

        /**
         * Fetch the earliest or the latest offset (selected by flag) of every
         * partition of a topic from Kafka.
         */
        public KafkaTopicOffset getOffset(String brokerList, String topic, String flag) {
            KafkaTopicOffset kafkaTopicOffset = topicAndMetadataRequest(brokerList, topic);
            String[] hosts = getHostFromBrokerList(brokerList);
            Map<String, Integer> portMap = getPortFromBrokerList(brokerList);
            for (String host : hosts) {
                Iterator<Map.Entry<Integer, Long>> iterator =
                        kafkaTopicOffset.getOffsetList().entrySet().iterator();
                SimpleConsumer simpleConsumer = null;
                try {
                    simpleConsumer = new SimpleConsumer(host, portMap.get(host),
                            Constant.TIME_OUT, Constant.BUFFERSIZE, Constant.groupId);
                    while (iterator.hasNext()) {
                        Map.Entry<Integer, Long> entry = iterator.next();
                        int partitionId = entry.getKey();
                        // only query a partition through its leader host
                        if (!kafkaTopicOffset.getLeaderList().get(partitionId).equals(host)) {
                            continue;
                        }
                        TopicAndPartition topicAndPartition =
                                new TopicAndPartition(topic, partitionId);
                        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap =
                                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                        if (flag.equals(Constant.EARLIEST_OFFSET)) {
                            requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(
                                    kafka.api.OffsetRequest.EarliestTime(), 1));
                        } else if (flag.equals(Constant.LATEST_OFFSET)) {
                            requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(
                                    kafka.api.OffsetRequest.LatestTime(), 1));
                        }
                        OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap,
                                kafka.api.OffsetRequest.CurrentVersion(), Constant.groupId);
                        OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
                        long[] offset = offsetResponse.offsets(topic, partitionId);
                        if (offset.length > 0) {
                            kafkaTopicOffset.getOffsetList().put(partitionId, offset[0]);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                }
            }
            return kafkaTopicOffset;
        }
    }
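    A hypothetical usage sketch tying the pieces together; readZkOffsets stands in for your own ZK read (e.g. via ZkClient or Curator), the broker and topic values are placeholders, and reconcile is the helper sketched earlier:

        String brokerList = "broker1:9092,broker2:9092"; // placeholder brokers
        String topic = "my-topic";                       // placeholder topic
        KafkaTopicOffset earliest = KafkaUtil.getInstance().getOffset(brokerList, topic, Constant.EARLIEST_OFFSET);
        KafkaTopicOffset latest = KafkaUtil.getInstance().getOffset(brokerList, topic, Constant.LATEST_OFFSET);

        Map<Integer, Long> zkOffsets = readZkOffsets(topic); // placeholder: partitionId -> offset stored in ZK

        // Build the fromOffsets map that KafkaUtils.createDirectStream accepts
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
        for (Map.Entry<Integer, Long> entry : zkOffsets.entrySet()) {
            int partitionId = entry.getKey();
            long from = reconcile(entry.getValue(),
                    earliest.getOffsetList().get(partitionId),
                    latest.getOffsetList().get(partitionId));
            fromOffsets.put(new TopicAndPartition(topic, partitionId), from);
        }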
    Please cite the original when reposting: https://ju.6miu.com/read-35297.html
