kafka-python-client-example
Installing kafka-python
Install with pip
pip install kafka-python
Install from source
# Option 1: pip
git clone https://github.com/dpkp/kafka-python
pip install ./kafka-python

# Option 2: easy_install
git clone https://github.com/dpkp/kafka-python
easy_install ./kafka-python

# Option 3: setup.py
git clone https://github.com/dpkp/kafka-python
cd kafka-python
python setup.py install
To enable compression support, two additional modules are required:
pip install lz4tools
pip install xxhash
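Once those modules are installed, compression can be turned on when creating the producer. A minimal sketch (the broker address is the one used in the examples below):

from kafka import KafkaProducer

# Compress message batches with LZ4 before sending them to the broker.
producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'],
                         compression_type='lz4')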
Usage
Kafka producer
Step 1: connect to the brokers
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'])
Step 2: send a simple message
import datetime

# Note: %s (epoch seconds) is a platform-dependent strftime directive.
datenow = datetime.datetime.now().strftime('%Y-%m-%d:%H-%M-%s')
my_bytes = datenow.encode('utf-8')
future = producer.send('topic1', my_bytes)

try:
    # Block until the send is acknowledged, or time out after 10 seconds.
    record_metadata = future.get(timeout=10)
except KafkaError:
    pass
else:
    # Metadata for the successfully produced message.
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
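Blocking on future.get() makes every send synchronous. kafka-python futures also accept callbacks, so sends can stay asynchronous; a sketch using the library's add_callback/add_errback API (the handler names are illustrative):

def on_send_success(record_metadata):
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(exc):
    print('send failed: %s' % exc)

# The callbacks fire once the broker acknowledges (or rejects) the message.
producer.send('topic1', my_bytes).add_callback(on_send_success).add_errback(on_send_error)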
Step 3: send JSON-formatted data
import json

import msgpack  # third-party module: pip install msgpack

# Send a message with both a key and a value.
producer.send('my-topic', key=b'foo', value=b'bar')

# Serialize values with msgpack.
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# Serialize values as UTF-8 encoded JSON.
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                         bootstrap_servers=['192.168.56.12:9092'])
producer.send('json-topic1', {'key': 'value'})

# send() is asynchronous; flush() blocks until all pending messages are delivered.
for _ in range(100):
    producer.send('my-topic', b'msg')
producer.flush()

# Configure the producer to retry failed sends up to 5 times.
producer = KafkaProducer(retries=5)
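A key_serializer can be configured alongside value_serializer so that plain strings can be used as message keys; messages with the same key always land in the same partition. A sketch under those assumptions ('user-1' is an illustrative key):

producer = KafkaProducer(key_serializer=str.encode,
                         value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                         bootstrap_servers=['192.168.56.12:9092'])
# The string key is encoded to bytes by key_serializer.
producer.send('json-topic1', key='user-1', value={'key': 'value'})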
Kafka consumer
Real-time Kafka consumer
Consumes only newly written messages; it does not read messages produced earlier.
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
Consuming earlier Kafka messages
from kafka import KafkaConsumer

consumer = KafkaConsumer('topic1',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
Sample output:

topic1:0:0: key=None value=b'11-16-19:11-2016-00'
topic1:0:1: key=None value=b'11-16-19:11-2016-02'
topic1:0:2: key=None value=b'11-16-19:11-2016-03'
topic1:0:3: key=None value=b'11-16-19:11-2016-03'
topic1:0:4: key=None value=b'2016-11-19:11-05-1479524731'
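Because enable_auto_commit=False, the consumer above never saves its position. Progress can be recorded by hand with KafkaConsumer.commit(); note that committing requires a group_id, which the example above omits. A sketch (handle() is a hypothetical processing function):

consumer = KafkaConsumer('topic1',
                         group_id='my-group',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    handle(message)    # hypothetical: process the message
    consumer.commit()  # synchronously commit the consumed offsets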
Custom deserialization
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer('json-topic1',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
Sample output:

json-topic1:0:0: key=None value={'key': 'value'}
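The value_deserializer must mirror the serializer used on the producer side; here it reverses the JSON value_serializer from the producer section, which is why the value prints as a dict rather than as raw bytes.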
Other parameters
# Stop iterating (raise StopIteration) if no message arrives within 1 second.
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to all topics matching a regex pattern.
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Two consumers in the same group split the topic's partitions between them.
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
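A consumer can also be pinned to specific partitions instead of subscribing by topic. A sketch using kafka-python's TopicPartition (topic and server names reuse the examples above):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='my.server.com')
consumer.assign([TopicPartition('my-topic', 0)])  # read only partition 0
consumer.seek_to_beginning()                      # rewind all assigned partitions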
Example
Write the contents of the file a.txt into Kafka, then consume the data with a consumer in the group my-group.
Step 1: write a producer to produce messages.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'])

# Send one message per line of the file.
with open('a.txt', 'rb') as file:
    for n in file:
        future = producer.send('topic1', n)
producer.flush()
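Note that send() only enqueues each line in the producer's buffer; the final flush() blocks until every buffered message has actually been delivered, so the script does not exit while lines are still in flight.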
Step 2: write a consumer to consume the messages.
from kafka import KafkaConsumer

consumer = KafkaConsumer('topic1',
                         group_id='my-group',
                         bootstrap_servers=['192.168.56.12:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
Documentation
kafka-python official reference: https://kafka-python.readthedocs.io/