1.在清华镜像站下载kafka_2.10-0.10.0.0.tgz 和 zookeeper-3.4.10.tar.gz
分别解压到/usr/local目录下
2.进入zookeeper目录,在conf目录下将zoo_sample.cfg文件拷贝,并更名为zoo.cfg
参考
zoo.cfg文件的内容
# The number of ticks that the initial # synchronization phase can takeinitLimit=10# The number of ticks that can pass between # sending a request and getting an acknowledgementsyncLimit=5# the directory where the snapshot is stored.# do not use /tmp for storage, /tmp here is just # example sakes.dataDir=/home/common/zookeeper/zookeeperdir/zookeeper-datadataLogDir=/home/common/zookeeper/zookeeperdir/logs# the port at which the clients will connectclientPort=2181server.1=10.10.100.10:2888:3888# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60## Be sure to read the maintenance section of the # administrator guide before turning on autopurge.## http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance## The number of snapshots to retain in dataDir#autopurge.snapRetainCount=3# Purge task interval in hours# Set to "0" to disable auto purge feature#autopurge.purgeInterval=1
新建下面这两个目录
/home/common/zookeeper/zookeeperdir/zookeeper-data/home/common/zookeeper/zookeeperdir/logs
在zookeeper-data目录下新建一个myid文件,内容为1,代表这个服务器的编号是1,具体参考上面网址中的内容
最后在/etc/profile中添加环境变量,并source
export ZOOKEEPER_HOME=/usr/local/zookeeperexport PATH=${ZOOKEEPER_HOME}/bin:$PATH
现在zookeeper就安装好了,现在启动zookeeper
bin/zkServer.sh start
查看状态
bin/zkServer.sh status
启动客户端脚本
bin/zkCli.sh -server localhost:2181
停止zookeeper
bin/zkServer.sh stop
1.现在安装kafka,同样是解压之后就安装好了
参考
2.进入kafka目录下
kafka需要使用Zookeeper,首先需要启动Zookeeper服务,上面的操作就已经启动了Zookeeper服务
如果没有的话,可以使用kafka自带的脚本启动一个简单的单一节点Zookeeper实例
bin/zookeeper-server-start.sh config/zookeeper.properties
启动 Kafka服务
bin/kafka-server-start.sh config/server.properties
停止 Kafka服务
bin/kafka-server-stop.sh config/server.properties
3.创建一个主题
首先创建一个名为test
的topic,只使用单个分区和一个复本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
现在可以运行list topic命令看到我们的主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testThis is a messageThis is another message
如果要批量导入文件数据到kafka,参考:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic < file_pat
如果要模拟实时数据到打入kafka的情况,可以写一个shell脚本
#!/usr/bin/env bashcat XXXX.log | while read linedo sleep 0.1 echo "${line}" echo "${line}" | /home/lintong/software/apache/kafka_2.11-0.10.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicAdone
5.启动一个消费者,消费者会接收到消息
旧版消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 2>/dev/null
新版消费者
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>/dev/null
6.查看指定的topic的offset信息
对于结尾是ZK的消费者,其消费者的信息是存储在Zookeeper中的
对于结尾是KF的消费者,其消费者的信息是存在在Kafka的broker中的
都可以使用下面的命令进行查看
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group xxx --topic xxx
结果
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group --topic xxx[2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)Group Topic Pid Offset logSize Lag Ownertest-consumer-group xxx 0 509 0 -509 none
或者
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx
结果
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-consumer-group[2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)Group Topic Pid Offset logSize Lag Ownertest-consumer-group xxx 0 509 509 0 none
lag是负数的原因是 topic中的消息数量过期(超过kafka默认的7天后被删除了),变成了0,所以Lag=logSize减去Offset,所以就变成了负数
7.删除一个topic
需要在 conf/server.properties 文件中设置
# For delete topicdelete.topic.enable=true
否则在执行了以下删除命令后,再 list 查看所有的topic,还是会看到该topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicB
再到 配置文件 中的kafka数据存储地址去删除物理数据了,我的地址为
/tmp/kafka-logs
最后需要到zk里删除kafka的元数据
./bin/zkCli.sh #进入zk shellls /brokers/topicsrmr /brokers/topics/topicA
参考:
8.查看某个group的信息
新版
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx
结果
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_idGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNERgroup_id xxx 0 509 509 0 consumer-1_/127.0.0.1
如果这时候消费者进程关闭了之后,使用上面的命令和下面的-list命令将不会查出这个group_id,但是当消费者进程重新开启后,这个group_id又能重新查到,且消费的offset不会丢失
旧版
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group xxx --describe
9.查看consumer group的列表
ZK的消费者可以使用下面命令查看,比如上面的例子中的 test-consumer-group
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
KF的消费者可以使用下面命令查看,比如上面的例子中的 console-consumer-xxx ,但是只会查看到类似于 KMOffsetCache-lintong-B250M-DS3H 的结果,这是由于这种消费者的信息是存放在 __consumer_offsets 中
对于如何查看存储于 __consumer_offsets 中的新版消费者的信息,可以参考huxihx的博文:
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
10.在zk中删除一个consumer group
rmr /consumers/test-consumer-group
11.查看topic的offset的最小值
参考:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -2xxxx:0:0
12.查看topic的offset的最大值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -1