go kafka 配置SASL认证及实现SASL PLAIN认证功能

article/2025/10/20 14:21:51

用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本以后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL)。

本篇会介绍部署SASL/PLAIN认证功能的流程。最后再介绍对SASL/PLAIN功能进行二次开发。

kafka 2.x用户认证方式小结

需要先明确的一点是,用户认证和权限控制是两码事。用户认证是确认这个用户能否访问当前的系统,而权限控制是控制用户对当前系统中各种资源的访问权限。用户认证就是今天要讲的内容,而kafka的权限控制,则是对应bin/kafka-acls.sh工具所提供的一系列功能,这里不详细展开。

标题特地说明kafka2.x是因为kafka2.0的时候推出一种新的用户认证方式,SASL/OAUTHBEARER,在此前的版本是不存在这个东西的。那么加上这个之后,kafka目前共有4种常见的认证方式。

  • SASL/GSSAPI(kerberos):kafka0.9版本推出,即借助kerberos实现用户认证,如果公司恰好有kerberos环境,那么用这个是比较合适的。
  • SASL/PLAIN:kafka0.10推出,非常简单,简单得有些鸡肋,不建议生产环境使用,除非对这个功能二次开发,这也是我后面要讲的。
  • SASL/SCRAM:kafka0.10推出,全名Salted Challenge Response Authentication Mechanism,为解决SASL/PLAIN的不足而生,缺点可能是某些客户端并不支持这种方式认证登陆(使用比较复杂)。
  • SASL/OAUTHBEARER:kafka2.0推出,实现较为复杂,目前业内应该较少实践。

其实除了上述四种用户认证功能之外,还有一个叫Delegation Token的东西。这个东西说一个轻量级的工具,是对现有SASL的一个补充,能够提高用户认证的性能(主要针对Kerberos的认证方式)。算是比较高级的用法,一般也用不到,所以也不会多介绍,有兴趣可以看这里Authentication using Delegation Tokens。

SASL/GSSAPI

如果已经有kerberos的环境,那么会比较适合使用这种方式,只需要让管理员分配好principal和对应的keytab,然后在配置中添加对应的选项就可以了。需要注意的是,一般采用这种方案的话,zookeeper也需要配置kerberos认证。

SASL/PLAIN

这种方式其实就是一个用户名/密码的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,不能动态添加,明文等等!这些特性决定了它比较鸡肋,但好处是足够简单,这使得我们可以方便地对它进行二次开发。本篇文章后续会介绍SASL/PLAIN的部署方式和二次开发的例子(基于kafka2.x)。

SASL/SCRAM

针对PLAIN方式的不足而提供的另一种认证方式。这种方式的用户名/密码是存储中zookeeper的,因此能够支持动态添加用户。该种认证方式还会使用sha256或sha512对密码加密,安全性相对会高一些。

而且配置起来和SASL/PLAIN差不多同样简单,添加用户/密码的命令官网也有提供,个人比较推荐使用这种方式。不过有些客户端是不支持这个方式认证登陆的,比如python的kafka客户端,这点需要提前调研好。

具体的部署方法官网或网上有很多,这里不多介绍,贴下官网的Authentication using SASL/SCRAM。

SASL/OAUTHBEARER

SASL/OAUTHBEARER是基于OAUTH2.0的一个新的认证框架,这里先说下什么是OAUTH吧,引用维基百科。

OAuth是一个开放标准,允许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用。而 OAUTH2.0算是OAUTH的一个加强版。

说白了,SASL/OAUTHBEARER就是一套让用户使用第三方认证工具认证的标准,通常是需要自己实现一些token认证和创建的接口,所以会比较繁琐。

详情可以通过这个kip了解KIP-255

说了这么多,接下来就说实战了,先介绍下如何配置SASL/PLAIN。

SASL/PLAIN实例(配置及客户端)

Kafka添加SASL_PLAIN安全认证

1,配置修改

 2,添加kafka_server_jaas.conf文件 server端的认证文件,放置在/mnt/hdb/ops-ng/kafka/config/中。

 KafkaServer {

            org.apache.kafka.common.security.plain.PlainLoginModule required

            username="admin"

            password="admin-secret"

            user_admin="admin-secret"

            user_alice="alice-secret";

        };

内容解释:配置文件命名为:kafka_server_jaas.conf,放置在/mnt/hdb/ops-ng/kafka/config/。

使用user_来定义多个用户,供客户端程序(生产者、消费者程序)认证使用,可以定义多个。

上例我定义了两个用户,一个是admin,一个是alice,等号后面是对应用户的密码(如user_admin定义了用户名为admin,密码为admin-secret的用户)。

