Spring Boot锦集(三):Spring Boot整合Kafka | Zookeeper/Kafka的安装和配置 | 总结的很详细

article/2025/8/22 7:54:18

前言

在学习本章节前,务必做好以下准备工作:

1、安装并启动了Zookeeper[官网],如需帮助,点击进入;

2、安装并启动了Kafka[官网],如需帮助,点击进入。

注:zk和kafka的安装与介绍,本文不做重点介绍,具体参考上方链接。


¥¥¥¥¥¥下面我们一起来学习一下Spring Boot整个Kafka的入门级教程¥¥¥¥¥¥


一、准备工作

1.1、新建 Spring Boot 2.x Web 工程

1.1.1、工程创建步骤演示

 注意勾选下面几个选项!

1.1.2、工程目录展示

注:项目创建成功后,先创建package和java文件,为下面的代码编写工作做铺垫。

1.2、pom.xml添加spring-kafka相关依赖

注:里面添加的依赖主要有三方面,分别是系统已自动配置好的、kafka核心依赖+测试依赖、其他相关辅助性的依赖(比如:lombok)。

 <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.succ</groupId><artifactId>SpringBootKafaka</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootKafaka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><!-- 阿里巴巴 fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.3、在application.yml 文件中,添加 kafka 相关配置

spring:kafka:# 指定 kafka 地址,我这里部署在的虚拟机,开发环境是Windows,kafkahost是虚拟机的地址, 若外网地址,注意修改为外网的IP( 集群部署需用逗号分隔)bootstrap-servers: kafkahost:9092consumer:# 指定 group_idgroup-id: group_idauto-offset-reset: earliest# 指定消息key和消息体的序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:# 指定消息key和消息体的序列化方式key-deserializer: org.apache.kafka.common.serialization.StringSerializervalue-deserializer: org.apache.kafka.common.serialization.StringSerializer

注意:kafkahost别名需要单独配置,如需帮助点击进入;当然,这里也可以直接写虚机的IP地址(因为开发环境是Windows,kafka部署在虚拟机上,所以这里不能写localhost(等价于127.0.0.1),否则访问的就是windows的localhost,根本访问不到虚拟机的kafka)。 

auto.offset.reset 有3个值可以设置: 

earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset时,从头开始消费;


latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据;


none: topic各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常;


默认建议用 earliest, 设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。

latest 这个设置容易丢失消息,假如 kafka 出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费, 中间出问题的那些就不管了。 

注:更详细的配置信息,见底部的拓展 

二、代码编写

2.1、Order(订单)实体Bean的编码

package model;import lombok.*;import java.time.LocalDateTime;/*** @create 2022-10-08 1:25* @describe 订单类javaBean实体*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {/*** 订单id*/private long orderId;/*** 订单号*/private String orderNum;/*** 订单创建时间*/private LocalDateTime createTime;
}

2.2、KafkaProvider(消息提供者)的编写

package com.succ.springbootkafaka.provider;import com.alibaba.fastjson.JSONObject;
import com.succ.springbootkafaka1.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.time.LocalDateTime;/*** @create 2022-10-14 21:39* @describe 话题的创建类,使用它向kafka中创建一个关于Order的订单主题*/
@Component
@Slf4j
public class KafkaProvider {/*** 消息 TOPIC*/private static final String TOPIC = "shopping";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 构建一个订单类Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送消息,订单类的 json 作为消息体ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {log.info("## Send message fail ...");}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("## Send message success ...");}});}
}

2.3、KafkaConsumer(消费者)代码编写

package consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @create 2022-10-08 1:25* @describe 通过指定的话题和分组来消费对应的话题*/
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "shopping", groupId = "group_id") //这个groupId是在yml中配置的public void consumer(String message) {log.info("## consumer message: {}", message);}
}

三、单元测试

3.1、准备工作

3.1.1、查看Zookeeper的启动状态

cd命令,进入到zk的安装目录

通过 bin 目录下的 zookeeper-server-start.sh 启动脚本,来启动 zk 单节点实例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 

 由于本虚拟机zk的各项基本配置都已到位,可以直接启动(zk的安装如需帮助,点击进入)

