kafka环境搭建(WindowsLinux)

    xiaoxiao2021-04-12  29

    (一)安装zookeeper(windows)

    kafka需要用到zookeeper,所以需要先安装zookeeper

    1.到官网下载最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/

    2.解压到你喜欢的路径,我这里为:E:\zookeeper\zookeeper-3.4.10

    3.复制conf目录下zoo_sample.cfg,粘贴改名为zoo.cfg,修改zoo.cfg中的dataDir的值为E:/data/zookeeper,并添加一行dataLogDir=E:/log/zookeeper

    4.修改系统环境变量,在Path后添加    ;E:\zookeeper\zookeeper-3.4.10\bin

    5.运行cmd命令窗口,输入zkServer回车,出现下图的就表示zookeeper启动成功,也表明安装成功了。

    安装zookeeper(Linux)

    1. Xshell等工具连接Linux服务器,切换到任意目录,下载zookeeper最新稳定版,下载地址http://mirrors.hust.edu.cn/apache/zookeeper/stable/,命令如下

    cd /usr/soft

    wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz

    2.解压

    tar -xzvf zookeeper-3.4.10.tar.gz

    3.切换到conf配置文件目录,复制zoo_sample.cfg为zoo.cfg可以按需修改配置文件内容

    4.切换到bin目录,启动zookeeper,看到Starting zookeeper ... STARTED字样表示启动成功了

    ./zkServer.sh start

    (二)安装kafka(windows)

    1. 到官网下载最新版kafka,http://kafka.apache.org/downloads

    2.解压到你喜欢的路径,我这里解压路径为:E:\kafka_2.12-0.10.2.0

    3.修改E:\kafka_2.12-0.10.2.0\config目录下的server.properties中 log.dirs的值为E:/log/kafka

    4.添加系统环境变量,在Path后添加   ;E:\kafka_2.12-0.10.2.0\bin\windows

    5.启动kafka,在cmd命令行用cd命令切换到kafka根目录E:\kafka_2.12-0.10.2.0,输入命令

     .\bin\windows\kafka-server-start.bat .\config\server.properties

    出现started (kafka.server.KafkaServer)字样表示启动成功

    启动时若出现“wvim不是内部或外部命令...”错误提示,则需要在系统Path环境变量后添加 ;C:\Windows\System32\wbem 6.运行cmd命令行,创建一个topic

    kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    7.再打开一个cmd,创建一个Producer

    kafka-console-producer.bat --broker-list localhost:9092 --topic test

    8.再打开一个cmd,创建一个Customer

    kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

    9.在Producer窗口下输入信息进行测试 ,每输入一行回车后消息马上就会出现在Customer中,表明kafka已经安装测试成功

    安装kafka(Linux)

    1.下载kafka最新版https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz

    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz

    2.解压,文件夹重命名

    tar -xzvf kafka_2.12-0.10.2.0.tgz

    mv kafka_2.12-0.10.2.0 kafka

    3.切换目录到kafka目录下的bin目录,用vi命令修改kafka-server-start.sh中jvm内存大小,把

    export KAFKA_HEAP_OPTS="-Xms1G -Xms1G" 修改为 export KAFKA_HEAP_OPTS="-Xms256M -Xms128M",当然如果你的内存够大可以不修改

    4.切换到kafka根目录,启动kafka,启动成功如下图

    bin/kafka-server-start.sh config/server.properties

    5.创建topic bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test 创建一个名为test的topic,只有一个副本,一个分区。 通过list命令查看刚刚创建的topic bin/kafka-topics.sh -list -zookeeper 127.0.0.1:2181 6.启动producer并发送消息启动producer bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 启动之后就可以发送消息了 按Ctrl+C退出发送消息 7.启动consumer bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 启动consumer之后就可以在console中看到producer发送的消息了 可以开启两个终端,一个发送消息,一个接受消息。

    (三)kafka编程之Java接口

    1.新建Maven工程,我这里用的是Eclipse;pom加入kafka依赖,如下:

    <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.2.0</version> </dependency> 2.新建生产测试类TestProducer.java

    import java.util.Properties; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class TestProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. //“所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。 props.put("acks", "all"); //如果请求失败,生产者也会自动重试,即使设置成0 the producer can automatically retry. props.put("retries", 0); //The producer maintains buffers of unsent records for each partition. props.put("batch.size", 16384); //默认立即发送,这里这是延时毫秒数 props.put("linger.ms", 1); //生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException props.put("buffer.memory", 33554432); //The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //创建kafka的生产者类 Producer<String, String> producer = new KafkaProducer<String, String>(props); //生产者的主要方法 // close();//Close this producer. // close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests. // flush() ;所有缓存记录被立刻发送 for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("test",0, Integer.toString(i), Integer.toString(i))); producer.close(); } } 3.新建消费测试类TestCustomer.java

    import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class TestConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); System.out.println("this is the group part test 1"); //消费者的组id props.put("group.id", "GroupA");//这里是GroupA或者GroupB props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); //从poll(拉)的回话处理时长 props.put("session.timeout.ms", "30000"); //poll的数量限制 //props.put("max.poll.records", "100"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //订阅主题列表topic consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // 正常这里应该使用线程池处理,不应该在这里处理 System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n"); } } } 4.先运行(run/debug)TestCustomer再运行TestProducer,在TestCustomer的控制台看到下图的结果就表示消息发送并接收成功了

    并且在之前启动的消费端的命令窗口也能看到接收到的数据:

    dazu表示kakazhj

    转载请注明原文地址: https://ju.6miu.com/read-667471.html

    最新回复(0)