官方说明:

 大概意思是:username="admin"和password="admin-secret"是代理之间使用的用户名和密码,即多个kafka集群使用的用户名和密码,而user_userName则是连接端使用的用户名密码。

3,创建client认证文件kafka_client_jaas.conf,此文件是后面console的生产者和消费者使用,放置在/mnt/hdb/ops-ng/kafka/config/中。(可选,如果是程序是生产者或者消费者,可以不用配置)

 KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

    username="alice"

    password="alice-secret";

};

4,修改启动脚本

vi bin/kafka-server-start.sh

添加一行:export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_server_jaas.conf"

 5,console 控制台生产、消费相关配置 (可选)

  • 修改/mnt/hdb/ops-ng/kafka/config/producer.properties,在配置最后加入以下两行内容:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

  • 修改/mnt/hdb/ops-ng/kafka/config/consumer.properties,要添加的内容和producer的内容一样:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

  • 添加kafka-console-producer.sh认证文件路径,后面启动生产者测试时使用:

[root@kafka1 ~]# cat /mnt/hdb/ops-ng/kafka/bin/kafka-console-producer.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_client_jaas.conf"

 

 

控制台生产者命令:

bin/kafka-console-producer.sh --broker-list 192.168.1.20:9092 --topic read

--producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

  • 添加kafka-console-consumer.sh认证文件路径,后面启动消费者测试时使用:

[root@kafka1 ~]# cat /mnt/hdb/ops-ng/kafka/bin/kafka-console-consumer.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_client_jaas.conf"

 

控制台消费者命令:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.203:9092 --topic test-topicaaa --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

一般能够发送数据就说明部署完成了~

代码实现SASL/PLAIN认证

