Kafka命令大全

article/2025/4/20 0:59:58

kafka 脚本

connect-distributed.sh
connect-mirror-maker.sh
connect-standalone.sh
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
kafka-dump-log.sh
kafka-leader-election.sh
kafka-log-dirs.sh
kafka-mirror-maker.sh
kafka-preferred-replica-election.sh
kafka-producer-perf-test.sh
kafka-reassign-partitions.sh
kafka-replica-verification.sh
kafka-run-class.sh
kafka-server-start.sh
kafka-server-stop.sh
kafka-streams-application-reset.sh
kafka-topics.sh
kafka-verifiable-consumer.sh
kafka-verifiable-producer.sh
trogdor.sh
windows
zookeeper-security-migration.sh
zookeeper-server-start.sh
zookeeper-server-stop.sh
zookeeper-shell.sh

集群管理

# 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &# 停止zookeeper
bin/zookeeper-server-stop.sh# 前台启动broker Ctrl + C 关闭
bin/kafka-server-start.sh <path>/server.properties# 后台启动broker
bin/kafka-server-start.sh -daemon <path>/server.properties# 关闭broker
bin/kafka-server-stop.sh

topic相关

# 创建topic(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
# kafka版本 >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test--create:指定创建topic动作--topic:指定新建topic的名称--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样--partitions:指定当前创建的kafka分区数量,默认为1个--replication-factor:指定每个分区的复制因子个数,默认1个# 分区扩容,注意:分区数量只能增加,不能减少
# kafka版本 < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
# kafka版本 >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2# 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test# 查询topic列表
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
# 查询topic列表(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 查看所有topic的详细信息
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe # 查询topic详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicname

消费者组

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap–server和zookeeper参数

# 新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
# 消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list# 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
# 显示某个新消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-group
## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group# 重设消费者组位移
# 最早处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
# 最新处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
# 某个位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
# 调整到某个时间之后的最早位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000# 删除消费者组
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete --group groupname

显示某个消费组的消费详情
显示某个消费组的消费详情
各字段含义如下

TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
topic名字分区id当前已消费的条数总条数未消费的条数消费id主机ip客户端id

发送和消费

# 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test# 消费者,其中"--from-beginning"为可选参数,表示要从头消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
# 指定groupid
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer-property group.id=old-consumer-group
# 指定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --partition 0# 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties# 新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties# kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName# 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10

切换leader

# kafka版本 <= 2.4
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot# kafka新版本
bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

kafka自带压测命令

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092

kafka持续发送消息

持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 100000

zookeeper-shell.sh

如果kafka集群的zk配置了chroot路径,那么需要加上/path

bin/zookeeper-shell.sh localhost:2181[/path]
ls /brokers/ids
get /brokers/ids/0

迁移分区

  1. 创建规则json
cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
}
EOF
  1. 执行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
  1. 验证
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

MirrorMaker 跨机房灾备工具

bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB

 


http://chatgpt.dhexx.cn/article/BWlLnNZ1.shtml

相关文章

Kafka常用命令行命令

文章目录 Kafka常用命令kafka的基本操作&#xff08;命令行操作&#xff09;1.启动集群&#xff1a;2.查看当前服务器中的所有topic&#xff08;在kafka目录下&#xff09;3.创建主题topic&#xff08;在kafka目录下&#xff09;4.删除topic&#xff08;在kafka目录下&#xff…

美国Stripe支付Android端集成流程

上家公司想要拓展自己在新加坡的市场,打算做一个新加坡本地的生活服务应用,其中少不了的就是支付了。国外支付这块一直是个头疼的问题。想用Google Wallet吧,但它是采用NFC接触式交易,想要进行线上服务时没法进行,后来就去整个贝宝PayPal支付。在这里想吐槽一下,PayPal支付做起…

zencart1.55手把手教你开发stripe支付插件

第一步&#xff1a;在includes/modules/payment目录下创建名称为c_stripe的文件夹&#xff0c;用于存放stripe支付logo 第二步&#xff1a;在同includes/modules/payment目录下创建c_stripe.php文件&#xff0c;这个文件就是用于编写zencart支付插件,代码如下 <?php // /…

JAVA接入STRIPE支付教程(测试环境),STRIPE支付的调用以及STRIPE WEBHOOK回调

一、环境准备 1.注册 2.密钥 3.WEBHOOK回调 二、核心代码 1.配置API.key以及webhook.key 2.支付demo 3.WEBHOOK回调 一、环境准备 1.注册 STRIPE官网自行注册账号 2.两个重要的密钥 首先在STRIPE官网注册账号之后进入首页&#xff0c;点击API密钥&#xff0c;查看账号对应的…

前端对接stripe支付,创建测试session_is

第一次搞 stripe支付&#xff0c;国外的文档全英文 接stripe支付&#xff0c;根据官方文档&#xff0c;首先就是先跟服务端交互&#xff0c;创建session会话&#xff0c;获取id&#xff0c;当服务端不做这个功能时&#xff0c;就需要前端去掉stripe最底层的api&#xff0c;拿到…

Stripe支付简介和前端js调用

最近公司正在做一个国际版APP&#xff0c;涉及到海外支付&#xff0c;调研过Paypal、Skrill、BrainTree、Stripe&#xff08;可参考海外移动支付方案对比&#xff09;&#xff0c;最终 选择了Stripe支付。Stripe特点如下&#xff1a; 收费规则简单透明&#xff0c;手续费就是收…

