消费者消费消息分析

article/2025/8/20 22:42:55

消费者读流程

消费者读消息流程

】每个consumer都可以根据分配策略(默认RangeAssignor),获得要消费的分区
】 获取到consumer对应的offset(默认从ZK中获取上一次消费的offset)
】 找到该分区的leader,拉取数据
】 消费者提交offset

消费者实例代码

public class KafkaConsumerClient {public static final String brokerList = "localhost:9092";public static final String topic = "topic-demo";public static final String groupId = "group.demo";public static final AtomicBoolean isRunning = new AtomicBoolean(true);public static Properties initConfig(){Properties props = new Properties();props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("bootstrap.servers", brokerList);props.put("group.id", groupId);props.put("client.id", "consumer.client.id.demo");return props;}public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));try {while (isRunning.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("topic = " + record.topic() + ", partition = "+ record.partition() + ", offset = " + record.offset());System.out.println("key = " + record.key()+ ", value = " + record.value());//do something to process record.}}} catch (Exception e) {log.error("occur exception ", e);} finally {consumer.close();}}
}

订阅主题和分区

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

消费者消息的类型对象

public class ConsumerRecord<K, V> {private final String topic;private final int partition;private final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;}

位移自动提交,拉取的一批消息没有消费完成发生异常会丢失消息;
手动提交,拉取的一批消息没有消费完成发生异常,最前面的消息重新消费的时候会重复消费。

消费者分区分配策略

1.Range范围分配策略:
配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
算法公式:
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个,剩余消费者消费n个

2.RoundRobin轮询策略

RoundRobinAssignor轮询策略是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

3.Stricky粘性分配策略

1.分区分配尽可能均匀;
2.在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同;
没有发生rebalance时,Striky粘性分配策略和RoundRobin分配策略类似,发生rebalance时,尽量与上一次分配的保持一致。


http://chatgpt.dhexx.cn/article/BLSnNaML.shtml

相关文章

消费者详解-消费消息(1)

文章目录 消费者消费消息流程Pull消费流程1. 初始化消费者2. 拉取topic的消息队列3. 拉取消费位点4. 根据消费位点消费消息5. 保存消费进度拉取消息-pullKernelImpl Broker处理拉取消息请求1、权限、参数校验并且获取初始化变量&#xff1a;2、获取拉取消息的topic配置3、解析订…

直播报名 | 海外社交媒体趋势如何?出海品牌如何掌握消费者洞察?

近年来&#xff0c;中国品牌出海势头强劲&#xff0c;智能硬件、互联网应用、时尚服饰等正加速风靡海外市场&#xff0c;涌现出像安克创新、SHEIN这样的全球化品牌。有人提问&#xff0c;安克创新&#xff0c;凭借什么成为全球化品牌&#xff1f;出海企业可以从中借鉴什么&…

Kafka消费者不消费数据

背景&#xff1a; 工作往往是千篇一律&#xff0c;真正能学到点知识都是在上线后。使用SkywalkingKafkaES进行应用监控。 现象&#xff1a; 公司使用Skywalking在开发测试环境中Kafka顺利消费数据&#xff0c;到了UAT环境一开始还正常&#xff0c;后面接入了更多的应用后出现…

食品品牌如何做好消费需求洞察直抵消费者心智

做生意的都明白这样一个道理&#xff1a;“先找到买主&#xff0c;再依照需求出售”。之所以这一点很重要&#xff0c;因为这揭示了一条经营企业过程中必须遵守的金科玉律&#xff1a;先有需求&#xff0c;而后才有你的品牌、服务或功能。 需求洞察是品牌、产品、服务等一切的…

消费者洞察:数据影响消费,消费营造数据

本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在9月24日的直播演讲内容整理,演讲围绕“如何洞察消费者”从四个层面展开:首先是(疫情期间以及后疫情时代)消费品行业的发展现状和未来趋势;然后是当前现状下如何通过数据化闭环洞察消费者;有了前面的理论支撑和方…

助力品牌洞察——消费者情绪行为分析

什么是情绪分析&#xff1f; 随着社交网络和数字营销的出现&#xff0c;消费者对产品和品牌的评价受到越来越多的关注。在线用户反馈&#xff08;例如产品评价、社交媒体评论和调查问卷等&#xff09;包含了大量具有价值的数据。通过这些数据&#xff0c;可以了解消费者对您产…

数据分析:消费者数据分析

数据分析&#xff1a;消费者数据分析 作者&#xff1a;i阿极 作者简介&#xff1a;Python领域新星作者、多项比赛获奖者&#xff1a;博主个人首页 &#x1f60a;&#x1f60a;&#x1f60a;如果觉得文章不错或能帮助到你学习&#xff0c;可以点赞&#x1f44d;收藏&#x1f4c1…

消费者洞察:案例透视消费者洞察实践与收益

本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在“如何洞察您的消费者”直播课主题演讲整理。点击链接(https://live.vhall.com/534333188)查看完整演讲视频,关注Stratifyd微信公众号并在后台回复“粉丝群”,还可进群申领演讲课件,及时关注Stratifyd最新培训资讯…

消费者洞察:一文看懂消费者是如何做选择的

作者&#xff1a;付永承 全文共 4214 字&#xff0c;阅读需要 9 分钟 ———— / BEGIN / ———— “为什么有时候明明比对手更有优势&#xff0c;产品就是无人问津&#xff1f;” “为什么花了那么多广告费&#xff0c;销量怎么就是上不来&#xff1f;” “为什么之前的营销…

消费者洞察:数据化闭环洞察消费者

本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在“如何洞察您的消费者”直播课主题演讲整理。上周我们通过“数据影响消费,消费营造数据”这篇文章了解了消费品行业的发展现状,新生代互联网消费者更乐于分享表达,然而消费者触点分散,对企业洞察消费者、了解消费者…

系统资源不足,无法完成请求服务。

使用visual studio 编译时&#xff1a; 清理了磁盘…重装了VS2019 没用… 后来一个学长说试试卸载McAfee well done!

win10突然提示系统资源不足,无法完成请求服务的解决方法(VS)win10跑代码变慢

大概率是McAfee搞得鬼 解决方法&#xff1a;点击PC安全-实时扫描-关闭 电脑也快多了&#xff0c;跑代码也快了很多 直接卸载McAfee也可

c语言无纸化软件系统资源不足,系统资源不足无法完成请求的服务怎么办_系统资源不足卸载迈克菲解决教程 - 系统家园...

最近很多用户在问小编系统资源不足无法完成请求的服务怎么解决&#xff0c;小编电脑没有遇到这个问题&#xff0c;但是看到了贴吧很多小伙伴都有这个问题&#xff0c;发现有个共性就是装了迈克菲McAfee杀毒防护软件&#xff0c;下面快点来看看怎么解决问题吧。 系统资源不足卸载…

windows远程拷贝和解压大文件异常:系统资源不足无法完成请求的服务

windows服务器异常&#xff1a;系统资源不足无法完成请求的服务 Insufficient system resources exist to complete the requested service. by qunying.liu(刘群英) 问题描述&#xff1a; 开发人员需要将线上环境windows的某些数据文件复制到开发环境windows用于测试&#xff…

c语言无纸化软件系统资源不足,win10系统安装软件显示“系统资源不足,无法完成请求的服务”的解决方法...

相信不少网友在win10系统安装软件时都遇到过“系统资源不足&#xff0c;无法完成请求的服务”的情况&#xff0c;到底是怎么回事&#xff1f;检查内存空间都足够大的&#xff0c;其实往往导致这一问题的不是存储空间不足问题&#xff0c;而是病毒问题。知道原因后&#xff0c;问…

c语言无纸化软件系统资源不足,win10系统安装软件显示“系统资源不足,无法完成请求的服务”的解决方法-系统城...

相信不少网友在win10系统安装软件时都遇到过“系统资源不足&#xff0c;无法完成请求的服务”的情况&#xff0c;到底是怎么回事&#xff1f;检查内存空间都足够大的&#xff0c;其实往往导致这一问题的不是存储空间不足问题&#xff0c;而是病毒问题。知道原因后&#xff0c;问…

打开U盘 提示 服务器无法运行,win10打开u盘提示“系统资源不足 无法完成请求的服务”怎么办...

一位用户在windows10系统电脑上连接U盘后&#xff0c;发现打开时总会弹出“位置不可用”的提示框&#xff0c;提示&#xff1a;无法访问X:\ 系统资源不足 无法完成请求的服务&#xff0c;这是怎么回事呢&#xff1f;其实&#xff0c;该问题是u盘损坏或内存配置过小所导致的。下…

瑞友服务器系统资源不足,win10/win7打开软件提示系统资源不足,无法完成请求服务的解决方法...

作者&#xff1a;陈俊 日期&#xff1a;2017-11-12 12:32 近期有些win7、win10用户莫名的遇到了一个问题&#xff0c;就是打开疯狂的美工软件的时候提示系统资源不足,无法完成请求服务&#xff0c;刚开始以为是系统缺少了某些组件&#xff0c;结果发现是国外杀毒 迈克菲(McA…

系统使用一段时间就出现“系统资源不足,无法完成请求的服务”

2019独角兽企业重金招聘Python工程师标准>>> 系统使用一段时间后&#xff0c;就会出现这个问题&#xff0c;导致电脑无法在打开软件&#xff0c;网络连接禁止使用&#xff0c;无法进行关机。只能强制关机。 经查证&#xff0c;发现System的句柄数在一直增加。增加到…

win10计算机资源不足无法登陆,win10电脑安装软件提示“系统资源不足,无法完成请求的服务”如何解决...

我们经常会在电脑中安装软件&#xff0c;不过有时候并没有那么顺利&#xff0c;比如有win10系统用户反映说在电脑中安装软件的时候&#xff0c;提示“系统资源不足&#xff0c;无法完成请求的服务”&#xff0c;导致无法安装软件&#xff0c;经过检查发现是病毒问题&#xff0c…