kafka命令行操作主要分为三大类: 主题命令行操作、生产者命令行操作、消费者命令行操作。
注意: 命令行操作前提,启动kafka集群。
1. 主题命令行操作
1.1 查看主题命令行参数
a) 查询命令
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh
b)参数列表
参数 | 描述 |
--bootstrap-server<String:server toconnect to> | 连接的Kafka Broker主机名称和端口号 |
--topic<String:topic> | 操作的topic名称 |
--create | 创建主题 |
--delete | 删除主题 |
--alter | 修改主题 |
--list | 查看所有主题 |
--describle | 查看主题详细描述 |
--partitions<Integer:#of partitions> | 设置分区数 |
--replication-factor<Integer:replication factor> | 设置分区副本 |
--config<String:name=value> | 更新系统默认的配置 |
1.2 查看当前主机的所有主题(topic)
# 防止kafka宕机可以使用两台
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --list# 测试一般使用一台即可
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
报错问题:kafka没有启动
产生原因:
之前设置的zk的clientPort值不同导致kafka无法注册到zk上,启动后无法注册导致宕机。
解决:
zk集群的所有主机的配置文件中的配置项clientPort都要设置为一样的,zk的默认端口是2181,都设置为clientPort=2181(必须是相同的)
还没有主题
1.3 创建主题
a)创建一个first主题并指定分区为1,分区副本3份。
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 1 --replication-factor 3
b)查看所有的主题
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
c)查看指定主题的详细信息
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
1.4 修改分区数
注意: 分区数只能增加,不能减少
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
查看当前分区详情
发现分区数改变
1.5 删除主题
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
2. 生产者与消费者命令操作
2.1 查看命令参数
1) 查看生产者命令参数
[root@hadoop102 kafka]# bin/kafka-console-producer.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
2)查看消费者命令参数
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--from-beginning | 从头开始消费 |
--group <String: consumer group id> | 指定消费者组名称 |
2.2 发送消息与消费消息
使用hadoop102生产消息,hadoop103消费消息。
1)生产消息
一次只能发送一条,多发送会报错,等消费者消费完可以发送之后的消息。
[root@hadoop102 kafka]# bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
开启后需要等待,等待消费者开启
2)消费消息
消费 first 主题中的数据
注意:新创建消费者无法消费之前的历史记录,只能消费生产者新产生的记录
[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
3) 测试
生产者发送一条消息
这时消费者消费一条
4)查询消费者消费记录
把主题中所有的数据都读取出来(包括历史数据)
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first