github.com/segmentio/kafka-go 消费数据 未指定组
package kafkaimport ("context""github.com/segmentio/kafka-go""github.com/segmentio/kafka-go/sasl""github.com/segmentio/kafka-go/sasl/plain""github.com/segmentio/kafka-go/sasl/scram"ktesting "github.com/segmentio/kafka-go/testing""github.com/wonderivan/logger""testing""time"
)const (saslTestConnect = "172.19.1.103:9092" // connect to sasl listenersaslTestTopic   = "test-topic"         // this topic is guaranteed to exist.username        = "alice"password        = "alice-secret"version         = "0.10.2.0"
)func Test_SASL_Read(t *testing.T) {//建立连接d := &kafka.Dialer{SASLMechanism: plain.Mechanism{Username: username,Password: password,},}//消费数据Readers := make([]*kafka.Reader, 0)for _, topic := range []string{saslTestTopic} {rc := kafka.ReaderConfig{Brokers:   []string{saslTestConnect},Topic:     topic,Partition: 0,MinBytes:  10e3, // 10KBMaxBytes:  10e6, // 10MB}rc.Dialer = dreader := kafka.NewReader(rc)//  beginning设置 通过获取当前log值指定消费位置//if !consumer.beginning {// lag, _ := reader.ReadLag(context.Background())// reader.SetOffset(lag) //从当前部开始读取//}Readers = append(Readers, reader)}for {for _, reader := range Readers {m, err := reader.ReadMessage(context.Background())if err == nil {logger.Info(m.Value)} else {logger.Error("read kafka error:%v", err)//lag, _ := reader.ReadLag(context.Background())//logger.Error("read %d  close:lag:%d", idx, lag)reader.Close()}}}}
github.com/Shopify/sarama 实现组消费
package kafkaimport ("fmt""github.com/Shopify/sarama"cluster "github.com/bsm/sarama-cluster""github.com/wonderivan/logger""testing""time"
)func Test_SASL_Group(t *testing.T) {config := cluster.NewConfig()config.Group.Return.Notifications = trueconfig.Net.ReadTimeout = 10 * time.Second //time.Millisecondconfig.Net.SASL.Enable = trueconfig.Net.SASL.User = usernameconfig.Net.SASL.Password = passwordconfig.Net.SASL.Version = sarama.SASLHandshakeV1 //version//SASLHandshakeV1config.Consumer.Offsets.CommitInterval = 1 * time.Secondconfig.Consumer.Offsets.Initial = sarama.OffsetNewest //初始从最新的offset开始/*//  beginning设置 通过获取当前log值指定消费位置if consumer.beginning {config.Consumer.Offsets.Initial = sarama.OffsetOldest}*/c, err := cluster.NewConsumer([]string{saslTestConnect}, "filebeat01", []string{"testtopic"}, config)if err != nil {logger.Info("连接失败:\n %v", err)return} else {logger.Info("连接成功")}defer c.Close()//接受错误消息go func(c *cluster.Consumer) {errors := c.Errors()noti := c.Notifications()for {select {case <-errors:case <-noti:}}}(c)for m := range c.Messages() {fmt.Println("消费:", m.Value)c.MarkOffset(m, "") //MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}

 

github.com/segmentio/kafka-go 生产数据
package channelimport ("context""fs.com/ezlogic/utils""github.com/segmentio/kafka-go/sasl/plain""sync""time""github.com/segmentio/kafka-go""github.com/wonderivan/logger"
)type KafkaProducer struct {writer *kafka.Writer
}type Producers struct {Write sync.Map //make(map[string]*channel.KafkaProducer)
}func NewProducer(topic, username, password string, brokers []string) *KafkaProducer {defer utils.DeferFunc("NewProducer", nil)var (conn   *kafka.Conndialer *kafka.Dialererr    error)if username == "" {conn, err = kafka.DialLeader(context.Background(), "tcp", brokers[0], topic, 0)} else {//sasldialer = &kafka.Dialer{SASLMechanism: plain.Mechanism{Username: username,Password: password},}// 读topic,如果topic不存在,则创建topic,因此后续可以正常写conn, err = dialer.DialLeader(context.Background(), "tcp", brokers[0], topic, 0)}if err != nil {logger.Painc("NewProducer err:", topic, brokers, "====", err)}conn.ReadPartitions(topic)conn.SetWriteDeadline(time.Now().Add(10 * time.Second))conn.Close()kp := &KafkaProducer{writer: kafka.NewWriter(kafka.WriterConfig{Brokers:  brokers,Dialer:   dialer,Topic:    topic,Balancer: &kafka.LeastBytes{},Async:    true,//0613 提高写性能BatchTimeout: 100 * time.Millisecond,BatchSize:    10000,}),}return kp
}func (producer *KafkaProducer) Write(msg []byte) {if err := producer.writer.WriteMessages(context.Background(), kafka.Message{Value: msg,}); err != nil {logger.Error("producer write error:%v", err)}
}

http://chatgpt.dhexx.cn/article/MBJwhCIu.shtml

相关文章

kafka集群开启sasl认证

kafka集群开启sasl认证 sasl认证 sasl 是扩展C/S模式验证能力的一种认证机制。它可以规范客户端和服务端传输应答和传输内容编码&#xff0c;简而言之sasl决定了认证的规则&#xff0c;即客户端如何存储身份证书、客户端与服务端如何校验密码都由sasl决定。当我们的客户端通过…

mysql sasl_SASL认证失败的原因(authentication failed)

SASL认证失败的原因(authentication failed) (2012-06-15 00:45:43) 标签&#xff1a; 杂谈 authentication failed) SASL认证失败的原因可分为如下几个可能的方面&#xff1a; Permission问题&#xff1a;对系统用户的SASL Auth尤其重要&#xff0c;要保证postfix用户(smtpd)对…

Kafka安全(以SASL+ACL为例)

目录 1 Security2 SASLACL实现用户及权限认证2.1 下载2.2 Kafka服务配置2.3 修改Kafka 服务启动脚本2.4 配置server.properties2.5 启动Zookeeper2.6 启动Kafka 集群2.7 ACL2.7.1 admin2.7.2 生产者2.7.3 消费者2.7.4 sharga用户2.7.5 shargb用户2.7.6 说明 2.8 生产者客户端代…

安装sasl出错

场景&#xff1a;python项目需要用到 pyhive0.6.4 pyhdfs0.2.2 thrift0.13.0 thrift_connector0.12 thrift_sasl0.3.0进项镜像构建时&#xff0c; 报错&#xff1a;error: command gcc failed with exit status 1 解决方法&#xff1a; 1.ubuntu系统&#xff1a; sudo apt-…

sasl认证原理

SASL - 简单认证和安全层 SASL是一种用来扩充C/S模式验证能力的机制认证机制, 全称Simple Authentication and Security Layer. 当你设定sasl时&#xff0c;你必须决定两件事&#xff1b;一是用于交换“标识信 息”&#xff08;或称身份证书&#xff09;的验证机制&#xff1…

kafka sasl java_Kafka 集群配置SASL+ACL

** Kafka 集群配置SASLACL 测试环境&#xff1a;** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1.0.0.tgz zookeeper: 3.4.5 ip: 192.168.49.161 (我们这里在一台机上部署整套环境) kafka 名词解析&#xff1a; Broker: Kafka 集群包含一个或多个…

kafka sasl java_Kafka SASL 安全认证

java client 中添加 SASL 设置信息: Java client consumer properties配置.png 注意 sasl.jaas.config 配置中的分号必不可少。 package kafka; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer…

SASL讲解,以及在Spark中的应用

是什么? SASL全称Simple Authentication and Security Layer,是一种用来扩充C/S模式验证能力的机制。在Postfix可以利用SASL来判断用户是否有权使用转发服务,或是辨认谁在使用你的服务器。      SASL提供了一个通用的方法为基于连接的协议增加验证支持,而XMPP使用了一…

关于SASL的介绍文档

http://docs.sun.com/app/docs/doc/819-7056/6n91eac4q?lzh&aview 简单验证安全层 (Simple Authentication Security Layer, SASL) 介绍 SASL 为应用程序和共享库的开发者提供了用于验证、数据完整性检查和加密的机制。开发者可通过 SASL 对通用 API 进行编码。此方法避免…

文献管理与信息分析期末考试答案

可以在考试界面按ctrlf&#xff0c;对照着下图自己敲关键字搜索相应题目&#xff0c;感觉这样更方便点&#xff0c;也不会漏下题目。&#xff08;仅供参考&#xff09;

使用NoteExpress做文献管理

NoteExpress 是北京爱琴海软件公司开发的一款专业级别的文献检索与管理系统&#xff0c;具备文献信息检索与下载功能&#xff0c;可以用来管理参考文献的题录&#xff0c;以附件方式管理参考文献全文或者任何格式的文件、文档。 除此以为还有EndNote等其他文献管理软件&#x…

Zotero——一款文献管理工具

1.简介&#xff1a;Zotero是一款开源的文献管理工具&#xff0c;可以提供文献管理、浏览等众多服务&#xff0c;可以极大地为我们的科研和论文写作提供便利。 2.下载与安装 官方网站&#xff1a;Zotero | Your personal research assistant 我们可以直接在其官网上下载该软件…

使用 bibtex 进行参考文献管理

原  文&#xff1a;Bibliography management with bibtex 译  者&#xff1a;Xovee 翻译时间&#xff1a;2020年11月9日 使用 bibtex 进行参考文献管理 LaTeX 中直接支持参考文献的管理。本篇文章介绍如何使用thebibliography环境和BibTeX系统来管理参考文献。 注意&…

参考文献管理

一年以后latex和word都积累了一些经验再看这篇博客&#xff0c;发现有些小错误和语焉不详处&#xff0c;小修小改了一波。 处理参考文献真的是一件说复杂也很复杂&#xff0c;说简单也很简单的事&#xff0c;关键看能不能掌握门&#xff08;tao&#xff09;路&#xff0c;这里只…

Zotero文献管理软件入门使用方法:软件下载、文献导入、引文插入

本文介绍文献管理软件Zotero的基础使用方法&#xff0c;包括软件下载与安装、文献与PDF导入、在Word中插入参考文献等的方法。 在EndNote文献输出引用格式自定义修改与编辑界面解读&#xff08;https://blog.csdn.net/zhebushibiaoshifu/article/details/115221112&#xff09;…

学习笔记:MOOC 文献管理与信息分析

学习笔记&#xff1a;MOOC 文献管理与信息分析 文章目录 学习笔记&#xff1a;MOOC 文献管理与信息分析前言本科硕士博士的差异科研的特性读研的意义学习策略 学习与搜索两种类型的知识什么是需求&#xff1f;搜商基本检索及逻辑关系&#xff08;AND OR NOT高级搜索命令检索图片…

今天【分享】一个好用的文献管理软件——Zetero

文章目录 一、下载与安装二、基础设置 一、下载与安装 首先去Zetero的官网下载即可&#xff0c;该软件不大&#xff0c;直接在百度搜索下载即可&#xff0c;这里不再放其网址。 下载之后&#xff0c;直接解压 点击安装即可。 下载完进去&#xff0c;是这种页面显示&#xff…

文献管理软件Zotero的安装和使用

文章目录 前言一、Zotero简介二、安装与使用1、账号注册2、软件安装3、插件安装4、关联账户设置5、坚果云扩充&#xff08;WebDAV&#xff09;6、保存路径设置7、与Connected Papers联动8、参考文献的引用 前言 随着阅读文献数量的增加&#xff0c;感觉一个好用的文献管理工具…

文献管理与信息分析

文献管理与信息分析_罗昭锋 一、搜索引擎使用1.谷歌百度的高级搜索功能2.命令搜索3.图片搜索4.语音搜索5.本地搜索工具Everything 二、RSS-同步世界最新资讯1.RSS的意义及使用2.订阅科技文献 三、为知笔记和思维导图1.为知笔记2.思维导图3.快速学习某个主题知识的步骤 四、十大…

Zotero参考文献管理

Zotero 参考文献管理 简介 Zotero 作为一个开源免费的优秀文献管理工具&#xff0c;在写论文中时进行参考文献插入也是非常方便的&#xff0c;本文介绍如何利用 Zotero 的 Word 插件高效进行参考文献管理。 准备工作 确保自己的电脑上安装了 Word、Zotero 以及 Zotero Word…