kafka sasl_ssl配置

article/2025/10/20 14:07:34

一、切换到存储证书的路径

我这里在家目录中的创建了ssl文件夹
mkdir ssl && cd ssl

二、生成服务端密钥库

keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey

验证证书:

keytool -list -v -keystore server.keystore.jks

三、创建CA 并将CA导入到truststore中

生成CA:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365将CA导入到truststore中:
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

四、从KeyStore导出证书,并用上述步骤生成的CA来签名生成的证书:

导出证书:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file签名:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:ka123456

 五、将证书导入KeyStore

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

六、生成客户端密钥库

keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey

七、配置zookeeper认证:

1、zookeeper是homebrew安装kafka是安装,配置文件路径为:/usr/local/opt/kafka/libexec/config/,在此目录下创建zookeeper-server-jaas.conf配置文件,定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名="用户密码",内容如下:

Server {org.apache.zookeeper.server.auth.DigestLoginModule requireduser_admin="qaz!@#QAZ"user_kafka="qaz123!@#QAZ";
};

2、zookeeper配置文件zookeeper.properties中新增SASL认证配置,如下:

dataDir=/usr/local/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

3、在/usr/local/opt/zookeeper/libexec/bin/zkEnv.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/usr/local/opt/kafka/libexec/config/zookeeper-server-jaas.conf"# default heap for zookeeper client
ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"
export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"

八、配置kafka安全认证:

1、在/usr/local/opt/kafka/libexec/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf:
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="qazQAZ123!@#"user_admin="qazQAZ123!@#"user_kafka="qaz!@#123";
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="qaz123!@#QAZ";
};kafka-client-jaas.conf:
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="qaz!@#123";
};

2、在kafka启动脚本(/usr/local/opt/kafka/libexec/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

-Djava.security.auth.login.config=/usr/local/Cellar/kafka/3.2.0/libexec/config/kafka-server-jaas.conf

3、修改kafka配置文件:/usr/local/opt/kafka/libexec/config/server.properties

九、java端代码

将步骤三生成的客户信任库client.truststore.jks复制到到java项目中,以便client可以信任这个CA

生产者:

// 1. 创建 kafka 生产者的配置对象
Properties props = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM,"PLAIN"); props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,"");      props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,DEFAULT_BASE_CONF_PATH+"client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"ka123456");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
System.out.println("开始发送数据");String msg = "Hello World";
// 4. 调用 send 方法,发送消息
producer.send(new ProducerRecord<String, String>("topic1","value ",msg));
// 5. 关闭资源
producer.close();

消费者:

String filePath = file.getAbsolutePath();
// 1.创建消费者的配置对象
Properties props = new Properties();
// 2.给消费者配置对象添加参数
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");
// 配置序列化 必须
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);  //关闭自动提交
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM,"PLAIN");  props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,"");      props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,filePath+"/conf/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"ka123456");// 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("topic1");
consumer.subscribe(topics);
System.out.println("准备接收数据.........");
// 拉取数据打印
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));for(ConsumerRecord<String,String> record : records){String value = record.value();System.out.println(value);}if(records.count() > 0) {//手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit falide for" + offsets);System.err.println("Commit falide exception" + exception.getStackTrace());}}});}
}


http://chatgpt.dhexx.cn/article/xZgJHjal.shtml

相关文章

KAFKA SASL配置 记录

kafka配置SASL 第1步 将kafka_client_jaas.conf/kafka_server_jaas.conf/kafka_zoo_jaas.conf三个文件放入kafka的config文件夹中&#xff0c;文件中配置用户&#xff0c;superadmin用户必须配置。 kafka_client_jaas.conf内容如下 KafkaClient { …

WIN10 VS2019 编译Cyrus SASL

环境 下载安装Visual Studio 2019 安装时在【工作负载】必须勾选【使用C的桌面开发】下载cyrus-sasl源码 从Github上clone或者下载zip包&#xff0c;我本来是需要2.1.26&#xff0c;但是从从https://www.cyrusimap.org/releases/下载对应版本的源码包编译都有问题&#xff0c;…

Kafka3.0 SASL安全认证

下面主要介绍Kafka两种认证方式 kafka验证方式&#xff1a; SASL/PLAIN&#xff1a;不能动态添加用户配置文件写死账号密码 SASL/SCRAM&#xff1a; 可以动态的添加用户 SASL/PLAIN方式 cd /usr/local/kafka/kafka_2.12-3.0.1/bin/ ## 复制一份saslcp kafka-server-start.…

集成OpenLDAP与Kerberos实现统一认证(三):基于SASL/GSSAPI深度集成

文章目录 1. 写作背景2. 既定目标3. 重要概念3.1 SASL3.2 GSSAPI3.3 SASL与GSSAPI的关系3.4 saslauthd3.5 Kerberos化 4. 核心原理4.1 基于SASL/GSSAPI实现Kerberos账号登录OpenLDAP4.2 基于olcAuthzRegexp规则映射Kerberos与OpenLDAP账号4.3 基于saslauthd进行委托认证 5. 安装…

kafka sasl java_Kafka安装及开启SASL_PLAINTEXT认证(用户名和密码认证)

前些日子要封装一个kafka的客户端驱动&#xff0c;配置了下kafka环境&#xff0c;发现配置复杂度完爆rabbitmq很多倍啊&#xff0c;而且发布订阅模式使用起来也很麻烦&#xff0c;可能就胜在分布式了吧。 kafka需要java环境&#xff0c;自行安装java sdk 1.8. 官方加载安装包&a…

