kafka使用
- 查看Kafka当前的主题列表
- 创建一个主题
- 查看主题信息
- 修改分区信息
- 删除一个主题
- 生成者推送消息
- 消费者接收
- 查看分组信息
- 查看特定consumer group 详情
基于Kafka 2.13版本的操作
查看Kafka当前的主题列表
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
创建一个主题
–zookeeper 连接zk集群
–replication-factor 副本数量
–partitions 分区数量
–topic 主题名
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --replication-factor 1 --partitions 1 -topic secondTest
查看主题信息
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic first_test
修改分区信息
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic first_test --partitions 2
删除一个主题
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者手动删除文件或者直接重启
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic secondTest
生成者推送消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_test
消费者接收
低版本的命令如下,在这个版本行不通:
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic first_test --from-beginning
需要换成
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_test --from-beginning
查看分组信息
注意
- 这里的IP地址似乎不能是localhost或127.0.0.1,一开始用这两个一直报错Failed to find brokers to send ListGroups,后来修改了server.properties中的listeners=PLAINTEXT://172.20.40.245:9292 以及producer.properties和consumer.properties中的bootstrap.servers=172.20.40.245:9092
./kafka-consumer-groups.sh --bootstrap-server 172.20.40.245:9292 --list
查看特定consumer group 详情
./kafka-consumer-groups.sh --bootstrap-server 172.20.40.245:9092 --group firstId --describe