kafka命令行操作大全

article/2025/4/20 0:48:46

最近利用flink使用一个流式SQL处理平台,利用kafka, mysql, hive等组件比较多,命令行突然间需要操作一次记不住命令很麻烦,索性直接整理成笔记。
在这里插入图片描述

在 0.9.0.0 之后的 Kafka,出现了几个新变动,一个是在 Server 端增加了 GroupCoordinator 这个角色,另一个较大的变动是将 topic 的 offset 信息由之前存储在 zookeeper 上改为存储到一个特殊的 topic(__consumer_offsets)中。

启动 Kafka

后台常驻方式,带上参数 -daemon,如:

bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

指定 JMX port 端口启动,指定 jmx,可以方便监控 Kafka 集群

JMX_PORT=9991 bin/kafka-server-start.sh -daemon ./config/server.properties

停止 Kafka

bin/kafka-server-stop.sh

Topic

bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --create --replication-factor REPLICA_NUM --partitions PARTITION_NUM --topic TOPIC_NAME

创建 Topic

参数 --topic 指定 Topic 名,--partitions 指定分区数,--replication-factor 指定副本数:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

注意,如果配置文件 server.properties 指定了 Kafka 在 zookeeper 上的目录,则参数也要指定,否则会报无可用的 brokers(下面部分命令也有同样的情况),如:

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test

partions: kafka通过分区策略,将不同的分区分配到一个集群中的broker上,然后消息会通过负载均衡发不到不同的分区上,consumer会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消费,分区数越多,在一定程度上会提升消息处理的吞吐量,但因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。分区可以根据消费者数量定义,通常为消费者个数*配置项中的线程数.
replication-factor:用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。

列出所有 Topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

查看 Topic

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test 

增加 Topic 的 partition 数

bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --partitions 5 

查看topic的状态和分区负载详情

bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --describe --topic TOPIC_NAME

如果发现以下现象说明kafka异常:
某个topic的每个分区,同步副本数量和设定的副本数量不一致;
某个topic的每个分区,leader的id数值是-1或者none;

查看 topic 指定分区 offset 的最大值或最小值

time 为 -1 时表示最大值,为 -2 时表示最小值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0 

查询topic的offset的范围

查询offset最小值

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 -topic videoplay --time -2

查询offset最大值

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 -topic videoplay --time -1

重置消费者offset

bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME  --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME
bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME  --reset-offsets --execute --to-earliest/--to-latest --topic TOPIC_NAME

删除 Topic

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --delete 

删除topic下的数据

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_topic --config cleanup.policy=delete

给指定TOPIC设置消息存储时间 – 针对数据量大,磁盘小的情况

查看某一个topic设置过期时间

259200000 – 72 小时
86400000 – 24 小时
43200000 – 12 小时
28800000 – 8 小时
10800000 – 3 小时

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test_topic --entity-type topics
单独对某一个topic设置过期时间(下列两条命令都可用)
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name test_topic --entity-type topics --add-config retention.ms=86400000
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name test_topic --alter --add-config retention.ms=259200000

生产消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

消费消息

从头开始

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

从尾部开始

从尾部开始取数据,必需要指定分区:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0

指定分区

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0

取指定个数

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1 

消费者 Group

消费指定 Group

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning

消费者 Group 列表

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费group状态和消费详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe

输出:

Consumer group 'test_group' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            0          5               5               0               -               -               -# CURRENT-OFFSET: 当前消费者群组最近提交的 offset,也就是消费者分区里读取的当前位置
# LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量
# LAG:消费者的 CURRENT-OFFSET 与 broker 的 LOG-END-OFFSET 之间的差距

设置consumer group的offset

//启动zookeeper client
zookeeper/bin/zkCli.sh//通过下面命令设置consumer group:DynamicRangeGroup topic:DynamicRange partition:0的offset为1288:
set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288// 或者:  注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:
set /kafka/consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288// 重启kafka zookeeper

删除 group 中的 Topic

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete

删除 Group

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete

平衡 leader

bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

自带压测工具

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092 

作者:yongxinz
链接:参考地址

Kafka常用命令合集
https://www.cnblogs.com/toutou/p/kafka_command.html

kafka 命令行 生产消费数据,查看偏移量,修改偏移量
https://blog.csdn.net/ispringmw/article/details/108834144
https://blog.csdn.net/qq_29116427/article/details/80206125


http://chatgpt.dhexx.cn/article/xLcpVWEV.shtml

相关文章

Kafka命令大全

kafka 脚本 connect-distributed.sh connect-mirror-maker.sh connect-standalone.sh kafka-acls.sh kafka-broker-api-versions.sh kafka-configs.sh kafka-console-consumer.sh kafka-console-producer.sh kafka-consumer-groups.sh kafka-consumer-perf-test.sh kafka-dele…

Kafka常用命令行命令

文章目录 Kafka常用命令kafka的基本操作(命令行操作)1.启动集群:2.查看当前服务器中的所有topic(在kafka目录下)3.创建主题topic(在kafka目录下)4.删除topic(在kafka目录下&#xff…

美国Stripe支付Android端集成流程

上家公司想要拓展自己在新加坡的市场,打算做一个新加坡本地的生活服务应用,其中少不了的就是支付了。国外支付这块一直是个头疼的问题。想用Google Wallet吧,但它是采用NFC接触式交易,想要进行线上服务时没法进行,后来就去整个贝宝PayPal支付。在这里想吐槽一下,PayPal支付做起…

zencart1.55手把手教你开发stripe支付插件

第一步&#xff1a;在includes/modules/payment目录下创建名称为c_stripe的文件夹&#xff0c;用于存放stripe支付logo 第二步&#xff1a;在同includes/modules/payment目录下创建c_stripe.php文件&#xff0c;这个文件就是用于编写zencart支付插件,代码如下 <?php // /…

JAVA接入STRIPE支付教程(测试环境),STRIPE支付的调用以及STRIPE WEBHOOK回调

一、环境准备 1.注册 2.密钥 3.WEBHOOK回调 二、核心代码 1.配置API.key以及webhook.key 2.支付demo 3.WEBHOOK回调 一、环境准备 1.注册 STRIPE官网自行注册账号 2.两个重要的密钥 首先在STRIPE官网注册账号之后进入首页&#xff0c;点击API密钥&#xff0c;查看账号对应的…

前端对接stripe支付,创建测试session_is

第一次搞 stripe支付&#xff0c;国外的文档全英文 接stripe支付&#xff0c;根据官方文档&#xff0c;首先就是先跟服务端交互&#xff0c;创建session会话&#xff0c;获取id&#xff0c;当服务端不做这个功能时&#xff0c;就需要前端去掉stripe最底层的api&#xff0c;拿到…

Stripe支付简介和前端js调用

最近公司正在做一个国际版APP&#xff0c;涉及到海外支付&#xff0c;调研过Paypal、Skrill、BrainTree、Stripe&#xff08;可参考海外移动支付方案对比&#xff09;&#xff0c;最终 选择了Stripe支付。Stripe特点如下&#xff1a; 收费规则简单透明&#xff0c;手续费就是收…

laravel 对接stripe支付

参考文档 &#xff1a; stripe文档 stripe/stripe-php stripe api 文档 目录 一 获取关键参数二 安装Stripe库三 代码示例 一 获取关键参数 SCRIPE_SECRET_KEY &#xff08;调用api秘钥&#xff09; NOTIFY_SIGN (签名 支付回调使用) 二 安装Stripe库 # Install the PHP libr…

Stripe支付配置

开通支付 首先&#xff0c;你需要在 Stripe 官网开通你自己的支付账号信息&#xff1a; https://stripe.com/ 注册好以后&#xff0c;你即可获取 Stripe 的密钥信息&#xff1a; 密钥主要包含两部分&#xff0c;可发布的密钥 密钥 同时&#xff0c;你需要找到你交易的对应的货…

php实现Stripe支付 | ecshop stripe支付

Stripe支付 &#xff1a;Stripe Login | Sign in to the Stripe Dashboard 1. 安装Stripe&#xff1a; composer require stripe/stripe-php 2. 获取密钥&#xff1a;https://dashboard.stripe.com/test/apikeys 3. 创建产品&#xff1a;Stripe Login | Sign in to the Stri…

php实现Stripe支付

Stripe支付 &#xff1a;https://dashboard.stripe.com/dashboard 1. 安装Stripe&#xff1a; composer require stripe/stripe-php 2. 获取密钥&#xff1a; https://dashboard.stripe.com/test/apikeys 3. 创建产品&#xff1a; https://dashboard.stripe.com/test/product…

stripe 支付

stripe 支付整理 1、创建账号 官方网址 中文版 https://stripe.com/zh-cn-us/payments 2、激活你的账号 填写信息只支持国外与香港的哦 3、开发者秘钥 如果不激活的话&#xff0c;只能用测试api秘钥 4、配置你的回调地址 配置秘钥&#xff0c;选择webhook事件 事件一定要选择…

java 对接 stripe支付

stripe 支付跟国内的 支付宝 、微信、等第三方支付平台不一样 码字不易&#xff0c;开源更不易&#xff0c;点赞收藏关注&#xff0c;多多支持 开源地址 https://gitee.com/J-LJJ/stripe-demo 支付方式一 先看效果 支付方式2&#xff08;需要配合回调&#xff09; 2023-04…

Stripe支付流程简要描述

在国外&#xff0c;除了Paypal支付之外&#xff0c;Stripe支付也占有很大一部分市场份额&#xff0c;Stripe支付官网 https://stripe.com/ 下面简单介绍一下Stripe的支付流程。 1、用户页面输入充值金额&#xff0c;点击确定跳转到支付页面&#xff08;页面的样式由stripe提供…

stripe支付集成

最近公司要做一下Stripe支付的集成&#xff0c;浅浅地谈一下自己的一点理解 1、stripe是什么&#xff1f; stripe是第三方的支付平台&#xff0c;就像国内的支付宝、微信支付。。。 stripe官方文档&#xff1a;Developer tools | Stripe Documentation 关于stripe支付&…

初步认识 Stripe 支付

前言 这段时间在做支付相关的工作&#xff0c;由于业务主要是面向国外的用户&#xff0c;因而就接触了部分国外的支付支付相关的平台。接下来的内容主要是初步看了 Stripe 平台的文档所了解到的基本内容&#xff0c;后面会在使用的过程中不断地进行完善。 基本介绍和与其他支…

Stripe支付流程

近几天因为公司的项目中遇到了需要支持给国外本土支付提供支持&#xff0c;经过调研了市面上几款的产品后选择了stripe支付 由于资料比较少没有太多讨论&#xff0c;慢慢查看官方文档以下是我对官方文档梳理和对接过程中的一些经验和理解记录了下来 关于Stripe Stripe是一家提…

Stripe国际支付简介及API对接

文章目录 一、了解Stripe支付二、Stripe注册流程三、Stripe API 特点3.1 Apikey3.2 Idempotent Requests 幂等请求3.3 两种付款方式 四、Stripe 支付核心API4.1 Token4.2 Customer4.3 Card4.4 Source4.5 charge4.6 PaymentIntents4.7 PaymentMethod 五、完整Stripe支付代码 一、…

mingw(msys2)编译ffmpeg

mingw(msys2)编译ffmpeg 首先要确保pacman环境是最新的&#xff0c;否则会出现莫名其妙的问题&#xff0c;可以执行“pacman -Syu”更新包 安装mingw: pacman -S gcc mingw-w64-i686-toolchain yasm mingw-w64-i686-SDL2 //mingw32 pacman -S gcc mingw-w64-x86_64-toolchai…

Hyperscan Windows 编译指南

Hyperscan Windows 编译指南 Hyperscan 源码下载&#xff1a;https://www.hyperscan.io/准备环境&#xff1a; Windows 10 X64 Cygwin : https://www.cygwin.com/ CMake&#xff1a;https://cmake.org/ Visual Studio 2017 Python (2.7 版本) Boost : https://www.boost…