kafka-python-client-example


    http://www.biglittleant.cn/2016/12/28/kafka-python/


Installing kafka-python

Install with pip

pip install kafka-python

Install from source

### pip
git clone https://github.com/dpkp/kafka-python
pip install ./kafka-python

### Setuptools
git clone https://github.com/dpkp/kafka-python
easy_install ./kafka-python

### setup.py
git clone https://github.com/dpkp/kafka-python
cd kafka-python
python setup.py install

To enable compression you also need to install the following two modules:

pip install lz4tools
pip install xxhash
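With these installed, compression can be turned on when constructing the producer through the compression_type parameter. A minimal sketch, reusing the broker address from the examples below; the topic name is only illustrative:

from kafka import KafkaProducer

# Enable LZ4 compression for produced message batches
# ('gzip' and 'snappy' are also supported values for compression_type)
producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'],
                         compression_type='lz4')
producer.send('topic1', b'compressed message')
producer.flush()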

Usage

Kafka producer

Step 1: connect to the broker

from kafka import KafkaProducer
from kafka.errors import KafkaError

# Connect to the broker
producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'])

Step 2: send a simple message

import datetime

# Send a plain message (the value must be bytes)
datenow = datetime.datetime.now().strftime('%Y-%m-%d:%H-%M-%s')
my_bytes = bytes(datenow, encoding='utf-8')
future = producer.send('topic1', my_bytes)

# OR: block until the send completes and fetch the record metadata
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    # log.exception()
    pass

# Successful result returns assigned partition and offset
print(record_metadata.topic)      # the topic the message was written to
print(record_metadata.partition)  # the partition the message landed in
print(record_metadata.offset)     # the offset of the message
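send() is asynchronous, so instead of blocking on future.get() you can also attach callbacks to the returned future. A minimal sketch using add_callback/add_errback; the callback function names here are only illustrative:

def on_send_success(record_metadata):
    # Runs once the broker has acknowledged the write
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(excp):
    # Runs if the produce request failed
    print('send failed: %s' % excp)

producer.send('topic1', b'another message') \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)
producer.flush()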

Step 3: send JSON-formatted data

import json
import msgpack

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack (a binary serialization format; requires the msgpack package)
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                         bootstrap_servers=['192.168.56.12:9092'])
producer.send('json-topic1', {'key': 'value'})

# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')

# block until all async messages are sent before moving on
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)

Kafka consumer

Real-time consumer

Only consumes newly written messages; old messages are not consumed.

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',   # consumer group; committed offsets are tracked per group
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

Consuming earlier messages (from the earliest offset)

consumer = KafkaConsumer('topic1',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

### Output
topic1:0:0: key=None value=b'11-16-19:11-2016-00'
topic1:0:1: key=None value=b'11-16-19:11-2016-02'
topic1:0:2: key=None value=b'11-16-19:11-2016-03'
topic1:0:3: key=None value=b'11-16-19:11-2016-03'
topic1:0:4: key=None value=b'2016-11-19:11-05-1479524731'

Custom deserialization of message values

import json

consumer = KafkaConsumer('json-topic1',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         auto_offset_reset='earliest',  # or 'latest'
                         enable_auto_commit=False,      # if False, offsets are not committed automatically
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    # the value has already been decoded by value_deserializer; the key is still raw bytes
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

### Output
json-topic1:0:0: key=None value={'key': 'value'}
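With enable_auto_commit=False the committed offsets never advance on their own, so you normally commit them yourself after processing. A minimal sketch using consumer.commit(); process() is a placeholder for your own handling logic, and the topic and broker address reuse the assumptions from this article:

from kafka import KafkaConsumer

consumer = KafkaConsumer('json-topic1',
                         group_id='my-group',
                         enable_auto_commit=False,
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    process(message)   # placeholder: handle the message here
    consumer.commit()  # synchronously commit the offsets consumed so far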

Other options

# Stop iterating if no message arrives within 1 second
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to topics matching a regex pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Run multiple clients to consume messages in parallel
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
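Besides group-managed subscriptions, partitions can also be assigned manually and read from an arbitrary offset. A minimal sketch, assuming topic1 has a partition 0 on the broker used throughout this article:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['192.168.56.12:9092'])
tp = TopicPartition('topic1', 0)  # assumes partition 0 exists
consumer.assign([tp])             # manual assignment, no consumer group involved
consumer.seek(tp, 2)              # start reading from offset 2
for message in consumer:
    print(message.offset, message.value)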

    Example

Write the contents of the file a.txt into Kafka, then have a consumer in the group my-group consume the data from Kafka.

Step 1: write a producer to produce the messages.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'])

with open('a.txt', 'rb') as file:
    for n in file:
        future = producer.send('topic1', n)
producer.flush()

Step 2: write a consumer to consume the messages.

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic1',
                         group_id='my-group',
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

Documentation

kafka-python official reference: https://github.com/dpkp/kafka-python
