Kafka生产者发送流程详解

article/2025/6/20 8:26:43

参考资料:《深入理解Kafka核心设计与实践原理》、《尚硅谷2022版Kafka3.x教程》

Kafka生产者发送流程详解

      • 序列化器
      • 分区策略
        • 如何将一些相关连的数据放进同一张表里?
        • 自定义分区
      • 生产者拦截器

Kafka生产者发送流程详解大致流程如下:
整个生产者客户端由两个线程协调运行,这两个线程分别是main线程和Sender线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用后缓存到消息累加器(RecordAccumlator,也称为消息收集器)中。Sender线程负载从RecordAccumulator中获取消息并将其发送到Kafka集群中。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DFzWMnjW-1660649642553)(D:\note\笔记仓库\图片\image-20220815125151314.png)]

  • main线程:在客户端将数据放入双端队列里

  • Sender线程:从队列里读取数据发送到kafka集群

    Sender从RecordAccumulator获取到缓存的消息之后,会将原本==<分区,Deque<ProducerBath>>的保存形式转化为<Node,List<ProducerBatch>>==的形式。

    其中Node表示的就是kafka集群中的broker节点

    在转换成<Node,List<ProducerBatch>>之后,Sender还会进一步封装成==<Node,Request>==的形式,这样就可以将Request请求发往各个Node之中

  • DQueue:双端队列,每个分区对应一个双端队列。队列中的内容就是ProducerBatch,即DQueue<ProducerBatch>,写入缓存时放入尾部,Sender读取消息时从头部读取。****

  • batch.size:只有数据积累到batch.size之后,sender才会取数据发送,默认大小为16k

    消息在网络上都是以字节来传输,在发送之前需要创建一块内存区域来保存消息。

    在Kafka生产者客户端,通过java.io.ByteBuffer来实现消息内存创建和释放,为了解约创建和释放的消耗。在RecordAccumulator由一个==BufferPool==来实现ByteBuffer的复用,以实现缓存的高效利用。

    BufferPool的大小就由batch.size参数决定

  • linger.ms:如果数据迟迟没有到达batch.size,那么sender线程在等待linger.ms设置的实践到达后就会取数据发送。(默认是0,表示没有延迟)

  • ProducerBatch:就是一个消息批次,包含很多ProducerRecord

    ProducerBatchbatch.size参数有关系,当消息放入双端队列中时,会从队列尾部获取一个ProducerBatch,查询是否可以将当前消息写入ProducerBatch

    • 如果可以就不需要新建ProducerBatch。

    • 否则新建ProducerBatch时也需要评估这条消息的大小是否超过batch.size,

      如果不超过,那么就以batch.size大小创建ProducerBatch,并且使用完后,还可以通过BUfferPool进行管理复用。

      如果超过,就以原始大小来创建ProducerBatch,并且不会复用,使用完就释放掉这段内存

  • RecordAccumulator主要用来缓存消息以便 Sender 线程可以批量发送,进而==减少网络传输的资源消耗以提升性能。==可通过生产者客户端参数buffer.memory配置,默认大小为32M。

    如果生产者速度大于消费者速度,则会导致生产者空间不足,这时候调用send()方法要么阻塞,要么抛出异常。由参数max.block.ms配置,默认为60s。

  • InFlightRequests缓存已经发出去但还没有收到响应的请求

    请求从Sender线程发往至Kafka集群之前还会保存到InFlightRequests中,其中保存的具体形式为Map<NodeId,Deque<Request>>

    InFlightRequests可以通过配置参数来限制缓存的broker的连接数(客户端和Node之间的连接),默认为5.

    InFlightRequests还可以决定**leastLoadedNode(当前在InFlightRequests中负载最小的node),发送请求时会优先发送leastLoadedNode**

    • 元数据信息:

      元数据信息值的就是kafka集群中某个主题的分区数、副本数、leader副本所在的节点这些信息。

      在发送消息之前,我们必须要得知这些信息后才能够正常发送消息。

      当客户端没有指明需要使用的元数据信息时或者超过metadata.max.age.ms(默认5分钟)时间没有更新元数据都会引起元数据的更新操作,就会从InFlightRequests中选出**leastLoadedNode,然后向这个Node发送MetadataRequest**请求来获取元数据信息。

  • retries:重试次数,可自行配置

  • 响应acks:kafka集群收到消息后的响应,用于决定发送端是否重试。

    • 0:生产者发送到服务端即可,不需要等待应答

      如果发送过程由于网络等原因发送失败,或者服务端接收后突然宕机都会导致丢数问题

    • 1:leader副本收到即可(常常用于记录日志使用

      应答完成后,leader还未进行同步就挂了,那么其它选举出来的leader就没有这条数据,也会导致丢数问题

    • -1:ISR队列里的所有节点都收到消息后返回(支付相关的场景使用

      副本数大于2的情况下能够保证数据可靠(高水位)。如果副本数为1,那么就和ack=1的场景一样,也可能会导致丢数

序列化器

为什么不用Java的序列化器?

Java序列化器太重,会增加额外的数据来保证安全传输。在大数据场景下,一般不会使用java原生的序列化器。

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧, 消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。

综上所述:生产者和消费者的序列化器必须是相同的,否则可能就会出现乱码的情况

下面是kafka的序列化接口的官方定义:表明了必须将我们发送的消息转化为字节数组

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DBg9hM3y-1660649642556)(D:\note\笔记仓库\图片\image-20220816162100522.png)]

