kafka-node

    xiaoxiao2021-12-10  7

    Producer

    Producer(client,[options])

    client:和kafka服务保持连接的client对象

    options:一些关于producer的属性

    { // Configuration for when to consider a message as acknowledged, default 1 requireAcks: 1, // The amount of time in milliseconds to wait for all acks before considered, default 100ms ackTimeoutMs: 100, // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 0 partitionerType: 2 }

    Events

    ready:这个事件被触发,当我们准备好发送消息的时候error:发生错误的时候,触发这个事件

    send(payloads,cb)

    payloads:是一个数组,元素都是ProducerRequest,ProducerRequest是一个JSON Object 类似与下面的数据

    { topic: 'topicName', messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance key: 'theKey', // only needed when using keyed partitioner partition: 0, // default 0 attributes: 2 // default: 0 }cb:是一个回调函数attribute 控制着发送消息的压缩方式 0:不压缩1:GZip的方式压缩2: snappy压缩格式 ⚠️一批给同一个topic发送多个消息,要使用messages,否则会丢失信息

    createTopics(topics,async,cb)

    这个方法用来在kafka server上创建新的topics.只有在auto.create.topics.enable是true的时候这个函数才会起作用我们的client发送消息的给server的时候,server会自动创建队列(topics)当async呗设置为false的时候,这个函数不会结束,除非所有的topic都被创建,否则是立即返回。就是是否阻塞的意思topics:是一个数组。async: 是一个布尔类型的变量cb: 函数,回调函数 example: var kafka = require('kafka-node'), Producer = kafka.Producer, client = new kafka.Client(), producer = new Producer(client); // Create topics sync producer.createTopics(['t','t1'], false, function (err, data) { console.log(data); }); // Create topics async producer.createTopics(['t'], true, function (err, data) {}); producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg

    HighLevelProducer

    基本和Producer一样

    HighLevelProducer(client,[option])

    client:和Producer类似,client都是和kafka server保持一个连接

    options: 属性关于Producer

    { // Configuration for when to consider a message as acknowledged, default 1 requireAcks: 1, // The amount of time in milliseconds to wait for all acks before considered, default 100ms ackTimeoutMs: 100, // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 2 partitionerType: 3 }

    Events

    ready:被触发当准备好发送消息的时候error:发生错误的时候触发

    Consumer

    Consumer(client,payloads,options)

    client:和上方一样

    payloads:数组,数组的元素类型是FetchRequest,FetchRequest是一个JSON类型的数据,类似如下:

    { topic: 'topicName', offset: 0, //default 0 }

    options:关于consumer的属性设置,例如下:

    { groupId: 'kafka-node-group',//consumer group id, default `kafka-node-group` // Auto commit config autoCommit: true, autoCommitIntervalMs: 5000, // The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued, default 100ms fetchMaxWaitMs: 100, // This is the minimum number of bytes of messages that must be available to give a response, default 1 byte fetchMinBytes: 1, // The maximum bytes to include in the message set for this partition. This helps bound the size of the response. fetchMaxBytes: 1024 * 1024, // If set true, consumer will fetch message from the given offset in the payloads fromOffset: false, // If set to 'buffer', values will be returned as raw buffer objects. encoding: 'utf8' } example: var kafka = require('kafka-node'), Consumer = kafka.Consumer, client = new kafka.Client(), consumer = new Consumer( client, [ { topic: 't', partition: 0 }, { topic: 't1', partition: 1 } ], { autoCommit: false } );

    on(‘message’,onMessage)

    一般来说,我们会consume最后一个提交的messages从我们的current group中 onMessage:函数,新消息来的时候的回调函数 example: consumer.on('message', function (message) { console.log(message); });

    addTopics(topics,cb,fromoffset)

    添加进来的topics如果不存在,则返回错误

    - topics:数组,被添加的topic的数组 - cb:函数,回调函数 - fromOffset:布尔值,如果是真的,consumber将会从特定的位置获取message,否则默认最后一个

    removeTopics(topics,cb)

    topics:数组,被移除的topic的数组cb:回调函数

    Commit(cb)

    cb:回调函数

    setOffset(topic,partition,offset)

    设定给定topic的offset

    - topic:字符串 - partition:数字 - offset:数字

    example: consumer.setOffset('topic', 0, 0);

    Pause()

    暂停consumer,仅仅是暂停consumer,不会影响producer

    resume()

    恢复consumer,恢复fetch 的这个循环

    PauseTopics(topics)

    暂停指定的topics example: consumer.pauseTopics([ 'topic1', { topic: 'topic2', partition: 0 } ]);

    resumeTopics(topics)

    close(force,cb)

    force:布尔类型的值,如果是true,那么她会强制consumer提交现在的offset在关闭之前,默认的是false example: consumer.close(true, cb); consumer.close(cb); //force is disabled
    转载请注明原文地址: https://ju.6miu.com/read-700179.html

    最新回复(0)