laravel 对接stripe支付

参考文档 &#xff1a; stripe文档 stripe/stripe-php stripe api 文档 目录 一 获取关键参数二 安装Stripe库三 代码示例 一 获取关键参数 SCRIPE_SECRET_KEY &#xff08;调用api秘钥&#xff09; NOTIFY_SIGN (签名 支付回调使用) 二 安装Stripe库 # Install the PHP libr…

Stripe支付配置

开通支付 首先&#xff0c;你需要在 Stripe 官网开通你自己的支付账号信息&#xff1a; https://stripe.com/ 注册好以后&#xff0c;你即可获取 Stripe 的密钥信息&#xff1a; 密钥主要包含两部分&#xff0c;可发布的密钥 密钥 同时&#xff0c;你需要找到你交易的对应的货…

php实现Stripe支付 | ecshop stripe支付

Stripe支付 &#xff1a;Stripe Login | Sign in to the Stripe Dashboard 1. 安装Stripe&#xff1a; composer require stripe/stripe-php 2. 获取密钥&#xff1a;https://dashboard.stripe.com/test/apikeys 3. 创建产品&#xff1a;Stripe Login | Sign in to the Stri…

php实现Stripe支付

Stripe支付 &#xff1a;https://dashboard.stripe.com/dashboard 1. 安装Stripe&#xff1a; composer require stripe/stripe-php 2. 获取密钥&#xff1a; https://dashboard.stripe.com/test/apikeys 3. 创建产品&#xff1a; https://dashboard.stripe.com/test/product…

stripe 支付

stripe 支付整理 1、创建账号 官方网址 中文版 https://stripe.com/zh-cn-us/payments 2、激活你的账号 填写信息只支持国外与香港的哦 3、开发者秘钥 如果不激活的话&#xff0c;只能用测试api秘钥 4、配置你的回调地址 配置秘钥&#xff0c;选择webhook事件 事件一定要选择…

java 对接 stripe支付

stripe 支付跟国内的 支付宝 、微信、等第三方支付平台不一样 码字不易&#xff0c;开源更不易&#xff0c;点赞收藏关注&#xff0c;多多支持 开源地址 https://gitee.com/J-LJJ/stripe-demo 支付方式一 先看效果 支付方式2&#xff08;需要配合回调&#xff09; 2023-04…

Stripe支付流程简要描述

在国外&#xff0c;除了Paypal支付之外&#xff0c;Stripe支付也占有很大一部分市场份额&#xff0c;Stripe支付官网 https://stripe.com/ 下面简单介绍一下Stripe的支付流程。 1、用户页面输入充值金额&#xff0c;点击确定跳转到支付页面&#xff08;页面的样式由stripe提供…

stripe支付集成

最近公司要做一下Stripe支付的集成&#xff0c;浅浅地谈一下自己的一点理解 1、stripe是什么&#xff1f; stripe是第三方的支付平台&#xff0c;就像国内的支付宝、微信支付。。。 stripe官方文档&#xff1a;Developer tools | Stripe Documentation 关于stripe支付&…

初步认识 Stripe 支付

前言 这段时间在做支付相关的工作&#xff0c;由于业务主要是面向国外的用户&#xff0c;因而就接触了部分国外的支付支付相关的平台。接下来的内容主要是初步看了 Stripe 平台的文档所了解到的基本内容&#xff0c;后面会在使用的过程中不断地进行完善。 基本介绍和与其他支…

Stripe支付流程

近几天因为公司的项目中遇到了需要支持给国外本土支付提供支持&#xff0c;经过调研了市面上几款的产品后选择了stripe支付 由于资料比较少没有太多讨论&#xff0c;慢慢查看官方文档以下是我对官方文档梳理和对接过程中的一些经验和理解记录了下来 关于Stripe Stripe是一家提…

Stripe国际支付简介及API对接

文章目录 一、了解Stripe支付二、Stripe注册流程三、Stripe API 特点3.1 Apikey3.2 Idempotent Requests 幂等请求3.3 两种付款方式 四、Stripe 支付核心API4.1 Token4.2 Customer4.3 Card4.4 Source4.5 charge4.6 PaymentIntents4.7 PaymentMethod 五、完整Stripe支付代码 一、…

mingw(msys2)编译ffmpeg

mingw(msys2)编译ffmpeg 首先要确保pacman环境是最新的&#xff0c;否则会出现莫名其妙的问题&#xff0c;可以执行“pacman -Syu”更新包 安装mingw: pacman -S gcc mingw-w64-i686-toolchain yasm mingw-w64-i686-SDL2 //mingw32 pacman -S gcc mingw-w64-x86_64-toolchai…

Hyperscan Windows 编译指南

Hyperscan Windows 编译指南 Hyperscan 源码下载&#xff1a;https://www.hyperscan.io/准备环境&#xff1a; Windows 10 X64 Cygwin : https://www.cygwin.com/ CMake&#xff1a;https://cmake.org/ Visual Studio 2017 Python (2.7 版本) Boost : https://www.boost…

Hyperscan 5.4.0 安装教程 (CentOS7环境)

参考&#xff1a;Getting Started — Hyperscan 5.4.0 documentationhttp://intel.github.io/hyperscan/dev-reference/getting_started.html 目录 1.下载 2.安装环境配置 2.1 硬件需求 2.2 软件需求 3.安装 3.1 创建构建目录 3.2 设置编译选项 3.3 构建hyperscan 4.安…