go kafka 配置SASL认证及实现SASL PLAIN认证功能

用户认证功能&#xff0c;是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的&#xff08;或者说只有SSL&#xff09;&#xff0c;好在kafka0.9版本以后逐渐发布了多种用户认证功能&#xff0c;弥补了这一缺陷&#xff08;这里仅介绍SASL&#xff09;。 本…

kafka集群开启sasl认证

kafka集群开启sasl认证 sasl认证 sasl 是扩展C/S模式验证能力的一种认证机制。它可以规范客户端和服务端传输应答和传输内容编码&#xff0c;简而言之sasl决定了认证的规则&#xff0c;即客户端如何存储身份证书、客户端与服务端如何校验密码都由sasl决定。当我们的客户端通过…

mysql sasl_SASL认证失败的原因(authentication failed)

SASL认证失败的原因(authentication failed) (2012-06-15 00:45:43) 标签&#xff1a; 杂谈 authentication failed) SASL认证失败的原因可分为如下几个可能的方面&#xff1a; Permission问题&#xff1a;对系统用户的SASL Auth尤其重要&#xff0c;要保证postfix用户(smtpd)对…

Kafka安全(以SASL+ACL为例)

目录 1 Security2 SASLACL实现用户及权限认证2.1 下载2.2 Kafka服务配置2.3 修改Kafka 服务启动脚本2.4 配置server.properties2.5 启动Zookeeper2.6 启动Kafka 集群2.7 ACL2.7.1 admin2.7.2 生产者2.7.3 消费者2.7.4 sharga用户2.7.5 shargb用户2.7.6 说明 2.8 生产者客户端代…

安装sasl出错

场景&#xff1a;python项目需要用到 pyhive0.6.4 pyhdfs0.2.2 thrift0.13.0 thrift_connector0.12 thrift_sasl0.3.0进项镜像构建时&#xff0c; 报错&#xff1a;error: command gcc failed with exit status 1 解决方法&#xff1a; 1.ubuntu系统&#xff1a; sudo apt-…

sasl认证原理

SASL - 简单认证和安全层 SASL是一种用来扩充C/S模式验证能力的机制认证机制, 全称Simple Authentication and Security Layer. 当你设定sasl时&#xff0c;你必须决定两件事&#xff1b;一是用于交换“标识信 息”&#xff08;或称身份证书&#xff09;的验证机制&#xff1…

kafka sasl java_Kafka 集群配置SASL+ACL

** Kafka 集群配置SASLACL 测试环境&#xff1a;** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1.0.0.tgz zookeeper: 3.4.5 ip: 192.168.49.161 (我们这里在一台机上部署整套环境) kafka 名词解析&#xff1a; Broker: Kafka 集群包含一个或多个…

kafka sasl java_Kafka SASL 安全认证

java client 中添加 SASL 设置信息: Java client consumer properties配置.png 注意 sasl.jaas.config 配置中的分号必不可少。 package kafka; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer…

SASL讲解,以及在Spark中的应用

是什么? SASL全称Simple Authentication and Security Layer,是一种用来扩充C/S模式验证能力的机制。在Postfix可以利用SASL来判断用户是否有权使用转发服务,或是辨认谁在使用你的服务器。      SASL提供了一个通用的方法为基于连接的协议增加验证支持,而XMPP使用了一…

关于SASL的介绍文档

http://docs.sun.com/app/docs/doc/819-7056/6n91eac4q?lzh&aview 简单验证安全层 (Simple Authentication Security Layer, SASL) 介绍 SASL 为应用程序和共享库的开发者提供了用于验证、数据完整性检查和加密的机制。开发者可通过 SASL 对通用 API 进行编码。此方法避免…

文献管理与信息分析期末考试答案

可以在考试界面按ctrlf&#xff0c;对照着下图自己敲关键字搜索相应题目&#xff0c;感觉这样更方便点&#xff0c;也不会漏下题目。&#xff08;仅供参考&#xff09;

使用NoteExpress做文献管理

NoteExpress 是北京爱琴海软件公司开发的一款专业级别的文献检索与管理系统&#xff0c;具备文献信息检索与下载功能&#xff0c;可以用来管理参考文献的题录&#xff0c;以附件方式管理参考文献全文或者任何格式的文件、文档。 除此以为还有EndNote等其他文献管理软件&#x…

Zotero——一款文献管理工具

1.简介&#xff1a;Zotero是一款开源的文献管理工具&#xff0c;可以提供文献管理、浏览等众多服务&#xff0c;可以极大地为我们的科研和论文写作提供便利。 2.下载与安装 官方网站&#xff1a;Zotero | Your personal research assistant 我们可以直接在其官网上下载该软件…

使用 bibtex 进行参考文献管理

原  文&#xff1a;Bibliography management with bibtex 译  者&#xff1a;Xovee 翻译时间&#xff1a;2020年11月9日 使用 bibtex 进行参考文献管理 LaTeX 中直接支持参考文献的管理。本篇文章介绍如何使用thebibliography环境和BibTeX系统来管理参考文献。 注意&…

参考文献管理

一年以后latex和word都积累了一些经验再看这篇博客&#xff0c;发现有些小错误和语焉不详处&#xff0c;小修小改了一波。 处理参考文献真的是一件说复杂也很复杂&#xff0c;说简单也很简单的事&#xff0c;关键看能不能掌握门&#xff08;tao&#xff09;路&#xff0c;这里只…