kafka使用高级api获取指定数量消息

    xiaoxiao2025-07-15  6

    kafka使用高级api获取指定数量消息,低级api更好用只不过还未调试通过,后期再跟上.

    public Map<String, List<Object>> getMessages(long maxReads, long timeout,

                List<String> topics) {         stop = false;         kafkaConsumer = new KafkaConsumer<>(props);         kafkaConsumer.subscribe(topics);         Map<String, List<Object>> messages = new HashMap<>();         for (String topic : topics) {             messages.put(topic, new ArrayList<Object>());         }         Long counts = maxReads;         ConsumerRecord<byte[], byte[]> record = null;         ConsumerRecords<byte[], byte[]> records = null;         try {             while (!stop) {                 if (counts == 0)                     break;                 records = kafkaConsumer.poll(timeout);                 if (records.count() == 0)                     continue;                 counts -= records.count();                 // 出现消息过剩状况                 if (counts < 0) {                     // 记录将要消费的offsets                     Map<String, Map<Integer, Long>> commitMap = new HashMap<>();                     Iterator<ConsumerRecord<byte[], byte[]>> itt = records                             .iterator();                     while (itt.hasNext()) {                         record = itt.next();                         if (commitMap.containsKey(record.topic())) {                             if (!commitMap.get(record.topic()).containsKey(                                     record.partition())) {                                 commitMap.get(record.topic()).put(                                         record.partition(), record.offset());                             } else                                 continue;                         } else {                             Map<Integer, Long> m = new HashMap<>();                             m.put(record.partition(), record.offset());                             commitMap.put(record.topic(), m);                         }                     }                     Iterator<ConsumerRecord<byte[], byte[]>> it = records                             .iterator();                     for (int i = 0; i < counts + records.count(); i++) {                         record = it.next();                         commitMap.get(record.topic()).put(record.partition(),                                 record.offset() + 1);                         messages.get(record.topic()).add(                                 kryo.deserialize(                                         record.value(),                                         TopicMapperUtil.getValueClass(                                                 record.topic(), mapper)));                     }                     Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();                     TopicPartition partition = null;                     OffsetAndMetadata offset = null;                     for (Entry<String, Map<Integer, Long>> entry : commitMap                             .entrySet()) {                         for (Entry<Integer, Long> en : entry.getValue()                                 .entrySet()) {                             partition = new TopicPartition(entry.getKey(),                                     en.getKey());

                                offset = new OffsetAndMetadata(en.getValue());

                                //重置下一次poll的offset

                                kafkaConsumer.seek(partition, offset.offset());                             map.put(partition, offset);                         }                     }                     kafkaConsumer.commitSync(map);                     break;                 }                 kafkaConsumer.commitSync();                 for (ConsumerRecord<byte[], byte[]> tmpRecord : records) {                     messages.get(tmpRecord.topic()).add(                             kryo.deserialize(                                     tmpRecord.value(),                                     TopicMapperUtil.getValueClass(                                             tmpRecord.topic(), mapper)));                 }             }         } finally {             kafkaConsumer.close();             kafkaConsumer = null;         }         return messages;     }
    转载请注明原文地址: https://ju.6miu.com/read-1300726.html
    最新回复(0)