玩转Kafka—SpringGo整合Kafka

article/2025/9/28 12:09:36

玩转Kafka—Spring整合Kafka

1 新建Spring Boot项目,增加依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency>
</dependencies>

2 项目结构

在这里插入图片描述

3 代码

3.1 配置文件和Kafka服务器所需配置

application.properties

server.port=8080
#制定kafka代理地址
spring.kafka.bootstrap-servers=8.131.57.161:9092
#消息发送失败重试次数
spring.kafka.producer.retries=0
#每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
#每次批量发送消息的缓冲区大小
spring.kafka.producer.buffer-memory=335554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# 指定默认消费者group id
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.bootstrap-servers=8.131.57.161:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Kafka服务器所需配置,server.properties文件

# 33行左右 0.0.0.0代表允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092  
# 36行左右 ip代表外部代理地址
advertised.listeners=PLAINTEXT://8.131.57.161:9092   

3.2 生产者和实体类代码

Student.java

/*** @desc: 实体类* @author: YanMingXin* @create: 2021/11/20-12:43**/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {private String id;private String name;private String context;}

StudentService.java

/*** @desc: 接口* @author: YanMingXin* @create: 2021/11/20-12:43**/
public interface StudentService {void stuSayHello(Student student);
}

StudentServiceImpl.java

/*** @desc: 接口实现类* @author: YanMingXin* @create: 2021/11/20-12:43**/
@Service
public class StudentServiceImpl implements StudentService {@Autowiredprivate KafkaTemplate kafkaTemplate;/*** topic*/private static final String STU_TOPIC = "stu.sayHello";@Overridepublic void stuSayHello(Student student) {Student stu = new Student("1", "zs", "Hello Ls.");kafkaTemplate.send(STU_TOPIC, JSON.toJSONString(stu));}
}

3.3 消费者代码

MyKafkaListener.java

/*** @desc: 消费者监听* @author: YanMingXin* @create: 2021/11/20-12:44**/
@Component
public class MyKafkaListener {/*** topic*/private static final String STU_TOPIC = "stu.sayHello";@KafkaListener(topics = {STU_TOPIC})public void stuTopicConsumer(ConsumerRecord consumerRecord) {Optional kafkaMsg = Optional.ofNullable(consumerRecord.value());if (kafkaMsg.isPresent()) {Object msg = kafkaMsg.get();System.err.println(msg);}}
}

3.4 测试

@SpringBootTest
class SpKafkaApplicationTests {@Autowiredprivate StudentService studentService;@Testvoid contextLoads() throws Exception{for (int i = 0; i < 900000; i++) {studentService.stuSayHello(new Student());}}
}

玩转Kafka—Golang整合Kafka

几个常见的Go整合Kafka客户端工具:我们本次使用的是Shopify

  • Shopify:https://github.com/Shopify/sarama

  • Big Data Open Source Security:https://github.com/stealthly/go_kafka_client

  • OptioPay:https://github.com/optiopay/kafka

    https://github.com/nuance/kafka

    https://github.com/jdamick/kafka.go

  • Confluent:https://github.com/confluentinc/confluent-kafka-go

    Docs: http://docs.confluent.io/current/clients/index.html

  • Travis Bischel: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kgo

ps:配置go get代理(类似于Maven配置阿里云镜像)教程:

https://goproxy.io/zh/docs/getting-started.html

1 新建go modules

在这里插入图片描述

2 项目结构

在这里插入图片描述

3 生产者代码

KakaProducer.go

package mainimport ("fmt""github.com/Shopify/sarama""time"
)//消息生产者
func main() {//获取配置类config := sarama.NewConfig() //配置类实例(指针类型)config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)config.Producer.Partitioner = sarama.NewRandomPartitioner  //生成用于选择要发送消息的分区的分区(默认为散列消息键)。config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。//获取客户端对象client, err := sarama.NewSyncProducer([]string{"8.131.57.161:9092"}, config)if err != nil {//获取客户端失败fmt.Println("producer close, err:", err)return}//延迟执行,类似于栈,等到其他代码都执行完毕后再执行defer client.Close()//一直循环for {//获取Message对象msg := &sarama.ProducerMessage{}//设置topicmsg.Topic = "go_kafka"//设置Message值msg.Value = sarama.StringEncoder("this is a good test, my message is good")//发送消息,返回pid、片偏移pid, offset, err := client.SendMessage(msg)//发送失败if err != nil {fmt.Println("send message failed,", err)return}//打印返回结果fmt.Printf("pid:%v offset:%v\n", pid, offset)//线程休眠下time.Sleep(10 * time.Second)}
}

4 消费者代码

KafkaConsumer.go

