I. Pre-installation Preparation
1. Kafka version: kafka_2.10-0.10.1.0.tgz
2. ZooKeeper version: zookeeper-3.4.3.tar.gz
3. ZooKeeper cluster: 192.168.1.108:2181,192.168.1.109:2181,192.168.1.110:2181
4. Host-to-IP mappings in the hosts file:
192.168.1.108 master
192.168.1.109 slave1
192.168.1.110 slave2

II. Setting Up the ZooKeeper Environment
For the details of setting up the ZooKeeper environment, see:
http://blog.csdn.net/liuchuanhong1/article/details/53192618
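For quick reference, a minimal zoo.cfg for the three-node cluster above might look like the following sketch (the dataDir path is an assumption; see the linked article for the full walkthrough):

tickTime=2000
initLimit=10
syncLimit=5
# assumed data directory; adjust to your install
dataDir=/usr/zookeeper/data
clientPort=2181
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

Each node also needs a myid file under dataDir containing its server number (1, 2, or 3).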
III. Setting Up the Kafka Environment
1. Download the tar package
The download address is:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
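The mirror-picker page above lets you choose a download location; the tgz can also be fetched directly, e.g. from the Apache archive:

wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz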
2. Extract to the /usr/kafka directory
The extraction command is:
tar -zxvf kafka_2.10-0.10.1.0.tgz

3. Directory structure after extraction
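For reference, the top level of an extracted Kafka 0.10.x distribution typically looks like this:

[root@192 kafka]# ls kafka_2.10-0.10.1.0
bin  config  libs  LICENSE  NOTICE  site-docs

The bin directory holds the startup and admin scripts used below, and config holds server.properties.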
IV. Modify the Configuration Files
1. Modify server.properties
Modify the configuration file as follows:
broker.id=0
host.name=192.168.1.108
zookeeper.connect=slave1:2181,master:2181,slave2:2181

Note that broker.id must be unique within the cluster and host.name should be the node's own IP: on slave1 set broker.id=1 and host.name=192.168.1.109, and on slave2 set broker.id=2 and host.name=192.168.1.110.

V. Start the Kafka Service
1. Start the ZooKeeper cluster
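On each of the three nodes this is typically done with zkServer.sh (the install path here is an assumption; adjust to your ZooKeeper directory):

cd /usr/zookeeper/zookeeper-3.4.3/bin
./zkServer.sh start
./zkServer.sh status   # shows whether this node is the leader or a follower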
2. Go to the bin directory and run the following command to start the Kafka service
./kafka-server-start.sh ../config/server.properties

After it starts, you can see configuration information like the following:
[root@192 bin]# ./kafka-server-start.sh ../config/server.properties
[2016-11-18 10:50:51,067] INFO KafkaConfig values:
	advertised.host.name = null
	advertised.listeners = null
	advertised.port = null
	authorizer.class.name =
	auto.create.topics.enable = true
	auto.leader.rebalance.enable = true
	background.threads = 10
	broker.id = 0
	broker.id.generation.enable = true
	broker.rack = null
	compression.type = producer
	connections.max.idle.ms = 600000
	controlled.shutdown.enable = true
	controlled.shutdown.max.retries = 3
	controlled.shutdown.retry.backoff.ms = 5000
	controller.socket.timeout.ms = 30000
	default.replication.factor = 1
	delete.topic.enable = false
	fetch.purgatory.purge.interval.requests = 1000
	group.max.session.timeout.ms = 300000
	group.min.session.timeout.ms = 6000
	host.name =
	inter.broker.protocol.version = 0.10.1-IV2
	leader.imbalance.check.interval.seconds = 300
	leader.imbalance.per.broker.percentage = 10
	listeners = null
	log.cleaner.backoff.ms = 15000
	log.cleaner.dedupe.buffer.size = 134217728
	log.cleaner.delete.retention.ms = 86400000
	log.cleaner.enable = true
	log.cleaner.io.buffer.load.factor = 0.9
	log.cleaner.io.buffer.size = 524288
	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
	log.cleaner.min.cleanable.ratio = 0.5
	log.cleaner.min.compaction.lag.ms = 0
	log.cleaner.threads = 1
	log.cleanup.policy = [delete]
	log.dir = /tmp/kafka-logs
	log.dirs = /home/kafka/kafka-logs
	log.flush.interval.messages = 9223372036854775807
	log.flush.interval.ms = null
	log.flush.offset.checkpoint.interval.ms = 60000
	log.flush.scheduler.interval.ms = 9223372036854775807
	log.index.interval.bytes = 4096
	log.index.size.max.bytes = 10485760
	log.message.format.version = 0.10.1-IV2
	log.message.timestamp.difference.max.ms = 9223372036854775807
	log.message.timestamp.type = CreateTime
	log.preallocate = false
	log.retention.bytes = -1
	log.retention.check.interval.ms = 300000
	log.retention.hours = 168
	log.retention.minutes = null
	log.retention.ms = null
	log.roll.hours = 168
	log.roll.jitter.hours = 0
	log.roll.jitter.ms = null
	log.roll.ms = null
	log.segment.bytes = 1073741824
	log.segment.delete.delay.ms = 60000
	max.connections.per.ip = 2147483647
	max.connections.per.ip.overrides =
	message.max.bytes = 1000012
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	min.insync.replicas = 1
	num.io.threads = 8
	num.network.threads = 3
	num.partitions = 1
	num.recovery.threads.per.data.dir = 1
	num.replica.fetchers = 1
	offset.metadata.max.bytes = 4096
	offsets.commit.required.acks = -1
	offsets.commit.timeout.ms = 5000
	offsets.load.buffer.size = 5242880
	offsets.retention.check.interval.ms = 600000
	offsets.retention.minutes = 1440
	offsets.topic.compression.codec = 0
	offsets.topic.num.partitions = 50
	offsets.topic.replication.factor = 3
	offsets.topic.segment.bytes = 104857600
	port = 9092
	principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
	producer.purgatory.purge.interval.requests = 1000
	queued.max.requests = 500
	quota.consumer.default = 9223372036854775807
	quota.producer.default = 9223372036854775807
	quota.window.num = 11
	quota.window.size.seconds = 1
	replica.fetch.backoff.ms = 1000
	replica.fetch.max.bytes = 1048576
	replica.fetch.min.bytes = 1
	replica.fetch.response.max.bytes = 10485760
	replica.fetch.wait.max.ms = 500
	replica.high.watermark.checkpoint.interval.ms = 5000
	replica.lag.time.max.ms = 10000
	replica.socket.receive.buffer.bytes = 65536
	replica.socket.timeout.ms = 30000
	replication.quota.window.num = 11
	replication.quota.window.size.seconds = 1
	request.timeout.ms = 30000
	reserved.broker.max.id = 1000
	sasl.enabled.mechanisms = [GSSAPI]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.principal.to.local.rules = [DEFAULT]
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism.inter.broker.protocol = GSSAPI
	security.inter.broker.protocol = PLAINTEXT
	socket.receive.buffer.bytes = 102400
	socket.request.max.bytes = 104857600
	socket.send.buffer.bytes = 102400
	ssl.cipher.suites = null
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	unclean.leader.election.enable = true
	zookeeper.connect = slave1:2181,master:2181,slave2:2181
	zookeeper.connection.timeout.ms = 6000
	zookeeper.session.timeout.ms = 6000
	zookeeper.set.acl = false
	zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2016-11-18 10:50:51,175] INFO starting (kafka.server.KafkaServer)
[2016-11-18 10:50:51,244] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,261] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,269] INFO Connecting to zookeeper on slave1:2181,master:2181,slave2:2181 (kafka.server.KafkaServer)
[2016-11-18 10:50:51,308] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-11-18 10:50:51,328] INFO Client environment:zookeeper.version=3.4.8--1, built on 02/06/2016 03:18 GMT (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:host.name=master (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.version=1.8.0_111 (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.home=/usr/Java/jdk1.8.0_111/jre (org.apache.zookeeper.ZooKeeper)

3. Check the Kafka process
4. Create a topic
./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafka-test

5. View the topic details
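The same script's --describe switch shows the partition layout. For the topic just created the output will look roughly like this (the leader/replica assignment may differ on your cluster):

./kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafka-test
Topic:kafka-test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: kafka-test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0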
VI. Wrapping the Kafka Producer as a Service
1. Create the Kafka class
import java.util.Properties;

import org.apache.log4j.Logger;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Kafka {
    private static final Logger LOG = Logger.getLogger(Kafka.class);

    /** Broker list for the producer */
    private String producerBrokerList;
    /** Serializer class */
    private String producerSerializerClass;
    /** Topic to send to */
    private String producerTopic;
    /** Number of retries */
    private int retry;
    /** The Kafka producer */
    private Producer<String, String> kafkaProducer;

    public Kafka(String producerBrokerList, String producerSerializerClass,
            String producerTopic, int retry) {
        super();
        this.producerBrokerList = producerBrokerList;
        this.producerSerializerClass = producerSerializerClass;
        this.producerTopic = producerTopic;
        this.retry = retry;
        init();
    }

    /** Details: initialization */
    public void init() {
        kafkaProducer = initProducer(producerBrokerList, producerSerializerClass);
    }

    /** Details: create the producer */
    private Producer<String, String> initProducer(String producerBrokerList,
            String producerSerializerClass) {
        Properties props = new Properties();
        props.put("metadata.broker.list", producerBrokerList);
        props.put("serializer.class", producerSerializerClass);
        props.put("key.serializer.class", producerSerializerClass);
        props.put("request.required.acks", "-1");
        // the old producer's config key is "producer.type" (not "producerType")
        props.put("producer.type", "async");
        ProducerConfig producerConfig = new ProducerConfig(props);
        return new Producer<String, String>(producerConfig);
    }

    /** Details: public entry point for sending a message to Kafka */
    public void sendData(String content) {
        KeyedMessage<String, String> kafkaMessage = new KeyedMessage<String, String>(
                producerTopic, content);
        try {
            kafkaProducer.send(kafkaMessage);
        } catch (Exception e) {
            LOG.warn("send kafka message failed , prepare to retry......", e);
            // retry sending the message
            retryKafkaSendData(content);
        }
    }

    /** Details: retry mechanism used after a failed send */
    public void retryKafkaSendData(String content) {
        KeyedMessage<String, String> kafkaMessage = new KeyedMessage<String, String>(
                producerTopic, content);
        for (int i = 1; i <= (retry <= 0 ? 3 : retry); i++) {
            try {
                kafkaProducer.send(kafkaMessage);
                return;
            } catch (Exception e) {
                LOG.warn("send kafka message failed , retry times:" + i, e);
            }
        }
    }

    /** Details: shut down the producer */
    public void close() {
        kafkaProducer.close();
    }
}

2. Create the KafkaFactoryBean class
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

public class KafkaFactoryBean implements FactoryBean<Kafka>, InitializingBean, DisposableBean {
    /** The target object produced by this FactoryBean */
    private Kafka kafka;
    /** Broker list */
    private String producerBrokerList;
    /** Serializer class */
    private String producerSerializerClass;
    /** Kafka topic */
    private String producerTopic;
    /** Number of retries */
    private int retry;

    /** Details: called automatically when the bean instance is destroyed */
    @Override
    public void destroy() throws Exception {
        if (null != kafka) {
            kafka.close();
        }
    }

    /** Details: called by Spring once all properties have been set */
    @Override
    public void afterPropertiesSet() throws Exception {
        kafka = new Kafka(producerBrokerList, producerSerializerClass, producerTopic, retry);
    }

    /**
     * Details: returns the object created by the factory. Note that this is not
     * an instance of the FactoryBean itself, but an instance of Kafka.
     */
    @Override
    public Kafka getObject() throws Exception {
        return this.kafka;
    }

    /** Details: the type of the produced object */
    @Override
    public Class<?> getObjectType() {
        return (this.kafka == null ? Kafka.class : this.kafka.getClass());
    }

    /** Details: whether the produced object is a singleton */
    @Override
    public boolean isSingleton() {
        return true;
    }

    // setters are required for the <property> injection in the Spring XML below
    public void setProducerBrokerList(String producerBrokerList) {
        this.producerBrokerList = producerBrokerList;
    }

    public void setProducerSerializerClass(String producerSerializerClass) {
        this.producerSerializerClass = producerSerializerClass;
    }

    public void setProducerTopic(String producerTopic) {
        this.producerTopic = producerTopic;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}

If anything about how FactoryBean is used is unclear, see this other article:
http://blog.csdn.net/liuchuanhong1/article/details/52939353
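In short, the FactoryBean contract means the container hands out the factory's product, not the factory itself. As a minimal sketch reusing the classes and XML from this article (the demo class itself is hypothetical):

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class FactoryBeanDemo {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("applicationContext-kafka.xml");
        // asking for the bean name returns the product, i.e. the Kafka instance
        Kafka kafka = (Kafka) context.getBean("kafkaFactoryBean");
        // prefixing the name with "&" returns the KafkaFactoryBean itself
        KafkaFactoryBean factoryBean = (KafkaFactoryBean) context.getBean("&kafkaFactoryBean");
        System.out.println(kafka.getClass() + " / " + factoryBean.getClass());
        context.close();
    }
}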
VII. Testing the Producer Service
1. Start a consumer listener process on the server
./kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafka-test --from-beginning

The --from-beginning flag makes the console consumer read the topic from the start of its log rather than only new messages. The consumer process then starts and waits for messages.
2. The Spring configuration file for the test
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd"
    default-lazy-init="false">

    <context:property-placeholder location="classpath:kafka.properties" />

    <bean id="kafkaFactoryBean" class="com.chhliu.myself.KafkaFactoryBean">
        <property name="producerBrokerList" value="${bootstrap.servers}" />
        <property name="producerSerializerClass" value="kafka.serializer.StringEncoder" />
        <!-- target topic -->
        <property name="producerTopic" value="${kafka.topic}" />
        <!-- retry defaults to 3 attempts -->
        <property name="retry" value="3" />
    </bean>
</beans>

3. The properties file for the test

bootstrap.servers=192.168.1.108:9092
kafka.topic=kafka-test

4. Run the test code
import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext-kafka.xml" })
public class KafkaProducerTest {
    // what gets injected here is not the KafkaFactoryBean instance but the Kafka instance it produces
    @Resource(name = "kafkaFactoryBean")
    private Kafka factory;

    @Test
    public void test() {
        try {
            for (int i = 0; i < 1000; i++) {
                factory.sendData("hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!" + i);
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

The test results are as follows:
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!601
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!602
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!603
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!604
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!605
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!606
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!607
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!608
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!609
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!610
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!611
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!612
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!613
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!614
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!615
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!616
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!617

As we can see, the producer successfully sent its messages to Kafka, and the consumer successfully consumed them.