options: some attributes of the producer:
```javascript
{
  // Configuration for when to consider a message as acknowledged, default 1
  requireAcks: 1,
  // The amount of time in milliseconds to wait for all acks before considered, default 100ms
  ackTimeoutMs: 100,
  // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 0
  partitionerType: 2
}
```

payloads: an array whose elements are ProducerRequest objects. A ProducerRequest is a JSON object like the following:
```javascript
{
  topic: 'topicName',
  // multiple messages should be an array; a single message can be just a string or a KeyedMessage instance
  messages: ['message body'],
  key: 'theKey', // only needed when using keyed partitioner
  partition: 0, // default 0
  attributes: 2 // default: 0
}
```

- cb: the callback function
- attributes: controls the compression used when sending messages: 0: no compression; 1: GZip; 2: Snappy

⚠️ When sending multiple messages to the same topic in one batch, put them all in a single `messages` array; otherwise messages will be lost.

HighLevelProducer: basically the same as Producer.
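Both producers accept the same payload shape in `send(payloads, cb)`. Because of the warning above, requests for the same topic should be collapsed into one ProducerRequest whose `messages` is an array. A minimal sketch of that merging step, under the assumption that plain string messages are used (the helper name `mergeByTopic` is hypothetical, and kafka-node itself is not imported here):

```javascript
// Hypothetical helper: collapse multiple requests for the same topic into a
// single ProducerRequest whose `messages` is an array, per the warning above.
function mergeByTopic(requests) {
  var byTopic = {};
  var merged = [];
  requests.forEach(function (req) {
    var messages = Array.isArray(req.messages) ? req.messages : [req.messages];
    if (byTopic[req.topic]) {
      // Same topic seen before: append to its messages array instead of
      // pushing a second request, which could otherwise drop messages.
      byTopic[req.topic].messages = byTopic[req.topic].messages.concat(messages);
    } else {
      byTopic[req.topic] = { topic: req.topic, messages: messages };
      merged.push(byTopic[req.topic]);
    }
  });
  return merged;
}

var payloads = mergeByTopic([
  { topic: 'topicName', messages: 'a' },
  { topic: 'topicName', messages: ['b', 'c'] },
  { topic: 'other', messages: 'd' }
]);
// payloads now has one entry per topic and can be passed to producer.send(payloads, cb)
```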
options: attributes of the producer:
```javascript
{
  // Configuration for when to consider a message as acknowledged, default 1
  requireAcks: 1,
  // The amount of time in milliseconds to wait for all acks before considered, default 100ms
  ackTimeoutMs: 100,
  // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 2
  partitionerType: 3
}
```

payloads: an array whose elements are FetchRequest objects. A FetchRequest is a JSON object like the following:
```javascript
{
  topic: 'topicName',
  offset: 0 // default 0
}
```

options: the consumer's configuration attributes, for example:
```javascript
{
  // consumer group id, default `kafka-node-group`
  groupId: 'kafka-node-group',
  // Auto commit config
  autoCommit: true,
  autoCommitIntervalMs: 5000,
  // The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued, default 100ms
  fetchMaxWaitMs: 100,
  // This is the minimum number of bytes of messages that must be available to give a response, default 1 byte
  fetchMinBytes: 1,
  // The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
  fetchMaxBytes: 1024 * 1024,
  // If set true, consumer will fetch message from the given offset in the payloads
  fromOffset: false,
  // If set to 'buffer', values will be returned as raw buffer objects.
  encoding: 'utf8'
}
```

example:

```javascript
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client(),
    consumer = new Consumer(
        client,
        [
            { topic: 't', partition: 0 },
            { topic: 't1', partition: 1 }
        ],
        { autoCommit: false }
    );
```

- topics: array, the topics to add
- cb: function, the callback function
- fromOffset: boolean; if true, the consumer will fetch messages starting from the given offset, otherwise it defaults to the latest offset
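When `fromOffset` is true, each topic entry passed to `addTopics` must say where to start reading. A small sketch of that rule, runnable without a broker (the helper name `buildAddTopics` is hypothetical, not part of kafka-node):

```javascript
// Hypothetical helper: build the `topics` argument for consumer.addTopics.
// When fromOffset is true, every entry must carry an explicit numeric offset,
// because the consumer will start fetching from exactly that position.
function buildAddTopics(entries, fromOffset) {
  return entries.map(function (e) {
    if (fromOffset && typeof e.offset !== 'number') {
      throw new Error('fromOffset=true requires an offset for topic ' + e.topic);
    }
    return {
      topic: e.topic,
      partition: e.partition || 0, // default 0
      offset: fromOffset ? e.offset : undefined
    };
  });
}

var topics = buildAddTopics([{ topic: 't2', partition: 1, offset: 42 }], true);
// topics could then be passed as: consumer.addTopics(topics, cb, true)
```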
- topic: string
- partition: number
- offset: number
example:

```javascript
consumer.setOffset('topic', 0, 0);
```
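`setOffset` is typically called once per partition. A sketch that replays a saved checkpoint map via repeated `setOffset` calls, using a stub consumer so it runs without a broker (the checkpoint shape and the helper name `applyCheckpoint` are assumptions, not kafka-node APIs):

```javascript
// Hypothetical helper: reset every partition of a consumer to the offsets
// saved in a checkpoint map of shape { topic: { partition: offset } }.
function applyCheckpoint(consumer, checkpoint) {
  Object.keys(checkpoint).forEach(function (topic) {
    Object.keys(checkpoint[topic]).forEach(function (partition) {
      consumer.setOffset(topic, Number(partition), checkpoint[topic][partition]);
    });
  });
}

// Stub consumer that just records the calls, standing in for a real Consumer.
var calls = [];
var stubConsumer = {
  setOffset: function (topic, partition, offset) {
    calls.push([topic, partition, offset]);
  }
};

applyCheckpoint(stubConsumer, { topicName: { 0: 42, 1: 7 } });
// calls → [['topicName', 0, 42], ['topicName', 1, 7]]
```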