Kafka 3.0 SASL Security Authentication


This article covers two Kafka authentication mechanisms.

Kafka authentication mechanisms:

  • SASL/PLAIN: users cannot be added dynamically; accounts and passwords are hard-coded in a configuration file

  • SASL/SCRAM: users can be added dynamically

The SASL/PLAIN approach

cd /usr/local/kafka/kafka_2.12-3.0.1/bin/
## make a copy of the start script for SASL
cp kafka-server-start.sh kafka-server-start-sasl.sh

At the end of kafka-server-start-sasl.sh, change the final line to:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka/kafka_2.12-3.0.1/config/kafka-server-jaas.conf kafka.Kafka "$@"

Alternatively, append the following to the environment (vim /etc/profile):

if [ "x$KAFKA_OPTS" ]; thenexport KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka_2.12-3.0.1/config/kafka-client-jaas.conf"
fi
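The kafka-client-jaas.conf referenced above is not shown in this article; a minimal sketch of what such a client JAAS file typically looks like for the PLAIN mechanism (the rex account and password here are illustrative and must match an entry in the server JAAS file below):

// kafka-client-jaas.conf -- illustrative sketch, credentials are assumptions
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="rex"
    password="123456";
};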

Create a kafka-server-jaas.conf file under the config directory:

touch kafka-server-jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_rex="123456"
    user_alice="123456"
    user_lucy="123456";
};

In the config directory, copy server.properties to server-sasl.properties and add the following settings:

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
## For Kafka versions below 3.0 use authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer.
## Kafka 3.0 removed SimpleAclAuthorizer in favor of AclAuthorizer; configuring SimpleAclAuthorizer on 3.0+ causes a ClassNotFoundException at startup.
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin

Start Kafka with SASL authentication:

./bin/kafka-server-start-sasl.sh -daemon config/server-sasl.properties
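To test the PLAIN-secured broker from the console producer, the client also needs SASL settings. A minimal sketch, assuming a file named config/plain-producer.conf (the file name is illustrative) and the admin super user from the JAAS file above:

# plain-producer.conf -- illustrative client settings for the PLAIN listener
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/plain-producer.conf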

The SASL/SCRAM approach

Create the Kafka users

# admin (super) user
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
# test user
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=chan_test],SCRAM-SHA-512=[password=chan_test]' --entity-type users --entity-name chan

You can see that the corresponding user entries have been created in ZooKeeper, with the passwords stored as salted hashes rather than plain text.
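To confirm, the stored SCRAM credentials can be listed with the same tool (assuming the same ZooKeeper chroot used above):

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181/kafka --describe --entity-type users --entity-name chan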

Create a JAAS file in Kafka's config directory:

touch kafka_server_jaas_scram.conf
vim kafka_server_jaas_scram.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

Then copy config/server.properties to server-scram.properties and add the following settings:
# Enable both the SCRAM and PLAIN mechanisms
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
# Use SCRAM-SHA-256 for inter-broker communication
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# Inter-broker communication uses SASL_PLAINTEXT
security.inter.broker.protocol=SASL_PLAINTEXT
# Configure the listeners to use SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
# Configure advertised.listeners
advertised.listeners=SASL_PLAINTEXT://127.0.0.1:9092
# ACL settings
allow.everyone.if.no.acl.found=false
# Super users; separate multiple entries with semicolons
super.users=User:admin;
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

The complete server-scram.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/data/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=127.0.0.1:2181/kafka

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# Enable both the SCRAM and PLAIN mechanisms
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
# Use SCRAM-SHA-256 for inter-broker communication
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# Inter-broker communication uses SASL_PLAINTEXT
security.inter.broker.protocol=SASL_PLAINTEXT
# Configure the listeners to use SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
# Configure advertised.listeners
advertised.listeners=SASL_PLAINTEXT://127.0.0.1:9092
# ACL settings
allow.everyone.if.no.acl.found=false
# Super users; separate multiple entries with semicolons
super.users=User:admin;
# For Kafka versions below 3.x, change this to kafka.security.auth.SimpleAclAuthorizer
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

Copy the start script, rename it kafka-server-start-scram.sh, then comment out its last line and add the following:

#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka/kafka_2.12-3.0.1/config/kafka_server_jaas_scram.conf kafka.Kafka "$@"

Start Kafka:

./kafka-server-start-scram.sh -daemon ../config/server-scram.properties 
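A quick sanity check that the broker actually came up (assuming the default logs directory under the Kafka installation; the line below is what a healthy broker logs at startup):

grep "started (kafka.server.KafkaServer)" logs/server.log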

Note: the ZooKeeper address Kafka connects to must be the same node (chroot) under which the users were created; otherwise Kafka fails to start with an authentication error such as: failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256

For example, if server.properties sets zookeeper.connect=127.0.0.1:2181/kafka, then the users must be created with bin/kafka-configs.sh --zookeeper 127.0.0.1:2181/kafka. If the kafka node does not exist in ZooKeeper, you need to create it yourself.
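One way to create the chroot node is with the zookeeper-shell tool shipped with Kafka (a minimal sketch; the empty string is just placeholder znode data):

bin/zookeeper-shell.sh 127.0.0.1:2181
create /kafka ""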

Start the producer and consumer

Because the broker was started with security enabled, the producer and consumer clients also have to authenticate.

  • Create producer.conf in the config directory with the following settings

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

  Start the producer with this configuration:

[root@hub kafka_2.12-3.0.1]# bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config config/producer.conf
>1
>2
>3
>

Start a producer as the chan user

Create producer-chan.conf in the config directory with the following settings:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="chan" password="chan_test";

Start the producer as chan. Since the chan user has not been authorized on the test topic, chan cannot produce messages to it, so we need to grant it permission.

[root@hub kafka_2.12-3.0.1]# bin/kafka-console-producer.sh --bootstrap-server 192.168.67.142:9092 --topic test --producer.config config/producer-chan.conf
>1
[2022-04-13 20:32:43,651] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2022-04-13 20:32:43,654] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
[2022-04-13 20:32:43,657] ERROR Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

Start the consumer

[root@hub config]# cat consumer.conf 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

bin/kafka-console-consumer.sh --bootstrap-server 192.168.67.142:9092 --topic test --group 'test_group' --from-beginning --consumer.config config/consumer.conf

ACL authorization

Grant the chan user write permission on the test topic:

[root@hub kafka_2.12-3.0.1]# bin/kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181/kafka --add --allow-principal User:chan --operation Write --topic 'test'
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
 	(principal=User:chan, host=*, operation=WRITE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
 	(principal=User:chan, host=*, operation=WRITE, permissionType=ALLOW)

[root@hub kafka_2.12-3.0.1]# bin/kafka-console-producer.sh --bootstrap-server 192.168.67.142:9092 --topic test --producer.config config/producer-chan.conf
>1
>2
>3
>

Now the chan user can write messages to the topic.
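Consuming as chan would additionally require Read permission on the topic and on the consumer group. A sketch of the corresponding grants, assuming the test_group group used in the consumer command above:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181/kafka --add --allow-principal User:chan --operation Read --topic 'test'
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181/kafka --add --allow-principal User:chan --operation Read --group 'test_group'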

