【Kafka】消息的同步发送和异步发送

article/2025/8/28 5:02:32

文章目录

  • 概述
  • 1. sync vs async
    • 1.1 java代码同步和异步
  • 2. 可靠性机制(ack属性配置)
    • 2.1 oneway
  • 3. 一般配置
  • 4. 同步异步和ack的联系和区别
  • 参考

概述

kafka有同步(sync)、异步(async)以及oneway这三种发送方式,某些概念上区分也可以分为同步和异步两种,同步和异步的发送方式通过“producer.type”参数指定,而oneway由“request.require.acks”参数指定。

1. sync vs async

在官方文档Producer Configs中有如下:
在这里插入图片描述
翻译过来就是:
producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。

为什么是后台线程进行发送? 其实client调用发送接口,所有的数据被临时加入请求队列InFlightRequest,后台线程是通过 读取该队列的数据,进行发送操作的。

对于异步模式,还有4个配套的参数,如下:

PropertyDefaultDescription
queue.buffering.max.ms5000启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
queue.buffering.max.messages10000启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms-1当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。
batch.num.messages200启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)

以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。

注:这里的参数是指安装包中的shell脚本命令,而java客户端代码,需要用相应的语法

总结:

  • 同步方式,一定是逐条发送的,第一条响应到达后,才会请求第二条
  • 异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次

1.1 java代码同步和异步

同步发送
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

kafkaTemplate.send("testJson", message).get();

异步发送回调
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {@Overridepublic void onFailure(Throwable ex) {ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<String, Message> result) {System.out.println(result.getProducerRecord());System.out.println(result.getRecordMetadata());}
});

2. 可靠性机制(ack属性配置)

producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:

  • 如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。

  • 若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。

  • 若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

2.1 oneway

前面只提到了sync和async,那么oneway是什么呢?
oneway是只顾消息发出去而不管死活,消息可靠性最低,但是低延迟、高吞吐,这种对于某些完全对可靠性没有要求的场景还是适用的,即request.required.acks设置为0。

oneway的效果也是异步的。因此,设置同步和异步非时候,要综合考虑。

3. 一般配置

对于sync的发送方式:

producer.type=sync
request.required.acks=1

对于async的发送方式:

producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200

对于oneway的发送发送:

producer.type=async           '既然都已经设置ack=0忽略高可靠性了,也就没必要设置为同步了'
request.required.acks=0

4. 同步异步和ack的联系和区别

在这里插入图片描述
上图分析:

  • 当用户调用send时,就完成数据发送了(对于用户来说),后台线程负责实际发送数据,因此,新版本下,我们说数据发送总是异步的。

  • send()方法每次只能发送一条数据至InFlightRequest队列

  • 用户可以通过send().get() ,把用户主线程改为同步方式
    因此,同步和异步概念 分为用户线程和发送线程,用户线程有同步和异步之分;发送线程只有异步

    用户线程选择同步,效果是逐条发送,因为请求队列InFlightRequest中永远最多有一条数据。异步+设置 后台线程的异步发送参数:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐条发送。

    max.in.flight.requests.per.connection控制只能发送一次请求,发送次数有个窗口,控制该窗口的值,但是每次可发送一批数据;batch.size是控制一批数据的上限,当batch.size=1时,每次最多发送一条。组合在一起就是 只能连续发送一次请求,每次最多发送一条。

同步和异步指client(producer)是否收到leader给的ack后才发下一条,acks = 0, -1和1是指leader节点和follower节点数据同步的方式,可靠性机制,是保证数据能成功备份到其他节点的机制,二者是独立关系,因此可以是下面的组合

同步+ack任意值
异步+ack任意值

但是由于ack的选项有3个,会有最佳搭配的概念,例如:

producer.type=async           '既然都已经设置ack=0忽略高可用性了,也就没必要设置为同步了'
request.required.acks=0

既然都已经设置ack=0忽略高可靠性了,ack=0牺牲可靠性换取速度,也就没必要设置为同步了,设置为异步又可以提高数据

参考

Kafka之sync、async以及oneway
kafka 同步、异步发送java代码同步和异步