#zkServer.sh status 查看服务状态
#zkServer.sh  start 启动zk
#zkServer.sh  stop  停掉zk
#zkServer.sh  restart  重启zk

如上图所示,zk的启动模式为standalone单例模式(非集群),已启动。

3.1.2、启动kafka

使用cd命令,进入kafka安装目录下的bin目录

进入解压目录,通过 bin 目录下的 kafka-server-start.sh 脚本,后台启动 Kafka : 

pwd 
./kafka-server-start.sh  ../config/server.properties 

 

正常启动,如下图所示:

温馨提示:Kafka启动报错不识别主机名的解决办法,点击进入。

java.net.UnknownHostException|unknown error at java.net.Inet6AddressImpl.lookupAllHost

3.1.3、三种方式,查看kafka的启动状态

jps -ml #方式一,通过jps命令查看(尾部的-ml为非必须参数)

netstat -nalpt | grep 9092  #方式二,通过查看端口号查看
 lsof -i:9092 #方式三

3.2、单元测试代码编写

package com.succ.springbootkafaka;import com.succ.springbootkafaka.provider.KafkaProvider;
import org.junit.jupiter.api.Test;//注意,这个junit用自带的就可以
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@SpringBootTest
class SpringBootKafakaApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {//如果这里打印为null,要么是zk或kafka没正常启动,此时进入linux分别查看他们状态接口,另外也需要排查一下你的yum文件配置的kafka的地址,最后排查自己的注解是否引入错误的packageSystem.out.println("是否为空??+"+kafkaProvider);// 发送 1000 个消息for (int i = 0; i < 1000; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);}}

3.3、测试

3.3.1、发送 1000 个消息,看消息是否能够被正常发布与消费

控制台日志如下:

3.3.2、查看Kafka 的 topic 列表,看 “shopping” 这个topic 是否正常被创建

执行 bin 目录下查看 topic 列表的 kafka-topics.sh 脚本:

注:如果你的kafka版本,高于2.2+=,使用如下命令查看

 bin/kafka-topics.sh --list --bootstrap-server kafkahost:9092

如上图所示,可以看到刚刚创建的主题shopping 

注:如果你的kafka版本,低于2.2-,使用如下命令查看

bin/kafka-topics.sh --list --zookeeper kafkahost:2181

 上面的kafkahost,是在 vim /etc/host中配置的,另外IP通过ifconfig命令获取

 

至此,测试成功!

四、为什么要先启动zk,然后启动kafka

因为kafka的运行依赖zk的启动。
具体,可以进入kafka的解压目录的/conf/目录下

cd /usr/src/kafka_2.13-3.3.1/config/ && ls
vi server.properties

 

更多kafka教程,点击进入 

五、收尾工作

1、关闭Zookeeper

zkServer.sh status
zkServer.sh stop
zkServer.sh status

  

2、关闭kafka

cd /usr/src/kafka_2.13-3.3.1/ && ls
jps 
bin/kafka-server-stop.sh
jps

 

注意事项:如果你是首次安装并使用kafka,那么该关闭命令是不能生效的,需要进入kafka的配置文件对配置做一些更改,具体如下: 

vim bin/kafka-server-stop.sh 

