kafka使用高级api获取指定数量消息,低级api更好用只不过还未调试通过,后期再跟上.
public Map<String, List<Object>> getMessages(long maxReads, long timeout,
List<String> topics) { stop = false; kafkaConsumer = new KafkaConsumer<>(props); kafkaConsumer.subscribe(topics); Map<String, List<Object>> messages = new HashMap<>(); for (String topic : topics) { messages.put(topic, new ArrayList<Object>()); } Long counts = maxReads; ConsumerRecord<byte[], byte[]> record = null; ConsumerRecords<byte[], byte[]> records = null; try { while (!stop) { if (counts == 0) break; records = kafkaConsumer.poll(timeout); if (records.count() == 0) continue; counts -= records.count(); // 出现消息过剩状况 if (counts < 0) { // 记录将要消费的offsets Map<String, Map<Integer, Long>> commitMap = new HashMap<>(); Iterator<ConsumerRecord<byte[], byte[]>> itt = records .iterator(); while (itt.hasNext()) { record = itt.next(); if (commitMap.containsKey(record.topic())) { if (!commitMap.get(record.topic()).containsKey( record.partition())) { commitMap.get(record.topic()).put( record.partition(), record.offset()); } else continue; } else { Map<Integer, Long> m = new HashMap<>(); m.put(record.partition(), record.offset()); commitMap.put(record.topic(), m); } } Iterator<ConsumerRecord<byte[], byte[]>> it = records .iterator(); for (int i = 0; i < counts + records.count(); i++) { record = it.next(); commitMap.get(record.topic()).put(record.partition(), record.offset() + 1); messages.get(record.topic()).add( kryo.deserialize( record.value(), TopicMapperUtil.getValueClass( record.topic(), mapper))); } Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>(); TopicPartition partition = null; OffsetAndMetadata offset = null; for (Entry<String, Map<Integer, Long>> entry : commitMap .entrySet()) { for (Entry<Integer, Long> en : entry.getValue() .entrySet()) { partition = new TopicPartition(entry.getKey(), en.getKey());offset = new OffsetAndMetadata(en.getValue());
//重置下一次poll的offset
kafkaConsumer.seek(partition, offset.offset()); map.put(partition, offset); } } kafkaConsumer.commitSync(map); break; } kafkaConsumer.commitSync(); for (ConsumerRecord<byte[], byte[]> tmpRecord : records) { messages.get(tmpRecord.topic()).add( kryo.deserialize( tmpRecord.value(), TopicMapperUtil.getValueClass( tmpRecord.topic(), mapper))); } } } finally { kafkaConsumer.close(); kafkaConsumer = null; } return messages; }