http://chatgpt.dhexx.cn/article/9ABRxktm.shtml

相关文章

http请求与响应,同步异步请求以及异步请求axios的配置

文章目录 httphttp简介&#xff0c;协议http请求http响应接收请求行请求头数据 同步异步请求异步请求axios的配置配置文件 http http简介&#xff0c;协议 http是超文本传输协议 &#xff08;HyperText Transfer Protocol&#xff09;服务器传输超文本 到本地浏览器的传送协议…

ajax同步和异步的区别

一、同步访问和异步访问的区别&#xff0c;先从概念上区别&#xff1a; 1、同步的概念应该是来自于操作系统中关于同步的概念。 2、不同进程为协同完成某项工作而在先后次序上调整(通过阻塞,唤醒等方式)。同步强调的是顺序性&#xff0c;谁先谁后&#xff1b;异步则不存在这种顺…

C++ LinuxWebServer项目(5)同步异步日志系统

一、前言 对于任何一个服务器而言,日志系统的设计是非常重要的,尝试设计一个简易的同步异步日志系统来完成系统日志的记录。 二、基础知识 日志,由服务器自动创建,并记录运行状态,错误信息,访问数据的文件。 同步日志,日志写入函数与工作线程串行执行,由于涉及到I…

一文搞懂同步异步阻塞非阻塞

相信很多人在面试的过程中&#xff0c;都被问到过同步和异步的区别、阻塞和非阻塞的区别&#xff0c;以及这两对关系又有什么联系&#xff1f;本文尽可能从专业的角度&#xff0c;用易懂的语言&#xff0c;帮助大家理解 01 前置知识 用户空间和内核空间 操作系统可以支持多个…

同步异步半同步分离式通信

同步通信&#xff1a;采用统一的时钟信号 读数据 T1上升沿主设备给出从设备地址 T2上升沿给出读命令 T3上升沿读数据 T4上升沿撤销 写数据 T1上升沿主设备给出从设备地址 T1下降沿给出数据 T2上升沿给写命令 T4上升沿撤销 异步通信分为三类&#xff1a; 不互锁&#…

一篇文章理解 同步异步、阻塞非阻塞

前述 同步异步&#xff0c;阻塞非阻塞是一些非常常见的概念&#xff0c;但是对于开发者来说往往是用到了很难说清楚。 笔者专门整理了下这方面的概念&#xff0c;作此文以记之。 这部分内容可能存在一些争议&#xff0c;如有不同意见欢迎评论交流。 概念 个人理解同步异步与阻…

ES6同步异步处理

同步和异步 1.同步就是代码从上而下依次执行&#xff0c;除了函数或则回调函数 2.异步也有先后之分但是不明显&#xff0c;等js代码先执行同步后再去执行异步的代码. 如何处理同步异步的问题&#xff08;ES6的Promise&#xff09; 接上面&#xff0c;看下面的代码 var arr[…

前端学习-同步异步问题

在做实作课作品时&#xff0c;出现过对象中变量访问不到的情况&#xff0c;查阅相关资料发现时同步异步的问题&#xff0c;这篇文章帮助我理清思路&#xff0c;希望也能帮助你们更好地理解同步异步。 一、什么是同步/异步任务&#xff1f; 同步任务&#xff1a;指的是在主线程…

dubbo同步异步调用

通常我们通过dubbo调用服务接口&#xff0c;等待提供方处理完响应结果&#xff0c;这是同步调用&#xff1b;也是默认的调用方式。通过查看原吗DubboInvoker可以看到&#xff1a; 整体来说有三种方式&#xff1a; 1、是否关注结果&#xff0c;returntrue关注&#xff0c;默认也…

理解:什么是同步和异步?什么是阻塞和非阻塞?

一、同步和异步 同步与异步是指访问数据的机制&#xff0c;同步一般指主动请求并等待IO操作完成的方式。 异步则指主动请求数据后便可以继续处理其它任务&#xff0c;随后等待IO操作完毕的通知。 同步和异步最大的区别就在于&#xff1a;同步需要等待&#xff0c;异步不需要等…

CPUGPU加速计算

