kafka部署在虚拟机上,真集群分布式,三台机器。版本为kafka_2.8.0-0.8.0.tar.gz;我想在windows10上的myeclipse10中用java代码远程连接虚拟机上的kafka,结果却报错了:
Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:172.16.4.214,port:9092)] failed
ERROR Producer connection to slave1:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
网上有人说出现上述错误是因为没有配置advertised.host.name,但是我打开server.properties里面根本没有这个属性,只有host.name,而host.name我早已经配置为主机名,既然他没有那个属性,我硬加了一个advertised.host.name属性,值也为主机名。然后重启发现根本没用。错误照样;还有网友说kafka自带的zookeeper jar包与我们自己安装zookeeper版本不一致,但是我更换后一致后还是不行,所以我自己研究尝试找到以下方法:
解决方法:
1.kafka安装在3台虚拟机上,那么3台机器都要执行开启命令(#kafka-server-start.sh ./kafka/config/server.properties &),它跟zookeeper一样都需要分别启动(我这里说的zookeeper并非hbase自带),我刚开始在hadoop主节点上安装的kafka,然后将配置分发到其他从节点,我以为只要在主节点上启动kafka就行,结果证明是错的,所以报了上述错误
2.我忘了配置producer.properties,用#vi producer.properties 打开后,按i键进入编辑模式,找到metadata.broker.list这一行,去掉前面的注释,改为metadata.broker.list=master:9092,slave1:9092,slave3:9092 然后保存退出,因为真集群分布,这里必须改,另外我这里写master和slave这些名字,是因为我在三台虚拟上都配置了hosts文件(ip和主机映射),windows10上面的hosts我也配置了,如果你没配置还是写ip吧。
3.consumer.properties里面的zookeeper.connect也要改为zookeeper.connect=master:2181,slave1:2181,slave3:2181
至此运行成功
下面贴上我的代码:
在classpath(src下面)下新添两个属性文件:producer.properties和consumer.properties
producer.properties内容如下
---------------------------------------------------------------------------------------------------------------------------------------
metadata.broker.list=master:9092,slave1:9092,slave3:9092 producer.type=sync compression.codec=0 serializer.class=kafka.serializer.StringEncoder
---------------------------------------------------------------------------------------------------------------------------------------
consumer.properties内容如下
---------------------------------------------------------------------------------------------------------------------------------------
zookeeper.connect=master:2181,slave1:2181,slave3:2181 zookeeper.connectiontimeout.ms=1000000 group.id=test-group auto.offset.reset=smallest auto.commit.enable=true
---------------------------------------------------------------------------------------------------------------------------------------
生产者代码
import java.io.File; import java.io.FileInputStream; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyProducer { /** * @param args */ public static void main(String[] args) { Producer<String,String> inner=null; try { Properties properties=new Properties(); properties.load(new FileInputStream(new File("E:/javaws/bigdata/src/producer.properties"))); ProducerConfig config=new ProducerConfig(properties); inner=new Producer<String, String>(config); int i=0; while(true){ KeyedMessage<String,String> km=new KeyedMessage<String, String>("test-topic", "this is a sample"+i); inner.send(km); i++; Thread.sleep(2000); } } catch (Exception e) { e.printStackTrace(); }finally{ if(inner!=null){ inner.close(); } } } }
消费者代码
import java.io.File; import java.io.FileInputStream; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class MyConsumer { /** * @param args */ public static void main(String[] args) { ConsumerConfig config=null; ConsumerConnector connector=null; ExecutorService threadPool=null; try { Properties properties=new Properties(); properties.load(new FileInputStream(new File("E:/javaws/bigdata/src/consumer.properties"))); config=new ConsumerConfig(properties); connector=Consumer.createJavaConsumerConnector(config); Map<String,Integer> topics=new HashMap<String,Integer>(); topics.put("test-topic",2);//第二个参数是分区数partitionsNum Map<String,List<KafkaStream<byte[],byte[]>>> streams=connector.createMessageStreams(topics); List<KafkaStream<byte[],byte[]>> partitions=streams.get("test-topic"); threadPool=Executors.newFixedThreadPool(2); for(final KafkaStream<byte[],byte[]> partition:partitions){ threadPool.execute(new MessageRunner(partition)); } System.in.read(); threadPool.shutdownNow(); } catch (Exception e) { e.printStackTrace(); }finally{ connector.shutdown(); } } }