Kafka SASL_SSL双重认证

2024-02-06 09:20
文章标签 认证 ssl kafka 双重 sasl

本文主要是介绍Kafka SASL_SSL双重认证,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 1. 背景
    • 2. 环境
    • 3. 操作步骤
      • 3.1 生成SSL证书
      • 3.2 配置zookeeper认证
      • 3.3 配置kafka安全认证
      • 3.4 使用kafka客户端进行验证
      • 3.5 使用Java端代码进行认证

1. 背景

kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。

  • SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
  • SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。

在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。

因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

2. 环境

  • 操作系统:linux
  • kafka版本:kafka_2.13-2.7.1
  • zookeeper版本:apache-zookeeper-3.7.0
  • 应用程序版本:spring-boot-2.6.7、JDK1.8

3. 操作步骤

  1. 生成SSL证书
  2. 配置zookeeper
  3. 配置kafka
  4. 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
  5. 在业务代码中使用请查看(3.5)

3.1 生成SSL证书

按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥

参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)

3.2 配置zookeeper认证

第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:

Server {org.apache.zookeeper.server.auth.DigestLoginModule requireduser_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};

第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证

第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="1qaz@WSX"user_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};

kafka-client-jaas.conf

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};

第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf

...if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi...

**第三步:**修改 kafka 的 server.properties配置文件

#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS 
ssl.truststore.type=JKS 
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname

3.4 使用kafka客户端进行验证

第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。

consumer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

producer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)

#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093  --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc

3.5 使用Java端代码进行认证

第一步: yaml 配置文件

spring:kafka:bootstrap-servers: localhost:9093properties:sasl:mechanism: PLAINjaas:#此处填写 SASL登录时分配的用户名密码(注意password结尾;)config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";security:protocol: SASL_SSLssl:trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jkstrust-store-password: Q06688key-store-type: JKSproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 106384acks: -1retries: 3properties:linger-ms: 1retry.backoff.ms: 1000buffer-memory: 33554432

第二步: 使用 kafkaTemplate 的方式,配置一个 config

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.properties.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.properties.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.properties.security.protocol}")private String protocol;@Value("${spring.kafka.properties.sasl.mechanism}")private String mechanism;@Value("${spring.kafka.ssl.trust-store-location}")private String trustStoreLocation;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-type}")private String keyStoreType;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/*** the producer factory config*/@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.ACKS_CONFIG, acks);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put("security.protocol", protocol);props.put(SaslConfigs.SASL_MECHANISM, mechanism);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);return new DefaultKafkaProducerFactory<String, String>(props);}
}

这篇关于Kafka SASL_SSL双重认证的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/683828

相关文章

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

国产游戏崛起:技术革新与文化自信的双重推动

近年来,国产游戏行业发展迅猛,技术水平和作品质量均得到了显著提升。特别是以《黑神话:悟空》为代表的一系列优秀作品,成功打破了过去中国游戏市场以手游和网游为主的局限,向全球玩家展示了中国在单机游戏领域的实力与潜力。随着中国开发者在画面渲染、物理引擎、AI 技术和服务器架构等方面取得了显著进展,国产游戏正逐步赢得国际市场的认可。然而,面对全球游戏行业的激烈竞争,国产游戏技术依然面临诸多挑战,未来的

【Kubernetes】K8s 的安全框架和用户认证

K8s 的安全框架和用户认证 1.Kubernetes 的安全框架1.1 认证:Authentication1.2 鉴权:Authorization1.3 准入控制:Admission Control 2.Kubernetes 的用户认证2.1 Kubernetes 的用户认证方式2.2 配置 Kubernetes 集群使用密码认证 Kubernetes 作为一个分布式的虚拟

消除安卓SDK更新时的“https://dl-ssl.google.com refused”异常的方法

消除安卓SDK更新时的“https://dl-ssl.google.com refused”异常的方法   消除安卓SDK更新时的“https://dl-ssl.google.com refused”异常的方法 [转载]原地址:http://blog.csdn.net/x605940745/article/details/17911115 消除SDK更新时的“

Android逆向(反调,脱壳,过ssl证书脚本)

文章目录 总结 基础Android基础工具 定位关键代码页面activity定位数据包参数定位堆栈追踪 编写反调脱壳好用的脚本过ssl证书校验抓包反调的脚本打印堆栈bilibili反调的脚本 总结 暑假做了两个月的Android逆向,记录一下自己学到的东西。对于app渗透有了一些思路。 这两个月主要做的是代码分析,对于分析完后的持久化等没有学习。主要是如何反编译源码,如何找到

【Shiro】Shiro 的学习教程(二)之认证、授权源码分析

目录 1、背景2、相关类图3、解析3.1、加载、解析阶段3.2、认证阶段3.3、授权阶段 1、背景 继上节代码,通过 debug 进行 shiro 源码分析。 2、相关类图 debug 之前,先了解下一些类的结构图: ①:SecurityManager:安全管理器 DefaultSecurityManager: RememberMeManager:实现【记住我】功能

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准

OpenStack离线Train版安装系列—3控制节点-Keystone认证服务组件

本系列文章包含从OpenStack离线源制作到完成OpenStack安装的全部过程。 在本系列教程中使用的OpenStack的安装版本为第20个版本Train(简称T版本),2020年5月13日,OpenStack社区发布了第21个版本Ussuri(简称U版本)。 OpenStack部署系列文章 OpenStack Victoria版 安装部署系列教程 OpenStack Ussuri版