下面是kafka的序列化接口的定义:

public interface Serializer<T> extends Closeable {/*** 配置这个类。* @param 键/值对中的配置* @param 是键还是值*/void configure(Map<String, ?> configs, boolean isKey);/*** 将data转换为字节数组。** @param topic 与数据相关的主题* @param data 输入数据* @return 序列化字节*/byte[] serialize(String topic, T data);/*** 关闭此序列化程序。此方法必须是幂等的,因为它可能会被多次调用。*/@Overridevoid close();
}

总的来说:

  • configure()方法用来配置当前类
  • serialize()方法用来执行序列化操作
  • close()方法用来关 闭当前的序列化器:一般情况下close()是一个空方法, 如果实现了此方法, 则必须确保此方法的幕等性, 因为这个方法很可能会被KafkaProducer调用多次

下面来看一下StringSerializer的代码实现:

/***  String encoding defaults to UTF8 and can be customized by setting the property *  value.serializer.encoding or serializer.encoding. The first two take precedence over the last.*/
public class StringSerializer implements Serializer<String> {private String encoding = "UTF8";@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";Object encodingValue = configs.get(propertyName);if (encodingValue == null)encodingValue = configs.get("serializer.encoding");if (encodingValue instanceof String)encoding = (String) encodingValue;}@Overridepublic byte[] serialize(String topic, String data) {try {if (data == null)return null;elsereturn data.getBytes(encoding);} catch (UnsupportedEncodingException e) {throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);}}@Overridepublic void close() {// nothing to do}
}
  1. 首先对于configure()方法,这个方法是在创建KafkaProducer实例的时候调用,用于确定编码类型。(一般情况下我们不会配置,所以默认就是UTF-8)
  2. serialize()方法就很直观,将String类型转化为byte[]类型

分区策略

分区好处

  1. 便于合理的使用存储资源,各个分区存储在不同的broker节点上,合理控制分区的任务,可以实现负载均衡的效果,并提高可存储数据量‘
  2. 提高了并行度,不同的消费者可以同时消费不同分区上的数据(这里要注意的是,一个分区只能由一个消费者消费)

kafka的几种分区策略

主要是由在构造ProducerRecord对象的时候,有没有指定partition、key决定

  1. 指明partition:如果在创建ProducerRecord的时候,声明了分区号partition,那么就存储到这个分区中去
  2. 指明Key,未指明partition将key的hash值与topic的分区数取余得到最终的partition值
  3. 即没有key,也没有partition:kafka会采用默认的RoundRobin轮询分配,首先随机选择一个分区存储(并尽可能一直使用该分区),直到分区存储满时或者已完成,再去随机选择下一个分区使用(和上一次的分区不同)。

在不改变主题分区数量的情况下 key 与分区之间的映射可保持不变。不过, 一旦主题中增加了分区,那么就难以保证 key 与分区之间的映射关系了。

如何将一些相关连的数据放进同一张表里?

将表名字作为key传进去(它会通过hash算法得到一个相同的值),存入指定的分区。消费者能够将对应分区的数据放进一个表里

自定义分区

实现Partitioner接口,重写方法即可,返回对应的分区值。还需要再主程序关联我们自定义的分区(在Properties中声明),不然它会走默认的分区策略

例如:我们想让含有“china”相关的字符串,存到主题为china的分区里。

我们可以通过去判断value里是否包含china,如果包含就指明对应的分区号即可。

public class MyPartition implements Partitioner {/**** @param s:topic* @param o:key* @param bytes* @param o1:value* @param bytes1* @param cluster:集群* @return*/@Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {String message = bytes1.toString();int partition;if(message.contains("china")){partition = 1;}else{partition = 2;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

生产者拦截器

生产者拦截器主要是为消息发送前做一些准备工作,如过滤某些不表的信息、修改消息的内容等等,有可以在发送回调逻辑前做一些定制化需求,比如统计类工作

生产者拦截器主要由接口ProducerInterceptors实现,主要包含以下三个方法:

