kafka sasl java_Kafka安装及开启SASL_PLAINTEXT认证(用户名和密码认证)

article/2025/10/20 14:11:05

前些日子要封装一个kafka的客户端驱动,配置了下kafka环境,发现配置复杂度完爆rabbitmq很多倍啊,而且发布订阅模式使用起来也很麻烦,可能就胜在分布式了吧。

kafka需要java环境,自行安装java sdk 1.8+.

官方加载安装包,当前为kafka_2.11-1.1.0.tgz

(新版kafka包内集成了zookeeper,直接启动即可)

解压缩后kafka目录下文件:

00e7810e339a6bcc148bce17b9573211.png

配置:

config路径下配置文件:

consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,默认即可

producer.properties 生产者配置,这个配置文件用于配置开启的生产者,默认即可

server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,核心配置如下:

(1)broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,默认即可

(2)listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,

例如:listeners=PLAINTEXT:// 192.168.1.1:9092。并确保服务器的9092端口能够访问

(3)zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,默认即可,

例如:zookeeper.connect=localhost:2181

启动zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台运行)

启动kafka:

bin/kafka-server-start.sh config/server.properties &

创建一个名为test的topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看已经创建的topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

更改tioic分区:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --alter --partitions 4

查看指定topic信息:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe

创建一个消息消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

(如果弹出WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available,需将上文中localhost改为队列配置的地址)

创建一个消息生产者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

出现 > 符号后输入消息发送,消费者连接可以看到输出。

(如果弹出WARN [Consumer clientId=consumer-1, groupId=console-consumer-59948] Connection to node -1 could not be established. Broker may not be available,

需将上文中localhost改为队列配置的地址)

Kafka安装完成。

Kafka开启使用 SASL_PLAINTEXT认证:

输入下面命令,关闭kafka:

bin/kafka-server-stop.sh

输入下面命令,关闭zookeeper:

bin/zookeeper-server-stop.sh

进入config目录,增加如下配置文件:

cd config

(1)touch kafka_server_jaas.conf

配置如下:

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin"

user_admin="admin"

user_alice="alice";

};

在KafkaServer部分,username和password是broker用于初始化连接到其他的broker,在上面配置中,admin用户为broker间的通讯,

user_userName定义了所有连接到 broker和 broker验证的所有的客户端连接包括其他 broker的用户密码,user_userName必须配置admin用户,否则报错。

(2)touch kafka_cilent_jaas.conf

配置如下:

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin";

};

在KafkaClient部分,username和password是客户端用来配置客户端连接broker的用户,在上面配置中,客户端使用admin用户连接到broker。

更改server.properties配置文件:

listeners=SASL_PLAINTEXT://ip:9092

# 使用的认证协议

security.inter.broker.protocol=SASL_PLAINTEXT

#SASL机制

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

# 完成身份验证的类

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。

#allow.everyone.if.no.acl.found=true

super.users=User:admin

修改consuer和producer的配置文件consumer.properties和producer.properties,分别增加如下配置:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

切换到kafka目录下bin路径:

cd ..

cd bin

JAAS文件作为每个broker的jvm参数,在kafka-server-start.sh脚本中增加如下配置(可在最上面):

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_server_jaas.conf"

在kafka-console-consumer.sh和kafka-console-producer.sh中添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"

启动zookeeper和kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台运行)

bin/kafka-server-start.sh config/server.properties &

输入命令,启动生产者:

bin/kafka-console-producer.sh --broker-list 10.100.17.79:9092 --topic test --producer.config config/producer.properties

输入命令,启动消费者

bin/kafka-console-consumer.sh --bootstrap-server 10.100.17.79:9092 --topic test --from-beginning --consumer.config config/consumer.properties

如果消费者启动不了或者无法消费指定topic,尝试设置所使用用户的组权限(当前使用用户admin为超级用户,不需要配置权限):

利用kafka-acls.sh为topic设置ACL:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal  User:admin  --group test-consumer-group --topic test

注意,如果admin要作为消费端连接alice-topic的话,必须对其使用的group(test-consumer-group)也赋权(group 在consumer.properties中有默认配置group.id)

