windows10中远程连接虚拟机上的kafka错误解决办法

    xiaoxiao2022-06-29  37

           kafka部署在虚拟机上,真集群分布式,三台机器。版本为kafka_2.8.0-0.8.0.tar.gz;我想在windows10上的myeclipse10中用java代码远程连接虚拟机上的kafka,结果却报错了:

    Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:172.16.4.214,port:9092)] failed

    ERROR Producer connection to slave1:9092 unsuccessful (kafka.producer.SyncProducer)

    java.net.ConnectException: Connection refused

      网上有人说出现上述错误是因为没有配置advertised.host.name,但是我打开server.properties里面根本没有这个属性,只有host.name,而host.name我早已经配置为主机名,既然他没有那个属性,我硬加了一个advertised.host.name属性,值也为主机名。然后重启发现根本没用。错误照样;还有网友说kafka自带的zookeeper   jar包与我们自己安装zookeeper版本不一致,但是我更换后一致后还是不行,所以我自己研究尝试找到以下方法:

     解决方法:

    1.kafka安装在3台虚拟机上,那么3台机器都要执行开启命令(#kafka-server-start.sh ./kafka/config/server.properties &),它跟zookeeper一样都需要分别启动(我这里说的zookeeper并非hbase自带),我刚开始在hadoop主节点上安装的kafka,然后将配置分发到其他从节点,我以为只要在主节点上启动kafka就行,结果证明是错的,所以报了上述错误

    2.我忘了配置producer.properties,用#vi producer.properties 打开后,按i键进入编辑模式,找到metadata.broker.list这一行,去掉前面的注释,改为metadata.broker.list=master:9092,slave1:9092,slave3:9092  然后保存退出,因为真集群分布,这里必须改,另外我这里写master和slave这些名字,是因为我在三台虚拟上都配置了hosts文件(ip和主机映射),windows10上面的hosts我也配置了,如果你没配置还是写ip吧。

    3.consumer.properties里面的zookeeper.connect也要改为zookeeper.connect=master:2181,slave1:2181,slave3:2181

    至此运行成功

    下面贴上我的代码:

    在classpath(src下面)下新添两个属性文件:producer.properties和consumer.properties

    producer.properties内容如下

    ---------------------------------------------------------------------------------------------------------------------------------------

    metadata.broker.list=master:9092,slave1:9092,slave3:9092 producer.type=sync compression.codec=0 serializer.class=kafka.serializer.StringEncoder

    ---------------------------------------------------------------------------------------------------------------------------------------

    consumer.properties内容如下

    ---------------------------------------------------------------------------------------------------------------------------------------

    zookeeper.connect=master:2181,slave1:2181,slave3:2181 zookeeper.connectiontimeout.ms=1000000 group.id=test-group auto.offset.reset=smallest auto.commit.enable=true

    ---------------------------------------------------------------------------------------------------------------------------------------

    生产者代码

    import java.io.File; import java.io.FileInputStream; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyProducer {     /**      * @param args      */     public static void main(String[] args) {         Producer<String,String> inner=null;         try {             Properties properties=new Properties();             properties.load(new FileInputStream(new File("E:/javaws/bigdata/src/producer.properties")));             ProducerConfig config=new ProducerConfig(properties);             inner=new Producer<String, String>(config);             int i=0;             while(true){                 KeyedMessage<String,String> km=new KeyedMessage<String, String>("test-topic",                         "this is a sample"+i);                 inner.send(km);                 i++;                 Thread.sleep(2000);             }         } catch (Exception e) {             e.printStackTrace();         }finally{             if(inner!=null){                 inner.close();             }         }     } }

    消费者代码

    import java.io.File; import java.io.FileInputStream; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class MyConsumer {     /**      * @param args      */     public static void main(String[] args) {         ConsumerConfig config=null;         ConsumerConnector connector=null;         ExecutorService threadPool=null;         try {             Properties properties=new Properties();             properties.load(new FileInputStream(new File("E:/javaws/bigdata/src/consumer.properties")));             config=new ConsumerConfig(properties);             connector=Consumer.createJavaConsumerConnector(config);             Map<String,Integer> topics=new HashMap<String,Integer>();             topics.put("test-topic",2);//第二个参数是分区数partitionsNum             Map<String,List<KafkaStream<byte[],byte[]>>> streams=connector.createMessageStreams(topics);             List<KafkaStream<byte[],byte[]>> partitions=streams.get("test-topic");             threadPool=Executors.newFixedThreadPool(2);             for(final KafkaStream<byte[],byte[]> partition:partitions){                 threadPool.execute(new MessageRunner(partition));             }             System.in.read();             threadPool.shutdownNow();         } catch (Exception e) {             e.printStackTrace();         }finally{             connector.shutdown();         }     } }

    转载请注明原文地址: https://ju.6miu.com/read-1125228.html

    最新回复(0)