1、CPU(Centrol Processing Unit) CPU上的大部分面积做了cache 和控制逻辑&#xff0c;天然适合做复杂串行程序&#xff1b; 2、GPU(Graphic Processing Unit): GPU有更多的晶体管用于数据处理&#xff0c;特别适用于解决并行计算的问题。可以使程序执行速度加快。为处理图形…

tensorflow使用GPU加速

测试faster-rcnn时&#xff0c;cpu计算速度较慢&#xff0c;调整代码改为gpu加速运算 将 with tf.Session() as sess: 替换为 1 gpu_options tf.GPUOptions(per_process_gpu_memory_fraction0.9) 2 with tf.Session(configtf.ConfigProto(gpu_optionsgpu_options,log_device_…

GPU 及其加速库简介

文章目录 一、GPU 与 CPU 简介1、GPU 与 CPU 的区别2、GPU 分类3、GPU&#xff08;NVIDIA A100 &#xff09; 介绍 二、CUDA 简介1、多版本 CUDA 切换2、为各种 NVIDIA 架构匹配 CUDA arch 和 gencode 三、OpenCL 简介1、OpenCL 平台模型2、OpenCL 执行模型 四、参考资料 一、G…

Pytorch使用GPU加速的步骤

CUDA&#xff08;Compute Unified Device Architecture&#xff09;&#xff0c;是显卡厂商NVIDIA推出的运算平台。 CUDA是一种由NVIDIA推出的通用并行计算架构&#xff0c;该架构使GPU能够解决复杂的计算问题。近些年来&#xff0c;显卡的计算能力越来越强大&#xff0c;如果只…

Python程序如何用GPU加速:Tesla、CUDA、Numba

概念解析 首先要明白&#xff0c;普通的Python代码是无法使用GPU加速的&#xff0c;只能在GPU上跑针对GPU设计的程序。 硬件加速必须要用硬件语言实现。 查询PythonGPU关键字&#xff0c;除了TensorFlow&#xff0c;另外出镜率比较高的几个概念是&#xff1a;Numba、CUDA、PyCU…

什么是 GPU 加速的计算?

什么是 GPU 加速的计算? GPU 加速计算是指同时采用图形处理单元 (GPU) 和 CPU&#xff0c;以加快科学、分析、设计、消费者和企业应用程序的速度。GPU 加速器于 2007 年由 NVIDIA 率先推出&#xff0c;现已在世界各地为政府实验室、大学、公司以及中小型企业的高能效数据中心提…

MATLAB上的GPU加速计算

概述 怎样在MATLAB上做GPU计算呢?主要分为三个步骤&#xff1a;数据的初始化、对GPU数据进行操作、把GPU上的数据回传给CPU 一、数据的初始化 首先要进行数据的初始化。有两种方法可以进行初始化&#xff1a;一是先在CPU上设置好数据然后拷贝到GPU&#xff1b;二是直接在GPU…

常用的python gpu加速方法

在使用 PyCharm进行机器学习的时候&#xff0c;我们常常需要自己创建一些函数&#xff0c;这个过程中可能会浪费一些时间&#xff0c;在这里&#xff0c;我们为大家整理了一些常用的 Python加速方法&#xff0c;希望能给大家带来帮助。 在 Python中&#xff0c;我们经常需要创建…

Matlab 应用GPU加速

由于GPU近几年地迅速发展&#xff0c;GPU在多线程计算等方面逐渐超越CPU成为计算的主力军。而Matlab是常用的数学应用软件&#xff0c;现在讲解一下如何在Matlab中使用GPU加速计算 文章目录 0. 必要条件1.给GPU传输数据1.1 CPU的数据复制到GPU1.2 直接在GPU上设置数据&#xf…

tensorflow的GPU加速计算

参考 tensorflow的GPU加速计算 - 云社区 - 腾讯云 一、概述 tensorflow程序可以通过tf.device函数来指定运行每一个操作的设备&#xff0c;这个设备可以是本地的CPU或者GPU&#xff0c;也可以是某一台远程的服务器。tensorflow会给每一个可用的设备一个名称&#xff0c;tf.…