权限操作例子:

增加权限:

# 为用户 alice 在 test(topic)上添加读写的权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:alice --operation Read --operation Write --topic test

# 对于 topic 为 test 的消息队列,拒绝来自 ip 为192.168.1.100账户为 zhangsan 进行 read 操作,其他用户都允许

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:* --allow-host * --deny-principal User:zhangsan

--deny-host 192.168.1.100 --operation Read --topic test

# 为 zhangsan 和 alice 添加all,以允许来自 ip 为192.168.1.100或者192.168.1.101的读写请求

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test

获取权限列表:

# 列出 topic 为 test 的所有权限账户

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --list --topic test

移除权限:

# 移除 acl

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --remove --allow-principal User:zhangsan --allow-principal User:Alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test

以上完成Kafka的SASL_PLAINTEXT认证。

多节点zookeeper下认证:

增加配置文件:

touch kafka_zoo_jaas.conf

配置如下:

ZKServer{

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin"

user_admin="admin";

};

配置应用名称为ZKServer,zookeeper默认使用的JAAS应用名称是Server(或者zookeeper)。

设置的环境变量其实只是传入到JVM的参数。这里设置的环境变量是KAFKA_OPTS。修改zookeeper的启动脚本zookeeper-server-start.sh如下:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf  -Dzookeeper.sasl.serverconfig=ZKServer"

如果配置使用默认名称,则只需要添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf“”

修改zookeeper.properties配置文件,增加配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

以上配置完成后可启动尝试多节点zookeeper身份认证


http://chatgpt.dhexx.cn/article/Vn9Td6w8.shtml

相关文章

go kafka 配置SASL认证及实现SASL PLAIN认证功能

用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本以后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL)。 本…

kafka集群开启sasl认证

kafka集群开启sasl认证 sasl认证 sasl 是扩展C/S模式验证能力的一种认证机制。它可以规范客户端和服务端传输应答和传输内容编码,简而言之sasl决定了认证的规则,即客户端如何存储身份证书、客户端与服务端如何校验密码都由sasl决定。当我们的客户端通过…

mysql sasl_SASL认证失败的原因(authentication failed)

SASL认证失败的原因(authentication failed) (2012-06-15 00:45:43) 标签: 杂谈 authentication failed) SASL认证失败的原因可分为如下几个可能的方面: Permission问题:对系统用户的SASL Auth尤其重要,要保证postfix用户(smtpd)对…

Kafka安全(以SASL+ACL为例)

目录 1 Security2 SASLACL实现用户及权限认证2.1 下载2.2 Kafka服务配置2.3 修改Kafka 服务启动脚本2.4 配置server.properties2.5 启动Zookeeper2.6 启动Kafka 集群2.7 ACL2.7.1 admin2.7.2 生产者2.7.3 消费者2.7.4 sharga用户2.7.5 shargb用户2.7.6 说明 2.8 生产者客户端代…

安装sasl出错

场景:python项目需要用到 pyhive0.6.4 pyhdfs0.2.2 thrift0.13.0 thrift_connector0.12 thrift_sasl0.3.0进项镜像构建时, 报错:error: command gcc failed with exit status 1 解决方法: 1.ubuntu系统: sudo apt-…

sasl认证原理

SASL - 简单认证和安全层 SASL是一种用来扩充C/S模式验证能力的机制认证机制, 全称Simple Authentication and Security Layer. 当你设定sasl时,你必须决定两件事;一是用于交换“标识信 息”(或称身份证书)的验证机制&#xff1…

kafka sasl java_Kafka 集群配置SASL+ACL

** Kafka 集群配置SASLACL 测试环境:** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1.0.0.tgz zookeeper: 3.4.5 ip: 192.168.49.161 (我们这里在一台机上部署整套环境) kafka 名词解析: Broker: Kafka 集群包含一个或多个…

kafka sasl java_Kafka SASL 安全认证

java client 中添加 SASL 设置信息: Java client consumer properties配置.png 注意 sasl.jaas.config 配置中的分号必不可少。 package kafka; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer…