package mainimport ("fmt""github.com/Shopify/sarama""strings""sync""time"
)var (wg sync.WaitGroup //同步等待组//在类型上,它是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。//主goroutine调用了Add()方法来设置要等待的goroutine的数量。//然后,每个goroutine都会执行并且执行完成后调用Done()这个方法。//与此同时,可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。
)func main() {//获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割consumer, err := sarama.NewConsumer(strings.Split("8.131.57.161:9092", ","), nil)//获取失败if err != nil {fmt.Println("Failed to start consumer: %s", err)return}//对该topic进行监听partitionList, err := consumer.Partitions("go_kafka")if err != nil {fmt.Println("Failed to get the list of partitions: ", err)return}//打印分区fmt.Println(partitionList)//获取分区和片偏移for partition := range partitionList {pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)return}//延迟执行defer pc.AsyncClose()//启动多线程go func(pc sarama.PartitionConsumer) {wg.Add(1)//获得message的信息for msg := range pc.Messages() {fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))fmt.Println()}wg.Done()}(pc)}//线程休眠time.Sleep(10 * time.Second)wg.Wait()consumer.Close()
}

5 测试

在这里插入图片描述
在这里插入图片描述
参考文章:https://www.cnblogs.com/angelyan/p/10800739.html

欢迎关注公众号

在这里插入图片描述


http://chatgpt.dhexx.cn/article/U5NH8NuL.shtml

相关文章

视图单行子查询返回mysql,Oracle命令整理 - osc_sj1kgo4z的个人空间 - OSCHINA - 中文开源技术交流社区...

常用命令 1 sqlplus scott/tiger192.168.47.10:1521/orcl后面不要加&#xff1b; sqlplus sys/oracle as sysdba 【密码认证】 sqlplus用户名任意/密码任意as sysdba 【主机认证】 2 spool d:\基本查询.txt 录屏开始 spool off …

MLX90316KGO-BDG-100-RE传感器 旋转位置 角度测量

介绍 MLX90316是Tria⊗is旋转位置传感器&#xff0c;提供在设备表面旋转的小偶极磁铁(轴端磁铁)的绝对角位置。 得益于其表面的集成磁集中器(IMC)&#xff0c;单片设备以非接触式方式感知应用磁通量密度的水平分量。 这种独特的传感原理应用于旋转位置传感器&#xff0c;可在机…

图形语言 Kgo

http://www.ferlysoft.com/product-kgo.asp 始于2008 无代码开发技术&#xff0c;以图形取代编程 Kgo 介绍Kgo是无代码开发管理信息系统的特定领域语言。 Kgo语言由语法语义、图形设计云工具和运行时环境三个部分构成。Kgo 语法语义采用XSD&#xff08;XML Schemas Definition&…

KgoUI 页面展示

前端框架 vue layui sass 框架源代码&#xff1a;码云

微软账号登陆不上_微软待办(todo)如何跟Outlook任务同步?

小生我之前也遇到这个问题了&#xff0c;后来参考了知友的一个答案&#xff0c;自此得到了解决 但是也还是由很多人不太明白&#xff0c;所以我尝试着把我解决这个问题的关键点描述一下&#xff0c;希望能给大家一些帮助 关键点只有一个&#xff1a; 三个软件的账户一定要一样 …

微软HPC解决方案

首先我们先来看下高性能群集与其它几种群集的不同 高可用群集&#xff1a;群集所有节点&#xff0c;来维持一个应用的持续运作&#xff0c;如果当前应用所在节点失败&#xff0c;自动故障转移至其它节点 负载均衡群集&#xff1a;群集所有节点来平衡一个应用的访问请求&#x…

群辉服务器间同步文件,群晖NAS端之间同步(Cloud Station Sharesync)

Cloud Station套件是一个套件组&#xff0c;包含5个功能模块&#xff0c;其中Cloud Station Server是Cloud Station在NAS上的服务器端&#xff1b;云同步(Cloud Station Drive)和云备份(Cloud Station Backup)是电脑端软件&#xff0c;Drive是电脑端和NAS端的双向同步&#xff…

微软官方硬盘备份软件SyncToy

微软官方硬盘备份软件SyncToy 前言 最近用硬盘检测工具发现硬盘坏道有点多&#xff0c;状态不佳&#xff0c;折腾了一阵还差点把硬盘搞坏&#xff0c;好在重装系统解决了问题&#xff08;重装大法好&#xff09;。再加上现在网上对资源限制越来愈大&#xff0c;所以对数据安全…

微软同步工具之synctoy

&#xfeff;&#xfeff; synctoy是由 微软 推出的一款免费的文件夹同步工具。微软的软件都以复杂与臃肿著称&#xff0c;不过这款软件还真是摆脱了复杂和臃肿。 &#xff1a;第一項是最主要的雙向同步功能&#xff0c;會依據左右兩端點裡檔案的增加、刪除、修改、更名&#x…

