kafka简介
Apache Kafka是由Apache软件基金会开发的一个开源消息系统项目,由Scala写成。Kafka最初是由LinkedIn开发,并于2011年初开源。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
Kafka是一个分布式的、分区的、多复本的日志提交服务。它通过一种独一无二的设计提供了一个消息系统的功能。
kafka应用场景
- Kafka可以应用于消息系统,比如,当下较为热门的消息推送,这些消息推送系统的消息源,可以使用Kafka作为系统的核心组建来完成消息的生产和消息的消费。
- 然后是网站的行迹,我们可以将企业的Portal,用户的操作记录等信息发送到Kafka中,按照实际业务需求,可以进行实时监控,或者做离线处理等。
- 一个是日志收集,类似于Flume套件这样的日志收集系统,但Kafka的设计架构采用push/pull,适合异构集群,Kafka可以批量提交消息,对Producer来说,在性能方面基本上是无消耗的,而在Consumer端中,我们可以使用HDFS这类的分布式文件存储系统进行存储。
相关术语
- Kafka维护按类区分的消息,称为主题(topic)
- 生产者(producer)向kafka的主题发布消息
- 消费者(consumer)向主题注册,并且接收发布到这些主题的消息
- kafka以一个拥有一台或多台服务器的集群运行着,每一台服务器称为broker
- 从高层来说,生产者(producer)通过网络发消息到kafka集群,而kafka集群则以下面这种方式对消费者进行服务。
kafka实战
1 2 3 4 5 6 7
| cd /usr/local/src/ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz tar -xzf kafka_2.11-0.10.1.0.tgz mv kafka_2.11-0.10.1.0 /data/app/ ln -s /data/app/kafka_2.11-0.10.1.0 /data/app/kafka /data/app/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties /data/app/kafka/bin/kafka-server-start.sh config/server.properties
|
kafka单节点实战
编辑zookeeper配置文件
1
| vim /data/app/kafka/config/zookeeper.properties
|
编辑kafka配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| vim /data/app/kafka/config/server.properties broker.id=0 port=9092 advertised.host.name=10.0.2.150 num.network.threads=3
num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=localhost:4188 zookeeper.connection.timeout.ms=6000
#这三个必须得配置 #这个是配置PRODUCER/CONSUMER连上来的时候使用的地址(必须得配置) advertised.host.name=192.168.56.12 #设置KAFKA LOG路径 log.dirs=$KAFKA_HOME/logs/kafka-logs #设置ZOOKEEPER的连接地址 zookeeper.connect=192.168.56.12:2181
|
启动kafka服务
1
| sh /data/app/kafka/bin/kafka-server-start.sh /data/app/kafka/config/server.properties >/dev/null 2>&1 &
|
kafka集群实战
创建三个配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| cat >/data/app/kafka/config/server.properties-0<<EOF broker.id=0 port=9090 advertised.host.name=10.0.2.150 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs-0 num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=127.0.0.1:2880,127.0.0.1:2881,127.0.0.1:2882 zookeeper.connection.timeout.ms=6000 EOF
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| cat >/data/app/kafka/config/server.properties-1<<EOF broker.id=1 port=9091 advertised.host.name=10.0.2.150 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs-1 num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=127.0.0.1:2880,127.0.0.1:2881,127.0.0.1:2882 zookeeper.connection.timeout.ms=6000 EOF
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| cat >/data/app/kafka/config/server.properties-2<<EOF broker.id=2 port=9092 advertised.host.name=10.0.2.150 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs-2 num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=127.0.0.1:2880,127.0.0.1:2881,127.0.0.1:2882 zookeeper.connection.timeout.ms=6000 EOF
|
启动kafka集群
1 2 3
| ./kafka-server-start.sh /data/app/kafka/config/server.properties >/dev/null 2>&1 & ./kafka-server-start.sh /data/app/kafka/config/server.properties-1 >/dev/null 2>&1 & ./kafka-server-start.sh /data/app/kafka/config/server.properties-2 >/dev/null 2>&1 &
|
创建一个topic 一个分区,三个主机
1 2
| [root@localhost kafka]# ./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2880 --replication-factor 3 --partitions 1 --topic topics Created topic "topics".
|
- –topic指定topic name
- –partitions指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好;
- –replication-factor指定partition的replicas数,建议设置为2;
- KAFKA有几个,replication-factor就填几个
查看topics的信息
1 2 3 4
| [root@localhost kafka]# ./bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2881 --topic topics Topic:topics PartitionCount:1 ReplicationFactor:3 Configs: Topic: topics Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 [root@localhost kafka]#
|
- Leader:负责处理消息的读和写,Leader是从所有节点中随机选择的。
- Replicas:列出了所有的副本节点,不管节点是否在服务中。
- Isr:是正在服务中的节点
kafka创建topic
1
| ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
|
kafka管理
kafka配置管理
- advertised.host.name=eventbus1.kafka
- num.network.threads=3 broker处理消息的最大线程数,一般情况下数量为cpu核数
- num.io.threads: Kafka broker 处理磁盘 IO 的线程数
- socket.receive.buffer.bytes: socket的接收缓冲区大小
- socket.send.buffer.bytes: socket的发送缓冲区大小
- socket.request.max.bytes=104857600 socket请求的最大数值
- log.dirs=/var/log/kafka 消息日志存放位置
- num.partitions: 创建 topic 如果不指定分区数时的默认值
- log.flush.interval.messages=1000 表示每当消息记录数达到1000时flush一次数据到磁盘
- log.flush.interval.ms=1000 表示每间隔1000毫秒flush一次数据到磁盘
- log.retention.bytes: topic 每个分区的最大文件大小
- log.retention.hours: 消息保留的最大时间
- log.segment.bytes =102410241024 topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
- log.retention.check.interval.ms=30000000 文件大小检查的周期时间
- auto.create.topics.enable: 自动创建 topic
- default.replication.factor: 创建 topic 如果不指定复制因子时的默认值
- delete.topic.enable: 是否支持删除 topic
- message.max.bytes: 消息的最大尺寸
- num.replica.fetchers: 从分区 Leader 复制消息的线程数
- queued.max.requests: 等待 IO 线程处理的请求队列最大数,若是等待 IO 的请求超过这个数值,就会停止接受外部消息
- zookeeper.connect = localhost:2181 zookeeper集群的地址,可以是多个用逗号分割
hostname1:port1,hostname2:port2,hostname3:port3
- zookeeper.session.timeout.ms=6000 ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
- zookeeper.connection.timeout.ms =6000 ZooKeeper的连接超时时间
- zookeeper.sync.time.ms =2000 ZooKeeper集群中leader和follower之间的同步时间
这里配置broker的时候,每台机器上的broker保证唯一,从0开始。如:在另外2台机器上分别配置broker.id=1,broker.id=2
kafka配置优化
网络和io操作线程配置优化
1 2 3 4 5 6 7 8 9
| # broker处理消息的最大线程数 num.network.threads=xxx # broker处理磁盘IO的线程数 num.io.threads=xxx
建议配置: 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.
num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.
|
log数据文件刷新策略
1 2 3 4 5 6
| 为了大幅度提高producer写入吞吐量,需要定期批量写文件。 建议配置: # 每当producer写入10000条消息时,刷数据到磁盘 log.flush.interval.messages=10000 # 每间隔1秒钟时间,刷数据到磁盘 log.flush.interval.ms=1000
|
日志保留策略配置
当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。
1 2 3 4 5 6 7
| log.retention.hours=72 # 保留三天,也可以更短
log.segment.bytes=1073741824 # 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多, # kafka启动时是单线程扫描目录(log.dir)下所有数据文件)
|
开启自动创建配置:
1 2
| auto.create.topics.enable=true 使用程序直接往kafka中相应的topic发送数据,如果topic不存在就会按默认配置进行创建。
|
创建topic
1
| sh kafka-topics.sh --create --topic topic --replication-factor 1 --partitions 1 --zookeeper localhost:4188
|
KAFKA有几个,replication-factor就填几个
我们查看该Topic的相关信息
1
| kafka-topics.sh --zookeeper localhost:4188 --topic topic --describe
|
查看都有哪些topic
1
| ./bin/kafka-topics.sh --list --zookeeper localhost:2181
|
模拟数据的生产和消费
使用producer生产消息
1 2 3 4
| [root@localhost bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic topic [2015-12-10 13:54:40,460] WARN Property topic is not valid (kafka.utils.VerifiableProperties) hello 你好
|
使用consumer去消费消息
1 2 3
| [root@localhost bin]# sh kafka-console-consumer.sh --zookeeper 10.0.2.150:4188 --topic topic --from-beginning hello 你好
|
修改topic的partition
1 2 3 4
| #
./bin/kafka-topics.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –alter –partitions 20 –topic topic_test
|
修改kafka的分片配置
操作步骤如下:
操作,是指手动写扩充replicas的配置文件,然后使用工具进行操作
查看topic的详细信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| lizhitao@users-MacBook-Pro-2:~$ ./bin/kafka-topics.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –describe –topic test.example Topic:test.example PartitionCount:12 ReplicationFactor:1 Configs: Topic: test.example Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test.example Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: test.example Partition: 2 Leader: 2 Replicas: 2 Isr: 2 Topic: test.example Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Topic: test.example Partition: 4 Leader: 1 Replicas: 1 Isr: 1 Topic: test.example Partition: 5 Leader: 2 Replicas: 2 Isr: 2 Topic: test.example Partition: 6 Leader: 0 Replicas: 0 Isr: 0 Topic: test.example Partition: 7 Leader: 1 Replicas: 1 Isr: 1 Topic: test.example Partition: 8 Leader: 2 Replicas: 2 Isr: 2 Topic: test.example Partition: 9 Leader: 0 Replicas: 0 Isr: 0 Topic: test.example Partition: 10 Leader: 1 Replicas: 1 Isr: 1 Topic: test.example Partition: 11 Leader: 2 Replicas: 2 Isr: 2
|
修改配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 将原有replicas为[0]扩充为[0,4], [1]扩充为[1,5],[2]扩充为[2,3] [sankuai@data-kafka01 kafka]$ cat partitions-to-move.json { “partitions”: [ { “topic”: “test.example”, “partition”: 0, “replicas”: [0,4] }, ..... { “topic”: “test.example”, “partition”: 11, “replicas”: [2,3] } ], “version”:1 }
|
执行
1
| ./bin/kafka-reassign-partitions.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –reassignment-json-file partitions-to-move.json –execute
|
检查修改情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| [sankuai@data-kafka01 kafka]$ ./bin/kafka-topics.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –describe –topic test.example Topic:test.example PartitionCount:12 ReplicationFactor:2 Configs: Topic: test.example Partition: 0 Leader: 0 Replicas: 0,4 Isr: 0,4 Topic: test.example Partition: 1 Leader: 1 Replicas: 1,5 Isr: 1,5 Topic: test.example Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test.example Partition: 3 Leader: 0 Replicas: 0,4 Isr: 0,4 Topic: test.example Partition: 4 Leader: 1 Replicas: 1,5 Isr: 1,5 Topic: test.example Partition: 5 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test.example Partition: 6 Leader: 0 Replicas: 0,4 Isr: 0,4 Topic: test.example Partition: 7 Leader: 1 Replicas: 1,5 Isr: 1,5 Topic: test.example Partition: 8 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test.example Partition: 9 Leader: 0 Replicas: 0,4 Isr: 0,4 Topic: test.example Partition: 10 Leader: 1 Replicas: 1,5 Isr: 1,5 Topic: test.example Partition: 11 Leader: 2 Replicas: 2,3 Isr: 2,3
|
kafka性能优化
1 2 3 4 5
| # kafka server中默认是不启动jmx端口的,需要用户自己配置 vim bin/kafka-run-class.sh #最前面添加一行 JMX_PORT=8060
|
kafka监控和告警
通过使用,个人总结以上三种监控程序的优缺点:
Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。
Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。
KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。
若只需要监控功能,推荐使用KafkaOffsetMonito,若偏重Kafka集群管理,推荐使用Kafka Manager。
因为都是开源程序,稳定性欠缺。故需先了解清楚目前已存在哪些Bug,多测试一下,避免出现类似于Kafka Web Console的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| 描述:所有的topic的消息速率(消息数/秒) Mbean名:"kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics" 正常的值:
描述:所有的topic的流入数据速率(字节/秒) Mbean名:"kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics" 正常的值:
描述:producer或Fetch-consumer或Fetch-follower的请求速率(请求次数/秒) Mbean名:"kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics" 正常的值:
描述:所有的topic的流出数据速率(字节/秒) Mbean名: "kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics" 正常的值:
描述:刷日志的速率和耗时 Mbean名: "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats" 正常的值:
描述:正在做复制的partition的数量(|ISR| < |all replicas|) Mbean名:"kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager" 正常的值:0
描述:当前的broker是否为controller Mbean名:"kafka.controller":name="ActiveControllerCount",type="KafkaController" 正常的值:在集群中只有一个broker的这个值为1
描述:选举leader的速率 Mbean名:"kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats" 正常的值:如果有broker挂了,此值非0
描述:Unclean的leader选举速率 Mbean名:"kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats" 正常的值:0
描述:该broker上的partition的数量 Mbean名: "kafka.server":name="PartitionCount",type="ReplicaManager" 正常的值:应在各个broker中平均分布
描述:Leader的replica的数量 Mbean名: "kafka.server":name="LeaderCount",type="ReplicaManager" 正常的值:应在各个broker中平均分布
描述:ISR的收缩(shrink)速率 Mbean名:"kafka.server":name="ISRShrinksPerSec",type="ReplicaManager" 正常的值:如果一个broker挂掉了,一些partition的ISR会收缩。当那个broker重新起来时,一旦它的replica完全跟上,ISR会扩大(expand)。除此之外,正常情况下,此值和下面的扩大速率都是0。
描述:ISR的扩大(expansion)速率 Mbean名: "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager" 正常的值:参见ISR的收缩(shrink)速率
描述:follower落后leader replica的最大的消息数量 Mbean名:"kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager" 正常的值:小于replica.lag.max.messages
描述:每个follower replica落后的消息速率 Mbean名:"kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics" 正常的值:小于replica.lag.max.messages
描述:等待producer purgatory的请求数 Mbean名:"kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory" 正常的值:如果ack=-1,应为非0值
描述:等待fetch purgatory的请求数 Mbean名:"kafka.server":name="PurgatorySize",type="FetchRequestPurgatory" 正常的值:依赖于consumer的fetch.wait.max.ms的设置
描述:一个请求(producer,Fetch-Consumer,Fetch-Follower)耗费的所有时间 Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics" 正常的值:包括了queue, local, remote和response send time
描述:请求(producer,Fetch-Consumer,Fetch-Follower)在请求队列中的等待时间 Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics" 正常的值:
描述:请求(producer,Fetch-Consumer,Fetch-Follower)在leader处理请求花的时间 Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics" 正常的值:
描述:请求(producer,Fetch-Consumer,Fetch-Follower)等待follower花费的时间 Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics" 正常的值:producer的ack=-1时,非0才正常
描述:发送响应花费的时间 Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics" 正常的值:
描述:consumer落后producer的消息数量 Mbean名:"kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager" 正常的值:
建议对GC耗时和其他参数和诸如系统CPU,I/O时间等等进行监控。在client端,建议对"消息数量/字节数"的速率(全局的和对于每一个topic),请求的"速率/大小/耗时"进行监控。还有consumer端,所有partition的最大的落后情况和最小的fetch请求的速率。consumer为了能跟上,最大落后数量需要少于一个threshold并且最小fetch速率需要大于0.
|
kafka客户端汇总
pip install kafka-python
kafka官方文档
帮助文档
分布式消息中间件应用实践
http://www.ibm.com/developerworks/cn/opensource/os-cn-kafka-distributed/
Apache kafka 工作原理介绍
http://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
kafka 快速入门手册
http://www.blogjava.net/paulwong/archive/2014/05/11/413506.html
kafka 集群安装与扩容
http://my.oschina.net/MaTech/blog/292090
http://my.oschina.net/MaTech/blog/292090
kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转)
kafka 性能测试
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
青云kafka集群介绍
https://docs.qingcloud.com/guide/queue.html#id7
kafka python+zabbix 监控脚本
http://club.oneapm.com/t/zabbix-kafka/854