写了一个小项目:自定义 Java 对象,并将其发送到 Kafka。
Kafka 的安装搭建这里就不再描述了,解压后简单配置即可。
直接进入正题吧
// 1. Custom Java object that implements serialization (getters and setters omitted)

/**
 * Message payload that converts itself to and from a byte array using Java
 * native object serialization.
 *
 * <p>NOTE(review): no {@code serialVersionUID} is declared, so the serialized
 * form is tied to the compiler-generated value; adding one later would make
 * previously serialized data unreadable — decide before changing the class.
 */
public class Document implements Serializable {

    private String title;
    private String content;
    private String id;
    private String date;
    private long updatetime;

    /**
     * Serializes this instance with {@link ObjectOutputStream}.
     *
     * @return the serialized bytes of this object, never {@code null}
     * @throws IllegalStateException if the object cannot be written; the
     *         original {@link IOException} is preserved as the cause
     */
    public byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the object stream before the
        // buffer is read, even when writeObject fails.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(this);
        } catch (IOException e) {
            // Returning a truncated buffer would silently publish a corrupt message.
            throw new IllegalStateException("Failed to serialize Document", e);
        }
        return buffer.toByteArray();
    }

    /**
     * Rebuilds a {@code Document} from bytes produced by {@link #toBytes()}.
     *
     * <p>Security: this uses Java native deserialization, which must not be fed
     * untrusted data. NOTE(review): confirm that only trusted producers can
     * write to the source of these bytes.
     *
     * @param bytes serialized form of a {@code Document}; may be {@code null}
     * @return the deserialized document, or {@code null} when {@code bytes} is
     *         {@code null}, empty, or cannot be deserialized
     */
    public Document toDocument(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Document) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            // Callers rely on the null return for unreadable input; the stack
            // trace is still printed so the failure stays visible.
            e.printStackTrace();
            return null;
        }
    }
}

// 2. Custom Encoder
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.thunisoft.data.domain.Document;

/**
 * Kafka value serializer that delegates to {@code Document.toBytes()}.
 */
public class DocumentEncoder implements Serializer<Document> {

    /** No configuration is used by this serializer. */
    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    /**
     * Converts a document to its byte representation.
     *
     * @param s        topic name (unused)
     * @param document payload to serialize; may be {@code null}
     * @return the serialized payload, or {@code null} when {@code document} is {@code null}
     */
    @Override
    public byte[] serialize(String s, Document document) {
        // A null payload is passed through as null instead of failing with a NullPointerException.
        return document == null ? null : document.toBytes();
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}

// 3. Producer implementation

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Service;

import com.thunisoft.data.domain.Document;
import com.thunisoft.data.fy.api.kafka.DocumentProducer;
import com.thunisoft.data.fy.api.kafka.domain.DocumentEncoder;
import com.thunisoft.data.fy.constant.Constants;

/**
 * Publishes {@code Document} messages to {@code Constants.TOPIC} through a
 * single {@link KafkaProducer} created when the class is loaded.
 */
public class DocumentProducer {

    private static Properties props;
    private static KafkaProducer<String, Document> producer;

    static {
        // The static initializer runs once, so no null check is needed here.
        props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_METDATA_BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DocumentEncoder.class.getName());
        // Custom partitioner (disabled):
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DocumentPartitioner.class.getName());
        producer = new KafkaProducer<String, Document>(props);
    }

    /**
     * Sends the document, without a key, to {@code Constants.TOPIC}.
     * The future returned by {@code send} is not inspected.
     *
     * @param document payload to publish
     * @throws IOException declared for callers; nothing in this method throws it
     */
    public void produce(Document document) throws IOException {
        producer.send(new ProducerRecord<String, Document>(Constants.TOPIC, document));
    }
}

// 4. Custom Decoder

import com.bigdata.frame.data.Document;
import kafka.serializer.Decoder;

/**
 * Kafka decoder that rebuilds a {@code Document} via {@code Document.toDocument(byte[])}.
 *
 * <p>NOTE(review): this listing imports {@code com.bigdata.frame.data.Document}
 * while the encoder listing imports {@code com.thunisoft.data.domain.Document}.
 * Java native serialization records the fully-qualified class name, so both
 * sides must use the same class — confirm.
 */
public class DocumentDecoder implements Decoder<Document> {

    /**
     * @param bytes serialized message value
     * @return whatever {@code Document.toDocument(bytes)} returns
     */
    @Override
    public Document fromBytes(byte[] bytes) {
        // toDocument is an instance method, so a throwaway instance is needed to call it.
        Document document = new Document();
        return document.toDocument(bytes);
    }
}

// 5.
编写Consumer import com.bigdata.frame.constant.Constants; import com.bigdata.frame.kafka.DocumentConsumer; import com.bigdata.frame.kafka.domain.DocumentDecoder; import kafka.consumer.ConsumerConfig; import kafka.javaapi.consumer.ConsumerConnector; import java.util.Properties; public class DocumentConsumer { private static Properties props; private static ConsumerConnector consumer; static{ if(props == null){ props = new Properties(); //zookeeper 配置 props.put("zookeeper.connect", Constants.ZOOKEERER_CONNECT); //group 代表一个消费组 props.put("group.id", Constants.KAFKA_GROUP_ID); //指定客户端连接zookeeper的最大超时时间 props.put("zookeeper.connection.timeout.ms", Constants.ZOOLEEPER_CONNECT_SESSION_TOMEOUT); //rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms // 连接zk的session超时时间 props.put("zookeeper.session.timeout.ms", Constants.ZOOKEEPER_SESSION_TIMEOUT); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); props.put("rebalance.max.retries", "5"); props.put("rebalance.backoff.ms", "1200"); //序列化类 props.put("serializer.class", DocumentDecoder.class.getName()); } } public ConsumerConnector getConsumer(){ if(consumer == null){ consumer = consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); } return consumer; } }六. 
也可自定义partitioner规则 import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class DocumentPartitioner implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } } 七, 测试代码 public static void main(String[] args){ DocumentProducer producer = new DocumentProducer(); Document documentnt = new Document(); documentnt.setTitle("测试"); documentnt.setContent("这是Producer测试"); producer.produce(documentnt); } public static void main(String[] args) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); DocumentDecoder valueDecoder = new DocumentDecoder(); Map<String, List<KafkaStream<String, Document>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, Document> stream = consumerMap.get(topic).get(0); ConsumerIterator<String, Document> it = stream.iterator(); while (it.hasNext()) { Document document = it.next().message(); System.out.println(document.toString()); } }