软件工程测试题

学堂在线选择题汇总 初识软件工程 软件工程方法是&#xff08; &#xff09;。 为了获得高质量软件而实施的一系列活动为开发软件提供技术上的解决方法为支持软件开发、维护、管理而研制的计算机程序系统为了理解问题和确定需求而采取的一些技术和方法下面的&#xff08; &am…

远程桌面同步本地计算机,微软更新远程桌面应用现在终于可以在本地和远程计算机上复制文件...

远程桌面连接是许多专业用户和开发者必备的功能&#xff0c;通过远程桌面服务可以直接连接远程计算机并可以直接操作。 系统自带的远程桌面连接程序微软已经很久没有更新&#xff0c;因为微软现在主要通过应用商店发布新版本远程桌面应用。 比如在刚刚发布的远程桌面应用新版本…

解决 vscode 登录微软账户同步设置 出现“vscode.dev 关闭了连接“ 问题

我的电脑最近重装了系统&#xff0c;之前的软件都删除了&#xff0c;在重新安装vscode之后想同步之前的设置、主题时出现了问题。 我的解决方法是 在当前页面 输入 https://vscode.dev 看能不能打开。 如果能打开&#xff0c;再次点击vscode登录账号同步设置 我之前使用微软账…

微软应用商店_微软商店那些好用的UWP软件!你不看这篇文章会后悔的!超级实用! | APP杂货店...

作为一个前 Windows phone 用户,一直对微软应用商店(Microsoft Store)有特殊的感情,犹记得大一时天天活跃在爱应用、微疯客、IT之家只为盼着 wp应用能慢慢赶上其它平台,当微软推出uwp(Universal Windows Platform)应用时,更是感觉微软要成了,wp不会再小众了。 微软画饼 然…

FreeFileSync 免费文件同步软件 实时自动备份重要资料

重要资料&#xff0c;必需备份。狡兔还有三窟呢&#xff0c;更不说突然断电就可能OVER的硬盘。 但要用的资料时时时更新的&#xff0c;对于比较小的文档&#xff0c;我们可以按日期多备份几次。如&#xff1a; 备份文档 2017-12-04 备份文档 2018-01-01 备份文档 2018-10-05 ……

自动与时间服务器时间同步,Windows系统时间同步(附时间同步服务器地址)

eProvidersNtpClient ] 分支&#xff0c;并双击 SpecialPollInterval 键值&#xff0c;将对话框中的“基数栏”选择到“十进制”上&#xff0c;如图1所示 图 1 3. 而这时在对话框中显示的数字正是自动对时的间隔(以秒为单位)&#xff0c;比如默认的604800就是由7(天)24(时)60(分…

如何让微软Onedrive同步其他硬盘的文件

最近在工作中&#xff0c;有同事发现Onedriver只能同步自己文件夹中的数据。其他硬盘的数据不同步&#xff0c;发现好恼火。 因为OneDrive在默认文件夹下&#xff0c;无法对其他文件夹备份&#xff0c;进行同步备份。 今天我们就来解决这个同步的麻烦&#xff01;&#xff01; …

微软drive服务器,OneDrive:微软云存储服务

OneDrive&#xff0c;全名Microsoft OneDrive&#xff0c;前称Windows Live SkyDrive&#xff0c;是微软所推出的网络硬盘及云存储服务。用户可以上传他们的文件到网络服务器上&#xff0c;并且透过网络浏览器来浏览那些文件。更可直接编辑和观看Microsoft Office文件。同时推出…

软件体系结构风格介绍

文章目录 软件体系结构风格介绍&#xff08;一&#xff09;管道和过滤器风格&#xff08;二&#xff09;数据抽象与面向对象风格&#xff08;三&#xff09;基于事件的隐式调用风格&#xff08;四&#xff09;层次系统风格&#xff08;五&#xff09;仓库风格&#xff08;六&am…

跨设备同步保存密码

跨设备同步保存密码 文章目录 跨设备同步保存密码前言一、密码管理方式二、跨设备存储密码1、采用Keepass存储密码2、Keepass连接坚果云 三、小记&#xff1a; 前言 现在网络平台越来越多&#xff0c;除了可以直接通过手机登陆的方式&#xff0c;其次就需要使用到账号和密码。账…

【WIN问题】微软Microsoft onenote/store 无法连接网络无法同步解决

问题描述 将近1个月没有使用PC端的微软笔记&#xff0c;手机app正常使用。登录PC端记录笔记后&#xff0c;Ctrl s 保存时OneNote提示&#xff1a;无法同步&#xff0c;网上看了很多博客推荐的方法&#xff0c;尝试无果&#xff01; 后面发现 Microsoft store 也不能登录&…