librdkafka的使用和介绍

    xiaoxiao2021-03-25  126

    librdkafka的使用介绍

    librdkafka是kafka的c语言接口

     

    下面简单的介绍一下其接口

        1、rd_kafka_conf_set设置全局配置

        2、rd_kafka_topic_conf_set设置topic配置

        3、rd_kafka_brokers_add设置broker地址,启动向broker发送消息的线程

        4、rd_kafka_new启动kafka主线程

        5、rd_kafka_topic_new建topic

       6、rd_kafka_produce使用本函数发送消息

        7、rd_kafka_poll调用回调函数

    根据其github上的example,我简单地封装了一下producer。项目地址:https://github.com/edenhill/librdkafka

    编译静态库:把上面地址中的代码下载到你的linux系统中

    接下来就是传统的三步安装方法:

    ./configure

    make

    make install

    编译好的静态库和动态库在src以及src-cpp目录下。我这里使用的是src目录下的librdkafka.a和rdkafka.h,你也可以使用动态库,用法是一样的。注意用-I指定头文件所在的路径;我这里把静态库和头文件都放在当前目录下,所以写 -I./

    编译方法:g++ kafka_producer.cpp -o testkafka -I./ -lcurl  librdkafka.a -lpthread -lrt

    写成c++的接口

    #include <ctype.h> #include <signal.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <syslog.h> #include <sys/time.h> #include <errno.h> #include "rdkafka.h" const int PRODUCER_INIT_FAILED = -1; const int PRODUCER_INIT_SUCCESS = 0; const int PUSH_DATA_FAILED = -1; const int PUSH_DATA_SUCCESS = 0; static void logger(const rd_kafka_t *rk, int level,const char *fac, const char *buf) { struct timeval tv; gettimeofday(&tv, NULL); fprintf(stderr, "%u.u RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec, (int)(tv.tv_usec / 1000), level, fac, rk ? rd_kafka_name(rk) : NULL, buf); } class ProducerKafka { public: ProducerKafka(){}; ~ProducerKafka(){} int init_kafka(int partition, char *brokers, char *topic); int push_data_to_kafka(const char* buf, const int buf_len); void destroy(); private: int partition_; //rd rd_kafka_t* handler_; rd_kafka_conf_t *conf_; //topic rd_kafka_topic_t *topic_; rd_kafka_topic_conf_t *topic_conf_; }; int ProducerKafka::init_kafka(int partition, char *brokers, char *topic) { char tmp[16]={0}; char errstr[512]={0}; partition_ = partition; /* Kafka configuration */ conf_ = rd_kafka_conf_new(); //set logger :register log function rd_kafka_conf_set_log_cb(conf_, logger); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf_, "internal.termination.signal", tmp, NULL, 0); /*topic configuration*/ topic_conf_ = rd_kafka_topic_conf_new(); if (!(handler_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) { fprintf(stderr, "*****Failed to create new producer: %s*******\n",errstr); return PRODUCER_INIT_FAILED; } rd_kafka_set_log_level(handler_, LOG_DEBUG); /* Add brokers */ if (rd_kafka_brokers_add(handler_, brokers) == 0) { fprintf(stderr, "****** No valid brokers specified********\n"); return PRODUCER_INIT_FAILED; } /* Create topic */ topic_ = rd_kafka_topic_new(handler_, topic, topic_conf_); return PRODUCER_INIT_SUCCESS; } void ProducerKafka::destroy() { /* Destroy topic */ 
rd_kafka_topic_destroy(topic_); /* Destroy the handle */ rd_kafka_destroy(handler_); } int ProducerKafka::push_data_to_kafka(const char* buffer, const int buf_len) { int ret; char errstr[512]={0}; if(NULL == buffer) return 0; ret = rd_kafka_produce(topic_, partition_, RD_KAFKA_MSG_F_COPY, (void*)buffer, (size_t)buf_len, NULL, 0, NULL); if(ret == -1) { fprintf(stderr,"****Failed to produce to topic %s partition %i: %s*****\n", rd_kafka_topic_name(topic_), partition_, rd_kafka_err2str(rd_kafka_errno2err(errno))); rd_kafka_poll(handler_, 0); return PUSH_DATA_FAILED; } fprintf(stderr, "***Sent %d bytes to topic:%s partition:%i*****\n", buf_len, rd_kafka_topic_name(topic_), partition_); rd_kafka_poll(handler_, 0); return PUSH_DATA_SUCCESS; } int main() { char test_data[100]; strcpy(test_data, "helloworld"); ProducerKafka* producer = new ProducerKafka; if (PRODUCER_INIT_SUCCESS == producer->init_kafka(0, "192.168.1.108:9092", "chenxun")) { printf("producer init success\n"); } else { printf("producer init failed\n"); return 0; } while (fgets(test_data, sizeof(test_data), stdin)) { size_t len = strlen(test_data); if (test_data[len - 1] == '\n') test_data[--len] = '\0'; if (strcmp(test_data, "end") == 0) break; if (PUSH_DATA_SUCCESS == producer->push_data_to_kafka(test_data, strlen(test_data))) printf("push data success %s\n", test_data); else printf("push data failed %s\n", test_data); } producer->destroy(); return 0; }

    转载请注明原文地址: https://ju.6miu.com/read-7531.html

    最新回复(0)