  • 在消息序列化和计算分区之前会调用onSend()方法

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
  • 在消息应答或者发送失败时会调用onAcknowledgement()方法,优先于Callback之前执行

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) 
    
  • close()方法主要是关闭拦截器时执行的一些资源清理工作

    上述3个方法中抛出的异常都会被捕获并记录到日志中去,且不会再向上传递

KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。

自定义拦截器:

/*** @authoer:zky* @createDate:2022/8/16* @description:*/
public class ProducerInterceptorDate implements ProducerInterceptor<String,String> {private volatile long sendSuccess = 0;private volatile long sendFailure = 0;@Overridepublic ProducerRecord<String,String>  onSend(ProducerRecord<String,String>  record) {return new ProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(),record.value() + "-" + new SimpleDateFormat("yyyy-MM-dd").format(new Date()),record.headers());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if (exception == null) {sendSuccess++;} else{sendFailure++;}}@Overridepublic void close() {double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);System.out.println("[INFO]发送成功率=" + String.format("%f",successRatio * 100) + "%");}@Overridepublic void configure(Map<String, ?> configs) {}
}

添加properties属性:

  properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.zky.kafkademo.ProducerInterceptorDate");

进行如上两个操作后,生产者生成消息的时候就会走我们自定义的拦截器


http://chatgpt.dhexx.cn/article/vOOwuny6.shtml

相关文章

使用 Lvs + Nginx 集群搭建高并发架构

高并发站点不仅要考虑网站后端服务的稳定&#xff0c;还需要考虑服务能否接入巨大流量、承受巨大流量&#xff0c;如上图&#xff1a; 1:流量接入&#xff0c;可以采用LvsNginx集群&#xff0c;这种方式能接入的QPS能高达数百万 2:通过Lvs实现Nginx集群&#xff0c;NginxTomca…

5G注册流程详解

1.1 注册流程 1.1.1 专享篇 UE发送Registration Request到(R)AN&#xff0c;消息中包含注册类型、用户标识、UE能力及请求的切片等参数。 (R)AN接收到消息&#xff0c;根据用户临时标识或切片选择合适的AMF&#xff0c;如果(R)AN找不到合适的AMF&#xff0c;则将Registration…

WebRTC建立会话流程分析

WebRTC建立会话流程总结 了解如何运行PeerConnection Demo后&#xff0c;熟悉运行流程可以做为深入学习WebRTC的切入点。本节重点解释客户端双方建立会话时交互的主要信令&#xff08;控制会话的文本协议&#xff09;和与信令相关的 WebRTC API。 准备工作 peerconnection_clie…

RPA-机器人流程自动化

RPA-机器人流程自动化 RPA-机器人流程自动化简介RPA是什么&#xff1f;RPA历史上的演变RPA原理RPA特点RPA技术框架及功能1.TagUI2.RPA for Python3.Robot Framework4.Automagica5.Taskt6.OpenRPA RPA部署模式1 环境配置的参数调整2 将自动化程序整体打包部署3 版本的管理和控制…

网络安全应急响应----7、数据泄漏应急响应

文章目录 一、数据泄露简介二、数据泄露途径1、外部泄露2、内部泄露 三、数据泄露应急响应方法1、发现数据泄露2、梳理基本情况3、确定排查范围和目标4、判断泄露途径4.1、主动泄露4.2、被动泄露 5、系统排查 四、数据泄露防御1、数据外部泄露防范2、数据内部泄露防范 一、数据…

发送邮件 显示对方服务器未响应,邮件对方服务器未响应

邮件对方服务器未响应 内容精选 换一换 MX优先级,用来指定邮件服务器接收邮件的先后顺序,数值越小优先级越高。当DNS服务器的解析记录中只有一条MX记录时,MX优先级没有意义。当DNS服务器的解析记录中存在多条MX记录时,邮件发送方的DNS服务器会优先把邮件投递到MX优先级高的…

网络安全应急响应----9、WebShell应急响应

文章目录 一、Webshell简介1、常见webshell2、Webshell检测 二、Webshell应急响应流程1、判断是否被植入webshell2、临时处置3、Webshell排查4、系统排查4.1、Windows系统排查4.2、Linux系统排查4.3、Web日志分析4.4、网络流量排查4.5、清除加固 三、Webshell防御方法 一、Webs…

复杂产品的响应式设计【流程篇】