SASL讲解,以及在Spark中的应用

是什么? SASL全称Simple Authentication and Security Layer,是一种用来扩充C/S模式验证能力的机制。在Postfix可以利用SASL来判断用户是否有权使用转发服务,或是辨认谁在使用你的服务器。      SASL提供了一个通用的方法为基于连接的协议增加验证支持,而XMPP使用了一…

关于SASL的介绍文档

http://docs.sun.com/app/docs/doc/819-7056/6n91eac4q?lzh&aview 简单验证安全层 (Simple Authentication Security Layer, SASL) 介绍 SASL 为应用程序和共享库的开发者提供了用于验证、数据完整性检查和加密的机制。开发者可通过 SASL 对通用 API 进行编码。此方法避免…

文献管理与信息分析期末考试答案

可以在考试界面按ctrlf,对照着下图自己敲关键字搜索相应题目,感觉这样更方便点,也不会漏下题目。(仅供参考)

使用NoteExpress做文献管理

NoteExpress 是北京爱琴海软件公司开发的一款专业级别的文献检索与管理系统,具备文献信息检索与下载功能,可以用来管理参考文献的题录,以附件方式管理参考文献全文或者任何格式的文件、文档。 除此以为还有EndNote等其他文献管理软件&#x…

Zotero——一款文献管理工具

1.简介:Zotero是一款开源的文献管理工具,可以提供文献管理、浏览等众多服务,可以极大地为我们的科研和论文写作提供便利。 2.下载与安装 官方网站:Zotero | Your personal research assistant 我们可以直接在其官网上下载该软件…

使用 bibtex 进行参考文献管理

原  文:Bibliography management with bibtex 译  者:Xovee 翻译时间:2020年11月9日 使用 bibtex 进行参考文献管理 LaTeX 中直接支持参考文献的管理。本篇文章介绍如何使用thebibliography环境和BibTeX系统来管理参考文献。 注意&…

参考文献管理

一年以后latex和word都积累了一些经验再看这篇博客,发现有些小错误和语焉不详处,小修小改了一波。 处理参考文献真的是一件说复杂也很复杂,说简单也很简单的事,关键看能不能掌握门(tao)路,这里只…

Zotero文献管理软件入门使用方法:软件下载、文献导入、引文插入

本文介绍文献管理软件Zotero的基础使用方法,包括软件下载与安装、文献与PDF导入、在Word中插入参考文献等的方法。 在EndNote文献输出引用格式自定义修改与编辑界面解读(https://blog.csdn.net/zhebushibiaoshifu/article/details/115221112)…

学习笔记:MOOC 文献管理与信息分析

学习笔记:MOOC 文献管理与信息分析 文章目录 学习笔记:MOOC 文献管理与信息分析前言本科硕士博士的差异科研的特性读研的意义学习策略 学习与搜索两种类型的知识什么是需求?搜商基本检索及逻辑关系(AND OR NOT高级搜索命令检索图片…

今天【分享】一个好用的文献管理软件——Zetero

文章目录 一、下载与安装二、基础设置 一、下载与安装 首先去Zetero的官网下载即可,该软件不大,直接在百度搜索下载即可,这里不再放其网址。 下载之后,直接解压 点击安装即可。 下载完进去,是这种页面显示&#xff…

文献管理软件Zotero的安装和使用

文章目录 前言一、Zotero简介二、安装与使用1、账号注册2、软件安装3、插件安装4、关联账户设置5、坚果云扩充(WebDAV)6、保存路径设置7、与Connected Papers联动8、参考文献的引用 前言 随着阅读文献数量的增加,感觉一个好用的文献管理工具…

文献管理与信息分析

文献管理与信息分析_罗昭锋 一、搜索引擎使用1.谷歌百度的高级搜索功能2.命令搜索3.图片搜索4.语音搜索5.本地搜索工具Everything 二、RSS-同步世界最新资讯1.RSS的意义及使用2.订阅科技文献 三、为知笔记和思维导图1.为知笔记2.思维导图3.快速学习某个主题知识的步骤 四、十大…