
Kafka SASL/PLAINTEXT ACL Configuration

Published: 2022-09-16 09:09:12

Configuration Steps

1. Add the following to server.properties

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

Notes:

Setting authorizer.class.name enables ACL authorization, which lets you restrict what operations a given account may perform.

User:admin is the super user, with all permissions.
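
A related broker setting worth knowing: with SimpleAclAuthorizer, any resource that has no ACL attached is denied to everyone except super users by default. There is an opt-in flag to invert that behavior while rolling out ACLs (shown commented out; keeping the default deny is safer):

```properties
# Default: resources without ACLs are denied to non-super users.
# Uncomment to allow access to resources that have no ACLs (less secure):
# allow.everyone.if.no.acl.found=true
```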

2. Create the following files in the config directory
kafka-zk-jaas.conf

zookeeper {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-pwd";
};

Note: the username/password here are the credentials the Kafka broker uses to register with ZooKeeper; the super user account is normally used.

kafka-server-jaas.conf

KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-pwd"
        user_admin="admin-pwd"
        user_broker0="broker0-pwd";
};

Notes:

username/password are the credentials used when registering with ZooKeeper.

user_admin="admin-pwd" declares an account admin with password admin-pwd. This admin password must match the one configured for ZooKeeper.

user_broker0="broker0-pwd" declares an account broker0 with password broker0-pwd. Clients can connect with this account, and ACLs can be used to restrict what it may do.

kafka-client-jaas.conf

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="broker0"
        password="broker0-pwd";
};
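
As an alternative to the KafkaClient JAAS file, clients on Kafka 0.10.2 or newer can pass the same credentials inline through the sasl.jaas.config client property, e.g. in a properties file handed to the console tools (a sketch using the broker0 account from above):

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";
```

With this in place, the KAFKA_OPTS export in step 3 is not needed for that client.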

3. Add the following line to the zookeeper, kafka, and consumer/producer startup scripts (insert it before the last line)

zookeeper-server-start.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/******/kafka_2.13-2.5.0/config/kafka-zk-jaas.conf"

kafka-server-start.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/*******/kafka_2.13-2.5.0/config/kafka-server-jaas.conf"

kafka-console-consumer.sh

kafka-console-producer.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/*******/kafka_2.13-2.5.0/config/kafka-client-jaas.conf"

4. Start ZooKeeper and Kafka

./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties

5. Grant permissions

Grant the broker0 account Write permission on topic [topic_name]:

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:broker0 --operation Write --topic topic_name

Grant the broker0 account Read permission on topic [topic_name]:

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:broker0 --operation Read --topic topic_name
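
The two grants can also be issued as one command, and --allow-host can optionally pin the rule to specific client addresses (a sketch; when --allow-host is omitted it defaults to *):

```shell
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=127.0.0.1:2181 \
  --add --allow-principal User:broker0 \
  --allow-host 127.0.0.1 \
  --operation Read --operation Write --topic topic_name
```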

Modify group.id in consumer.properties (default: test-consumer-group):

# consumer group id
group.id=group_id
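
Note that for the console clients in step 6 to negotiate SASL at all, consumer.properties (and producer.properties) also need the protocol settings below; the JAAS file from step 3 only supplies the credentials. Without these two lines the client attempts a plaintext connection, which is exactly what produces the "Unexpected Kafka request of type METADATA during SASL handshake" log discussed under Troubleshooting:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```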

Grant the broker0 account Read permission on group [group_id] (an external client must specify a group.id, so the group must be authorized as well, otherwise authorization fails):

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:broker0 --operation Read --group group_id

List the configured ACLs:

./kafka-acls.sh --list --authorizer-properties zookeeper.connect=127.0.0.1:2181
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=group_name, patternType=LITERAL)`:
        (principal=User:broker0, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=group_id, patternType=LITERAL)`:
        (principal=User:broker0, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topic_name, patternType=LITERAL)`:
        (principal=User:broker0, host=*, operation=READ, permissionType=ALLOW)
        (principal=User:broker0, host=*, operation=WRITE, permissionType=ALLOW)

ACL operations (--operation) and the Kafka APIs they gate:

Operation         Resource  API
ALTER             Topic     CreatePartitions
ALTER_CONFIGS     Topic     AlterConfigs
CREATE            Topic     Metadata (if auto.create.topics.enable)
CREATE            Topic     CreateTopics
DELETE            Topic     DeleteRecords
DELETE            Topic     DeleteTopics
DESCRIBE          Topic     ListOffsets
DESCRIBE          Topic     Metadata
DESCRIBE          Topic     OffsetFetch
DESCRIBE          Topic     OffsetForLeaderEpoch
DESCRIBE_CONFIGS  Topic     DescribeConfigs
READ              Topic     Fetch
READ              Topic     OffsetCommit
READ              Topic     TxnOffsetCommit
WRITE             Topic     Produce
WRITE             Topic     AddPartitionsToTxn

Operation  Resource  API
DELETE     Group     DeleteGroups
DESCRIBE   Group     DescribeGroup
DESCRIBE   Group     FindCoordinator
DESCRIBE   Group     ListGroups
READ       Group     AddOffsetsToTxn
READ       Group     Heartbeat
READ       Group     JoinGroup
READ       Group     LeaveGroup
READ       Group     OffsetCommit
READ       Group     OffsetFetch
READ       Group     SyncGroup
READ       Group     TxnOffsetCommit

6. Start the producer and consumer

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_name --from-beginning --consumer.config ../config/consumer.properties
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_name --producer.config ../config/producer.properties

7. Connection succeeds

Spring Boot Kafka SASL/PLAINTEXT Configuration

YAML configuration

spring:
  kafka:    
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";

This is equivalent to setting the system property in code:

System.setProperty("java.security.auth.login.config", "/home/*******/kafka_2.13-2.5.0/config/kafka-client-jaas.conf");
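
The same three client settings can also be assembled programmatically when constructing a producer or consumer config. A minimal sketch (the helper name is illustrative, not from the original post):

```java
import java.util.Properties;

public class SaslClientProps {

    // Builds the SASL_PLAINTEXT/PLAIN client settings equivalent to the yaml above.
    static Properties saslProps(String user, String password) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS entry; note the trailing semicolon is required.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + user + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties p = saslProps("broker0", "broker0-pwd");
        System.out.println(p.getProperty("sasl.jaas.config"));
    }
}
```

These properties can then be merged with bootstrap.servers and the serializers and passed to new KafkaProducer<>(props) or new KafkaConsumer<>(props).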

Spring-kafka configuration loading logic:

Spring Boot first loads the settings it recognizes from application.yaml (or application.properties); settings it does not recognize are ignored entirely — the IDE flags such settings as unknown, and Spring Boot will not load them.


Then, after detecting security.protocol: SASL_PLAINTEXT, it reads the sasl.jaas.config setting; if that key is absent from the configuration, it falls back to

System.getProperty("java.security.auth.login.config");

to load the JAAS configuration.

Note: the consumer and producer can share one configuration or be configured separately. (The same applies to Kafka Streams SASL/PLAINTEXT configuration — there is little material about this online, so it is worth pointing out that the shared configuration works for streams as well.)

Separate configuration

spring:
  kafka:
    consumer:
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";


spring:
  kafka:
    producer:
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";


spring:
  kafka:
    streams:
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";

Troubleshooting

1. Kafka fails to start on Windows: ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed

Analysis: n/a

Fix: delete all files inside the /tmp/kafka-logs folder (Win10 built-in Ubuntu).

2. java.lang.IllegalArgumentException: Could not find a 'KafkaServer' or 'sasl_plaintext.KafkaServer' entry in the JAAS configuration

[2020-07-09 17:03:05,360] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: Could not find a 'KafkaServer' or 'sasl_plaintext.KafkaServer' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /home/******/kafka_2.13-2.5.0/config/kafka-server-jaas.conf
        at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
        at org.apache.kafka.common.security.JaasContext.loadServerContext(JaasContext.java:70)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:131)
        at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:921)
        at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)

Analysis: the kafka-server-jaas.conf file cannot be found, or it contains no KafkaServer entry.

Fix: confirm that the path configured in kafka-server-start.sh is correct and that kafka-server-jaas.conf contains a KafkaServer entry (names are case-sensitive).

3. INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

[2020-07-09 17:29:49,356] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2020-07-09 17:29:49,765] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2020-07-09 17:29:50,172] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

Analysis: something is connecting to Kafka without authenticating.

Fix: check whether a kafka-console-consumer, kafka-console-producer, or an external process is trying to connect without SASL configured.

4. java.io.EOFException: null, while the Kafka console keeps printing the log from issue 3

[18:10:54:133] [TRACE] - org.apache.kafka.common.network.Selector.maybeReadFromClosingChannel(Selector.java:707) - [Producer clientId=producer-1] Read from closing channel failed, ignoring exception
java.io.EOFException: null
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:97) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.maybeReadFromClosingChannel(Selector.java:704) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.close(Selector.java:909) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:629) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.5.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]

Analysis: Spring-kafka's connection to Kafka failed authorization; the NIO read returned nothing, producing this exception.

Fix: check that the JAAS file and its configuration are correct, and that the username and password are correct.

5. Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
	at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:134) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.admin.Admin.create(Admin.java:71) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getAdmin(DefaultKafkaClientSupplier.java:41) ~[kafka-streams-2.5.0.jar:?]
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:747) ~[kafka-streams-2.5.0.jar:?]
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:584) ~[kafka-streams-2.5.0.jar:?]
	at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:266) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	... 14 more

Analysis: neither sasl.jaas.config nor System.setProperty("java.security.auth.login.config", "kafka-client-jaas.conf"); has been set.

Fix: configure the client as described in the Spring Boot Kafka SASL/PLAINTEXT configuration section above.


6. failed authentication due to: Authentication failed: Invalid username or password

[2020-07-14 09:53:52,921] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2020-07-14 09:53:52,922] WARN [Producer clientId=console-producer] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

Analysis: wrong username or password.

Fix: confirm that the username and password in kafka-client-jaas.conf match those configured in kafka-server-jaas.conf.

7. org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [topic_name]

[2020-07-14 09:55:29,838] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {topic_name=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2020-07-14 09:55:29,841] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [topic_name] (org.apache.kafka.clients.Metadata)
[2020-07-14 09:55:29,843] ERROR Error when sending message to topic topic_name with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [topic_name]

Analysis: the operation on topic [topic_name] is not authorized.

Fix: run ./kafka-acls.sh --list --authorizer-properties zookeeper.connect=127.0.0.1:2181 to check whether the logged-in account has the required permissions on the topic.

Author: 疯猴纸
Source: https://blog.csdn.net/qq_36389344/article/details/107233992



Permalink: https://www.javaclub.cn/server/41670.html
