kafka-python-client-example

Installing kafka-python

Installing via pip

pip install kafka-python

Installing from source

### pip
git clone https://github.com/dpkp/kafka-python
pip install ./kafka-python
### Setuptools
git clone https://github.com/dpkp/kafka-python
easy_install ./kafka-python
### setup
git clone https://github.com/dpkp/kafka-python
cd kafka-python
python setup.py install

To enable compression support, two additional modules need to be installed:

pip install lz4tools
pip install xxhash
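
With the modules installed, compression itself is turned on through the producer's compression_type parameter. A minimal sketch, assuming the same broker address used elsewhere in this post (depending on your kafka-python version, lz4 support may instead require the lz4 package):

from kafka import KafkaProducer

# 'gzip' needs no extra modules; 'lz4' relies on the compression packages installed above
producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'],
                         compression_type='lz4')
producer.send('topic1', b'a compressed message')
producer.flush()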

Usage

Kafka producer

Step 1: connect to the broker

from kafka import KafkaProducer
from kafka.errors import KafkaError

## Connect to the broker
producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'])

Step 2: send a simple message

import datetime

## By default, send a plain message (the value must be bytes)
datenow = datetime.datetime.now().strftime('%Y-%m-%d:%H-%M-%S')
my_bytes = bytes(source=datenow, encoding='utf-8')
future = producer.send('topic1', my_bytes)

### OR block until the send completes and fetch its metadata
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    # log.exception()
    pass

# Successful result returns assigned partition and offset
print(record_metadata.topic)      ## the topic the message was written to
print(record_metadata.partition)  ## the partition the message landed in
print(record_metadata.offset)     ## the message's offset within that partition
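
If you would rather not block on future.get(), the future returned by send() also accepts callbacks, so the send stays fully asynchronous. A minimal sketch; the handler names here are made up for illustration:

def on_send_success(record_metadata):
    # invoked once the broker acknowledges the message
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(excp):
    # invoked if the produce request ultimately fails
    print('send failed:', excp)

producer.send('topic1', b'async message').add_callback(on_send_success).add_errback(on_send_error)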

Step 3: send JSON-formatted data

import json
import msgpack

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack (a binary serialization library)
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                         bootstrap_servers=['192.168.56.12:9092'])
producer.send('json-topic1', {'key': 'value'})

# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')

# block until all async messages are sent before moving on
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)

Kafka consumer

Real-time consumer

Consumes only newly written messages, not old ones.

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',  ## the consumer group tracks committed offsets
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

Consuming messages from the earliest offset

consumer = KafkaConsumer('topic1',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         bootstrap_servers=['192.168.56.12:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

### Result
topic1:0:0: key=None value=b'11-16-19:11-2016-00'
topic1:0:1: key=None value=b'11-16-19:11-2016-02'
topic1:0:2: key=None value=b'11-16-19:11-2016-03'
topic1:0:3: key=None value=b'11-16-19:11-2016-03'
topic1:0:4: key=None value=b'2016-11-19:11-05-1479524731'


Custom value deserialization

consumer = KafkaConsumer('json-topic1',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         auto_offset_reset='earliest',  ## or 'latest'
                         enable_auto_commit=False,      ## if False, offsets are not committed automatically
                         bootstrap_servers=['192.168.56.12:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

### Result
json-topic1:0:0: key=None value={'key': 'value'}
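
Because enable_auto_commit=False means the group's offsets are never committed automatically, you would commit them yourself if you want the consume position remembered across restarts. A minimal sketch, reusing the topic, group, and broker address assumed above:

from kafka import KafkaConsumer

consumer = KafkaConsumer('json-topic1',
                         group_id='my-group',
                         enable_auto_commit=False,
                         bootstrap_servers=['192.168.56.12:9092'])

for message in consumer:
    # ... process the message here ...
    consumer.commit()  # blocking commit of the offsets consumed so far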

Other options

# Stop iterating if no message arrives within 1 second.
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to topics by regex pattern.
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Run multiple clients to consume messages in parallel.
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
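
A consumer can also bypass the group mechanism entirely and read from an explicit partition and offset. A small sketch, assuming 'my-topic' has a partition 0 on the broker used earlier:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['192.168.56.12:9092'])
tp = TopicPartition('my-topic', 0)   # assumed topic/partition
consumer.assign([tp])                # manual assignment, no consumer-group rebalancing
consumer.seek(tp, 5)                 # start reading at offset 5
for message in consumer:
    print(message.offset, message.value)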

Example

  • Write the contents of the file a.txt into Kafka.
  • Have a consumer in the my-group group consume the data from Kafka.

Step 1: write a producer to produce the messages.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['192.168.56.12:9092'])
with open('a.txt', 'rb') as file:
    for n in file:
        future = producer.send('topic1', n)
producer.flush()

Step 2: write a consumer to consume the messages.

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic1',
                         group_id='my-group',
                         bootstrap_servers=['192.168.56.12:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

Documentation

kafka-python official reference