都说2013年将是响应式设计爆发的一年。一淘设计团队在去年一淘首页改版时初步尝试了响应式&#xff0c;最近在一淘“玩客”项目中有了更加深入地应用&#xff0c;第一次在复杂产品中实现了全站响应式。中间积累了一些经验也踩了不少坑&#xff0c;于是就有了这个响应式设计三部…

史上最全测试流程详解----超详细

前言----- 对于测试流程基本很多做过测试的大牛&#xff0c;小哥哥&#xff0c;小姐姐都能说出个十之八九&#xff0c;但是对于细节&#xff0c;可能还需要一些整理文件&#xff0c;这不&#xff0c;我整理了一些测试的全部流程&#xff0c;希望能给大家带来帮助&#xff0c;有…

C语言 操作系统实验 四种调度(最高响应比优先算法 HRN)

注&#xff1a; 本文是四个调度算法的第一篇算法。 本文是根据CSDN上某一FCFS调度算法魔改来的&#xff0c;所以FCFS的算法不会发到网站。 我是个菜鸡&#xff0c;发文是为了纪念自己完成了代码&#xff0c;以及累计自己的经验。 如有知识错误或者算法有逻辑漏洞请各位大佬高…

处理动态图的图神经网络

汤吉良老师团队发表于2020年的SIGIR 《Streaming Graph Neural Networks》论文阅读笔记 背景&#xff1a; 图能够很好的表示实际数据&#xff08;如社交网络&#xff0c;传输网络&#xff09;。利用神经网络建模图结构数据&#xff0c;学习特征表示&#xff0c;改善图相关任务…

ImageView加载网络图片

使用第三方的库Glide加载网络图片 首先去下载一个glide的包 下载地址&#xff1a;https://github.com/bumptech/glide/releases/download/v4.7.0/glide-full-4.7.0.jar 我这里用的是glide-full-4.7.0 下载好之后直接复制到app\libs下面&#xff0c;然后点同步&#xff0c;可…

图神经网络的池化操作

图神经网络有两个层面的任务&#xff1a;一个是图层面&#xff08;graph-level&#xff09;&#xff0c;一个是节点&#xff08;node-level&#xff09;层面&#xff0c;图层面任务就是对整个图进行分类或者回归&#xff08;比如分子分类&#xff09;&#xff0c;节点层面就是对…

网络图的绘制方法详细讲解

网络拓扑图形如网络结构,并且由箭头线条、节点、路线三个因素组成。网络工程师在绘制网络图时,为了展示网络传输方式和途径,通常将网络节点设备和通信介质进行物理布局。网络拓扑图的结构类型有:星型、环型、树型、总线型、网状、分布式结构、等等。 网络图一般用处 在计算…

网络拓扑图怎么画 详细教程

大数据时代&#xff0c;如何更好地去运营、呈现数据&#xff0c;并从其中发掘出更多信息成为了人们探索的方向。网络拓扑图就是一种非常有用的信息化图表&#xff0c;这种网状关系呈现出来的利器可以使我们把想要传递的信息更加清晰、有逻辑的呈现在别人的眼前。 1. 什么是网络…

图神经网络及其应用

Graph Neural Networks and its applications 摘要 以图形形式构建的数据存在于生物化学、图像处理、推荐系统和社会网络分析等多个领域。利用预处理技术在图结构数据上训练机器学习模型有几种方法&#xff0c;但它们缺乏完全适应数据集和手头任务的灵活性。图神经网络允许创…

[概念]神经网络的种类(前馈神经网络,反馈神经网络,图网络)

随着神经网络的不断发展&#xff0c;越来越多的人工神经网络模型也被创造出来了&#xff0c;其中&#xff0c;具有代表性的就是前馈型神经网络模型、反馈型神经网络模型以及图网络. 1.前馈型神经网络模型 前馈神经网络&#xff08;Feedforward Neural Network ,FNN&#xff09…

java实现下载网络图片到本地

文章目录 前言一、示例二、代码 1.代码示例2.运行结果总结 前言 当我们在网络上看到自己想要保存的照片&#xff0c;有的网站设置了权限&#xff0c;不能保存情况下&#xff0c;我们可以借助Java的文件流读取网络上的图片&#xff0c;并保存到本地。 一、示例 比如豆瓣话题第…

用python实现数字图片识别神经网络--启动网络的自我训练流程,展示网络数字图片识别效果

上一节&#xff0c;我们完成了网络训练代码的实现&#xff0c;还有一些问题需要做进一步的确认。网络的最终目标是&#xff0c;输入一张手写数字图片后&#xff0c;网络输出该图片对应的数字。由于网络需要从0到9一共十个数字中挑选出一个&#xff0c;于是我们的网络最终输出层…