本文主要是介绍zookeeper、kakfa添加用户加密,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
背景
zookeeper无权限访问到根目录
步骤
-
在kafka/config 目录中创建
vi config/zookeeper_jaas.conf
-
在zookeeper_jaas.conf中添加
Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345" user_admin="12345"; }; #user_{username}="{password}"
-
在 zookeeper.properties最后添加配置
#auth authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000
-
在zookeeper-server-start.sh中添加配置
根据不同的目录位置进行修改
-Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf" fi
-
启动zookeeper
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
-
下面开始设置ACL配置
-
登录zookeeper
192.168.6.42:2181
IP地址根据自己的进行调整./zookeeper-shell.sh 192.168.6.42:2181
-
添加用户
addauth digest admin:12345 addauth digest kafka:12345
-
设置ACL
ip:192.168.6.42:cdrwa
根据自己的ip地址进行修改setAcl / ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa setAcl /consumers ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa
cdrwa:create: 你可以创建子节点。read: 你可以获取节点数据以及当前节点的子节点列表。write: 你可以为节点设置数据。delete: 你可以删除子节点。admin: 可以为节点设置权限
-
查看是否配置正确
getAcl / getAcl /consumers
-
-
添加kafka的配置
vim config/kafka_server_jaas.conf
-
添加内容
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="12345"user_admin="12345"; };Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="12345"; };
-
修改config/server.properties
# AUTHsecurity.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAINauthorizer.class.name=kafka.security.authorizer.AclAuthorizer allow.everyone.if.no.acl.found=true listeners=SASL_PLAINTEXT://0.0.0.0:9092 advertised.listeners=SASL_PLAINTEXT://:9092#将zookeeper.connect改成zookeeper的地址 zookeeper.connect=192.168.6.42:2181
-
调整kafka的启动脚本 kafka-server-start.sh
-Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf
根据自己的地址进行配置if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf" fi
-
启动kafka
./kafka-server-start.sh -daemon ../config/server.properti
-
测试
进入
kafka
目录,在config
目录下创建kafka_client_jaas.conf
文件,并写入如下内容。KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; };
-
配置提供者认证,修改提供者启动脚本,
vi bin/kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf" fi exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
-
启动消费者
./kafka-console-producer.sh --broker-list 192.168.4.235:9092 --topic testTopic --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
-
配置消费者认证,修改提供者启动脚本,
vi bin/kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf" fiexec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
-
启动消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.4.235:9092 --topic testTopic --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
有消息输出即为成功
-
springboot的配置
spring:kafka:# docker http://192.168.2.202:8080bootstrap-servers: http://192.168.6.42:9092# 配置用户名密码producer:properties:sasl:mechanism: PLAINsecurity:protocol: SASL_PLAINTEXTconsumer:properties:sasl:mechanism: PLAINsecurity:protocol: SASL_PLAINTEXT
-
在相关的kafkaConfig中增加相关配置
@Beanpublic KafkaTemplate kafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);configs.put(ProducerConfig.RETRIES_CONFIG, pro_retry_config);configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);configs.put(ProducerConfig.ACKS_CONFIG, acks_config);configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,key_serializer_config);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,value_serializer_config);configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression_type_config);if (Boolean.valueOf(auth_enabled)) {configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());configs.put(SaslConfigs.SASL_MECHANISM, sasl_mechanism);}DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configs);return new KafkaTemplate(producerFactory);}
-
最后在启动脚本中新增一个配置
-Djava.security.auth.login.config=客户端登录文件所在的位置
#eg:
-Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf
这篇关于zookeeper、kakfa添加用户加密的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!