 找到下图中的这段代码,修改一下

#PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')
PIDS=$(jps -lm | grep -i 'kafka.Kafka' | awk '{print $1}')

修改后的命令作用:使用jps -lm命令列出所有的java进程,然后通过管道,利用grep -i 'kafka.Kafka’命令将kafka进程筛出来,最后再接一管道命令,利用awk将进程号取出来。

 

总结

本文对通过一个小案例,初步介绍了 Spring Boot对kafka的整合,完成了从Spring Boot中调用生产者(向kafka中创建主题)、消费者(消费信息)完成对kafka的调用。

当然kafka的使用,远不止于此,后期也会在不同的篇幅中,更多、更加深入的介绍。

尾言

走前人走过的路,为后来者踩坑。

在整合的过程中,难免遇到磕磕碰碰,还好有我与你同行,里面遇到的一些坑,基本都有标注。

如果觉得文章还不错,欢迎点赞收藏!

拓展

关于yum文件更详细的配置

spring:kafka:bootstrap-servers: 172.101.203.33:9092producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

附注 

1、ZK/Zookeeper的下载和安装 | 真/伪集群的快速搭建| 总结的很详细

2、Kafka锦集(一):Kafka介绍和安装,总结的很详细


http://chatgpt.dhexx.cn/article/YInDidiP.shtml

相关文章

Flink系列之:Flink CDC深入了解MySQL CDC连接器

Flink系列之&#xff1a;Flink CDC深入了解MySQL CDC连接器 一、增量快照特性1.增量快照读取2.并发读取3.全量阶段支持 checkpoint4.无锁算法5.MySQL高可用性支持 二、增量快照读取的工作原理三、全量阶段分片算法四、Chunk 读取算法五、Exactly-Once 处理六、MySQL心跳事件支持…

大数据面试重点之kafka(三)

Kafka如何保证全局有序&#xff1f; 可回答&#xff1a;1&#xff09;Kafka消费者怎么保证有序性&#xff1f;2&#xff09;Kafka生产者写入数据怎么保证有序&#xff1f;3&#xff09;Kafka可以保证 数据的局部有序&#xff0c;如何保证数据的全局有序&#xff1f;4&#xff0…

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

文章目录 官方说明参数解读CodePOM依赖配置文件生产者消费者单元测试测试earliestlatest(默认&#xff09;noneexception 源码地址 官方说明 https://kafka.apache.org/documentation/ 选择对应的版本&#xff0c;我这里选的是 2.4.X https://kafka.apache.org/24/documenta…

Kafka之auto.offset.reset值解析

今日在使用kafka时&#xff0c;发现将 auto.offset.reset 设置为earliest、latest、none 都没有达到自己预期的效果。 earliest&#xff1a; 当各分区下有已提交的offset时&#xff0c;从提交的offset开始消费&#xff1b;无提交的offset时&#xff0c;从头开始消费latest&…

关于EarlyZ

在前向渲染中&#xff0c;ZTest是在Fragement Shader之后进行的&#xff0c;也就是说&#xff0c;被遮挡的部分也要绘制FS&#xff0c;就产生了Over Draw&#xff0c;其实很费&#xff0c;Early Z Culling就解决了这个问题 Early fragment tests, as an optimization, exist t…

【EARLIER/EARLIEST函数】引用不存在的更早的行上下文 报错解决

引用PowerQuery的例子并给予个人理解 X1 SUMX(FILTER(Data,Data[订单日期]>EARLIER(Data[订单日期])),[金额])---WRONG X2CALCULATE(SUM(Data[金额]),FILTER(Data,SUMX(FILTER(Data,Data[订单日期]>EARLIER(Data[订单日期])),[金额])))---RIGHT X1报错原因&#xff1a…

EarlyStop

在训练中&#xff0c;我们希望在中间箭头的位置停止训练。而Early stopping就可以实现该功能&#xff0c;这时获得的模型泛化能力较强&#xff0c;还可以得到一个中等大小的w的弗罗贝尼乌斯范数。其与L2正则化相似&#xff0c;选择参数w范数较小的神经网络。 可以用L2正则化代…

Kafka 使用java api从指定位移消费 (从开头消费/从结尾消费)

一、auto.offset.reset值详解 在 Kafka 中&#xff0c;每当消费者组内的消费者查找不到所记录的消费位移或发生位移越界时&#xff0c;就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费&#xff0c;这个参数的默认值为 “latest” 。 auto.offset…

动态SQL之 where 标签

动态SQL之 where 标签 where和if一般结合使用&#xff1a; 1.若where标签中的 if 条件都不满足&#xff0c;则where标签没有任何功能&#xff0c;即不会添加where关键字 2.若where标签中的 if 条件满足&#xff0c;则where标签会自动添加where关键字&#xff0c;并将条件最前…

mybatis-动态sql

文章目录 1. 动态sql简述2. 动态sql示例 2.1 if2.2 choose2.3 foreach2.4 sql 及 include2.5 sql中的特殊字符3. 后台分页实现4. 数据版本号处理并发问题 1. 动态sql简述 mybatis的动态sql语句是基于OGNL表达式的。可以方便的在sql语句中实现某些逻辑. 总体说来mybatis动态SQL…

mysql动态sql拼接_动态SQL(拼接)

Q1:什么是动态SQL呢? A1:首先是SQL语句,是根据条件来拼接SQL Q2:为什么要用动态SQL? A2:因为在条件WHERE中出现OR会导致不能使用索引,从而使效率差别巨大。 例如:如图1、2, 图(1) 图(2) Q3:怎么样使用动态SQL? A3: 存储过程Proc_Test是没有采用拼接的:CREATE PROC…

Mybatis学习之动态Sql

目录 1. 什么是动态Sql 2. 动态Sql需要学习什么 3. 动态Sql之《if》 4. 动态Sql之《where》 5. 动态Sql之《foreach》 6. 动态Sql之《sql》 7. PageHelper分页插件的使用 1. 什么是动态Sql 答案&#xff1a;动态Sql指的是&#xff0c;Sql语句是变化的&#xff0c;不是固…

Mybatis 动态SQL

Mybatis 动态SQL 一 .动态SQL 数组 array 使用foreach 标签 <!-- mybatis的集合操作知识点: 如果遇到集合参数传递,需要将集合遍历标签: foreach 循环遍历集合标签属性说明:1.collection 表示遍历的集合类型1.1 数组 关键字 array1.2 List集合 关键字 list1.3 Map集…

Mybatis动态SQL解析

文章目录 1 为什么需要动态SQL&#xff1f;2 动态标签有哪些?3 举例说明ifchoose (when, otherwise)trim (where, set)foreach 1 为什么需要动态SQL&#xff1f; 看一段Oracle存储过程代码&#xff1a; 由于前台传入的查询参数不同&#xff0c;所以写了很多的if else&#x…

Java MyBatis动态SQL

&#x1f9ed;MyBatis学习 &#x1f389; 内容回顾 Java MyBatis的介绍及其执行原理 Java MyBatis配置详解 Java Mybatis中使用Junit进行测试_程序员必备 Java MyBatis的使用 &#x1f4e2;今天我们进行 Java MyBatis动态SQL 的学习&#xff0c;感谢你的阅读&#xff0c;内容若…

mysql动态SQL用法

顾名思义“动态”SQL就是不固定的SQL&#xff0c;根据不同的条件把SQL语句进行拼接&#xff0c;来实现对数据库更加精准的操作。可以通过配置文件或者注解的形式实现&#xff0c;多用于多条件联查。 xml版&#xff08;配置文件&#xff09;&#xff1a; xml版的动态SQL 接…

动态SQL标签

所谓的动态SQL&#xff0c;本职还是SQL语句&#xff0c;只是可以在SQL层面&#xff0c;去执行一个逻辑代码 动态SQL就是在拼接SQL语句&#xff0c;我们只要保证SQL的正确性&#xff0c;按照SQL的格式&#xff0c;去排列组合。 建议&#xff1a; 先在MySQL中写出完整的SQL&am…

MyBatis动态SQL

文章目录 前言一、\<if\>标签二、\<where\>标签三、\<trim\>标签四、\<set\>标签五、\<foreach\>标签五、\<sql\>标签 与 \<include\>标签 前言 动态sql是Mybatis的强大功能特性之一&#xff0c;能够完成不同条件下的sql拼接 以上…

动态SQL

动态SQL 在项目开发中&#xff0c;动态SQL可以解决很多不确定因素导致的SQL语句不同的问题。动态SQL可以简单高效的进行编码。在接下来的案例中进行认识和学习动态SQL。 动态SQL只是在原有的SQL语句中进行细微修改。案例贴合实际&#xff0c;编码简单易懂 文章目录 动态SQL一、…

动态 SQL

文章目录 一、学习目的二、动态 SQL 中的元素三、条件查询操作四、更新操作五、复杂查询操作1.foreach 元素中的属性2.foreach 元素迭代数组3.foreach 元素迭代 List4.foreach 元素迭代 Map 一、学习目的 在实际项目的开发中&#xff0c;开发人员在使用 JDBC 或其他持久层框架…