Java代码重置kafka0.10.1.0版本的偏移量offset

article/2025/9/12 7:13:10

如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数据丢失,这时候重置偏移量就是一剂后悔药,让消费者能够重新来过,当然后悔药也是有保质期的,还得取决于数据的保留策略。

这里讨论一下kafka_2.11.0.10.1.0版本重置偏移量的方案

该版本kafka不像其他版本一样,通过执行一句方便的命令就可以重置到指定的偏移量,本文给出了一种通过Java代码来重置偏移量的方式,若有不足,还望指教。

先来看一下如下代码

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.util.HashMap;
import java.util.Properties;public class ResetKafkaOffset {public ResetKafkaOffset() {}public static void main(String[] args) {try {HashMap<String, String> params = new HashMap();// 解析参数for (String param : args) {String[] split = param.split("=");if (split.length == 2) {System.out.println(split[0] + "=" + split[1]);params.put(split[0], split[1]);}}String topic = params.remove("topic");//参数校验 topic、消费组、分区、sever地址没有设置则提示并退出if (null == topic) {System.out.println("please set topic==XXX");System.exit(-1);}String groupId = params.remove("group.id");if (null == groupId) {System.out.println("please set group.id==XXX");System.exit(-1);}String server = params.remove("server");if (null == server) {System.out.println("please set server==XXX");System.exit(-1);}if (params.size() == 0) {System.out.println("please set at lease one partition_x==XXX");System.exit(-1);}// kafka设置Properties properties = new Properties();properties.put("bootstrap.servers", server);properties.put("group.id", groupId);properties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "6000");properties.put("session.timeout.ms", "10000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("auto.offset.reset", "earliest");KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);HashMap offsets = new HashMap();// 设置每个分区的偏移量for (String key : params.keySet()) {String[] partitions = key.split("_");TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitions[1]));OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(Long.parseLong(params.get(key)), "manual");offsets.put(topicPartition, offsetAndMetadata);}// 提交kafkaConsumer.commitSync(offsets);System.out.println("ok");} catch (Exception e) {e.printStackTrace();}}
}

我们把kafka的连接以及我们需要重置的偏移量参数以等号“=”分割的方式传入程序中,然后解析,校验,kafka连接设置,分区偏移量设置,最后提交达到偏移量重置的目的。

参数设置如:server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=1 partition_1=1

server:kafka连接地址

group.id:消费组

topic:kafka的topic

partition_a = b:a表示几号分区数,b表示要重置到指定的偏移量,既要把分区a重置到b位置

现在console-consumer-48585消费组信息如下,两个分区的数据都已经完全消费了

现在停掉console-consumer-48585的消费者进程,把分区0的偏移量指到59,分区2的偏移量指到58,并运行程序

参数

server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=59 partition_1=58

程序正常运行

这时候启动我们的消费者进程,会看到console-consumer-48585组下的消费者会重新消费分区1的后一个数和分区2的后一个数,表示重置成功。

sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --consumer.config ../config/consumer.properties

当然生产、灰度环境等遇到需要像这样来重置偏移量的时候,在本地肯定是连不上生产、灰度的地址,所以需要把改代码编译成class文件,然后把该class文件放到kafka的bin目录下(如果是集群放到任意一个下即可),停掉消费应用,然后执行如下的脚本命令(按需要替换掉分区、偏移量、地址、分组)

dir=../libs
for file in "$dir"/*.jar
dopath="$path":"$file"
done
java -cp $path ResetKafkaOffset server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=59 partition_1=58

 

 

 

 

 


http://chatgpt.dhexx.cn/article/4Go9MhcB.shtml

相关文章

Java重置Mysql主键自增长值

MySql 主键自增重置器&#xff08;统一处理多个表&#xff09; resetAutoincrement 是一款基于 Java 开发的程序&#xff0c;其功能为重置 mysql 数据库表的主键自增的值为最近的一个。 介绍 开发背景主要是工作中&#xff0c;测试同学在清理数据的时候&#xff0c;使用的是 D…

java重置按钮功能函数_Bootstrap按钮功能之查询按钮和重置按钮

Bootstrap按钮功能之查询按钮和重置按钮 1、问题背景 一般情况下&#xff0c;查询列表有查询条件、查询按钮和重置按钮&#xff0c;输入查询条件&#xff0c;点击查询按钮查询列表等数据&#xff1b;点击重置按钮会将查询条件恢复到原始状态 2、实现源码Bootstrap-查询按钮和重…

Java窗口重置

1、找到java界面中的Windows 2、右击进去按图下步骤进入 3、确定重置窗口

2021年美赛成绩公布与美赛查询!美赛官网已更新

我在浏览美赛官网查询美赛成绩时 美赛官网显示&#xff0c;2021年竞赛成绩将于4月23日公布 The MCM/ICM contest results will be available April 23, 2021 美赛成绩查询方式 ① 进入comap官网→Advisor Login→查看成绩 ② 直接查看并下载证书 示例:http://www.comap-ma…

2023美赛赛题思路分析

占个位置吧&#xff0c;开始在本帖实时更新赛题思路代码&#xff0c;文章末尾即可获取&#xff01; 2023美赛赛题初步分析 A题&#xff1a;受干旱破坏的植物群落 A题是一个植物群落的环境问题&#xff0c;涉及到预测、评估分析&#xff0c;该题难度较大&#xff0c;我们可以考…

2023年美赛F题

关键点 1.绿色GDP(GGDP)是否比传统GDP更好好的衡量标准? 2.如果GGDP成为经济健康的主要量标准&#xff0c;可能会对环境产生什么影响? 3建立一个简单的模型&#xff0c;估计GGDP取代GDP作为经济健康的主要衡量标准&#xff0c;对减缓气候变化产生的影响。 4.GGDP取代GDP可能会…

【经验分享】美赛报名以及注册方法-以2023年美赛为例

首先点击COMAP的官网链接&#xff1a; https://www.comap.com/ 然后选择Contests目录下的MCM/ICM 选择 Learn More and Register 然后选择 Click here to register for the 2023 MCM/ICM contest 注册分为两个步骤&#xff1a;顾问&#xff08;指导教师&#xff09;注册和填…

2021年美赛准备大全

2021年美赛准备大全 目录 1、2021美赛比赛网址及其介绍 2、2020年美赛摘要页说明 3、美赛常用词语与语句 4、美赛翻译注意事项 5、美赛论文写作一些建议 5.1 团队方面准备 5.2 摘要表部分 5.3 评委关注点 6、组队要求 7、软件与一些建模网址参考 &#xff08;1&…

2021美赛什么时候出成绩?

目录 0引言1、官网公告以及经验分析1.1官网1.2 以往时间节点 2、查询方式总结更新&#xff08;2021年4月12日&#xff09; 0引言 2021年2月9号上午美赛结束&#xff0c;后天才知道提交系统早上的时候卡了一会。现在估计该提交的提交状态都已经更新了。提交成功的人关心的是&am…

【数学建模】2018年数学建模国赛C题 问题一代码

文章目录 问题一代码导入包及数据数据探索与预处理 会员统计分析分析会员的年龄构成、男女比例等基本信息分析会员的总订单占比&#xff0c;总消费金额占比等消费情况分别以季度和天为单位&#xff0c;分析不同时间段会员的消费时间偏好会员与非会员统计分析 问题一代码 本文从…

美赛真题和优秀论文(2019-2021)

无偿提供2019-2021年美赛真题、数据、O奖论文 部分示例&#xff1a; 资源地址&#xff1a;MCM ICM (gitee.com) 如有资源丢失情况&#xff0c;可后台私信获取 一次参赛&#xff0c;受益终身

2020年美赛A题总结

2020年美赛A题总结 更新 最近很多朋友都想看一看论文&#xff0c;我平时不怎么上csdn&#xff0c;可能无法及时发给大家&#xff0c;故上传了论文资源。 祝大家美赛顺利&#xff01; https://download.csdn.net/download/hroukie/14727940 ————分割线———— 更新下&am…

2020美赛建模C题思路和理解

思路和理解 问题中心&#xff1a;评论数据星级建模 简要思路&#xff1a;理解成京东淘宝商城的评论数据&#xff0c;解释4.8星的指数怎么来的&#xff0c;你对商品的一段评论对该等级有多大影响&#xff1f; 个人的习惯是大数据问题第四章单独写数据清洗&#xff0c;具体流程看…

2023年美赛春季赛 赛题浅析

由于今年各种各样的原因&#xff0c;导致美赛头一次&#xff0c;据说也将是最后一次&#xff0c;临时调整&#xff0c;加设春季赛。这对于急需建模奖项的大家来说是一个很好的机会。无论怎样的原因&#xff0c;今年美赛我们可能有所遗憾。但&#xff0c;春季赛也许就是弥补遗憾…

【备战美赛】重要!2023年美赛官方发布最新通知

备战美赛 春节假期结束&#xff0c;各项比赛也需要准备起来啦&#xff01;近日&#xff0c;美赛组委会发布了2023年官方最新邮件&#xff0c;邮件内容主要是介绍本届竞赛基本情况、参赛规则、竞赛奖励、资源下载等相关内容&#xff0c;确定了比赛时间为北京时间2月17日-2月21日…

2018美赛B题总结

Update 2019/07/26:根据读者提出的问题&#xff0c;添加了查找数据方法、时间安排、论文及代码的下载地址等内容。 前言 本文主要是记录这次建模的过程和思路。用到的模型简单提及&#xff0c;并省略数据和结论。 涉及到的最小二乘法、模糊数学模型和马尔科夫链知识可以见我…

[数模美赛]2018数学建模美赛MCM总结

前言 说实话自己已经很久没有更新博客了&#xff0c;一方面是自己在这地方天天摸鱼&#xff0c;不好好学习&#xff0c;没什么可以更新的东西&#xff1b;令一方面&#xff0c;自己是在太懒&#xff0c;没办法&#xff0c;毕竟在一个非211、985学校的所谓“实验班”待着&#x…

2018美赛建模总结+Latex标准美赛模板分享

比赛经验总结 2018年2月13号上午9点&#xff0c;美赛建模比赛截止&#xff01;接下来&#xff0c;分享一下我的感受和经验&#xff1a; 2018年的美赛数学建模是2月9号开始的&#xff0c;然后我们从他官网开始放题的时候&#xff0c;就下载下来&#xff0c;三个人一块商量选题…

【明解C语言】循环语句之while

目录 一、while语句 1&#xff1a;语法结构&#xff1a; 2.while语句的三个部分 二、while语句代码示例&#xff1a; 1.在屏幕上输出1~10 2.while循环流程图&#xff1a; 三、break循环语句中的作用 四、continue在循环语句中的作用 五、getchar()函数代码分析 1.g…

c语言while的知识点,C语言循环语句知识点

C语言循环语句知识点 引导语&#xff1a;循环语句是由循环体及循环的终止条件两部分组成的。以下是百分网小编分享给大家的C语言循环语句知识点&#xff0c;欢迎参考学习! 循环语句 (一)、for循环 它的一般形式为: for(;;) 语句; 初始化总是一个赋值语句&#xff0c;它用来给循…