kafka 集群 KRaft 模式搭建

2023-11-27 12:01
文章标签 集群 模式 搭建 kafka kraft

本文主要是介绍kafka 集群 KRaft 模式搭建,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:https://kafka.apache.org/

Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群

本文讲解 Kafka KRaft 模式集群搭建

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、创建 KRaft 集群

4、启动 Kafka KRaft 集群

5、关闭 Kafka KRaft 集群

6、测试 KRaft 集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

下载完成

将kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft

编辑配置文件

vi server.properties

server.properties 配置说明

node.id 是kafka的broker节点id

controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址

advertised.listeners 是节点自己的监听地址

192.168.3.232 节点配置

node.id = 1

192.168.2.90 节点配置

node.id = 2

192.168.2.11节点配置

node.id = 3

3、创建 KRaft 集群

生成集群id

在任意一个节点上执行就行,笔者使用 192.168.3.232 节点

进入bin 目录

cd /usr/local/kafka/kafka_2.13-3.6.0/bin

执行生成集群 id 命令

./kafka-storage.sh random-uuid

生成后保存生成的字符串    82vqfbdSTO2QzS_M0Su1Bw

然后分别在3台机器上执行下面命令

为方便执行命令,先回到 kafka安装目录

cd /usr/local/kafka/kafka_2.13-3.6.0

再执行命令,完成集群元数据配置

bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties

192.168.3.232 节点

192.168.2.90 节点

192.168.2.11节点

上面命令执行完成后,开放防火墙端口

kafka 需要开放 9092 端口和 9093 端口

3台机器上分别开放 9092 和 9093 端口

查看开放端口

firewall-cmd --zone=public --list-ports

 开放9092 端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

  开放9093 端口

firewall-cmd --zone=public --add-port=9093/tcp --permanent

更新防火墙规则(无需断开连接,动态添加规则)

firewall-cmd --reload

4、启动 Kafka KRaft 集群

在3台机器上分别启动

下面2个命令均可启动

bin/kafka-server-start.sh -daemon config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties

笔者使用第二个启动命令 启动,效果看下图

当 3 个节点都出现 Kafka Server started,集群启动成功

5、关闭 Kafka KRaft 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

6、测试 KRaft 集群

新建 maven 项目,添加 Kafka 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名称是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消费分组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消费者订阅主题consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}

运行测试

效果图

消息成功发送并成功消费

至此完

这篇关于kafka 集群 KRaft 模式搭建的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/427612

相关文章

利用Python快速搭建Markdown笔记发布系统

《利用Python快速搭建Markdown笔记发布系统》这篇文章主要为大家详细介绍了使用Python生态的成熟工具,在30分钟内搭建一个支持Markdown渲染、分类标签、全文搜索的私有化知识发布系统... 目录引言:为什么要自建知识博客一、技术选型:极简主义开发栈二、系统架构设计三、核心代码实现(分步解析

Redis分片集群的实现

《Redis分片集群的实现》Redis分片集群是一种将Redis数据库分散到多个节点上的方式,以提供更高的性能和可伸缩性,本文主要介绍了Redis分片集群的实现,具有一定的参考价值,感兴趣的可以了解一... 目录1. Redis Cluster的核心概念哈希槽(Hash Slots)主从复制与故障转移2.

Linux系统配置NAT网络模式的详细步骤(附图文)

《Linux系统配置NAT网络模式的详细步骤(附图文)》本文详细指导如何在VMware环境下配置NAT网络模式,包括设置主机和虚拟机的IP地址、网关,以及针对Linux和Windows系统的具体步骤,... 目录一、配置NAT网络模式二、设置虚拟机交换机网关2.1 打开虚拟机2.2 管理员授权2.3 设置子

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

SpringBoot如何通过Map实现策略模式

《SpringBoot如何通过Map实现策略模式》策略模式是一种行为设计模式,它允许在运行时选择算法的行为,在Spring框架中,我们可以利用@Resource注解和Map集合来优雅地实现策略模式,这... 目录前言底层机制解析Spring的集合类型自动装配@Resource注解的行为实现原理使用直接使用M

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

大数据spark3.5安装部署之local模式详解

《大数据spark3.5安装部署之local模式详解》本文介绍了如何在本地模式下安装和配置Spark,并展示了如何使用SparkShell进行基本的数据处理操作,同时,还介绍了如何通过Spark-su... 目录下载上传解压配置jdk解压配置环境变量启动查看交互操作命令行提交应用spark,一个数据处理框架

使用DeepSeek搭建个人知识库(在笔记本电脑上)

《使用DeepSeek搭建个人知识库(在笔记本电脑上)》本文介绍了如何在笔记本电脑上使用DeepSeek和开源工具搭建个人知识库,通过安装DeepSeek和RAGFlow,并使用CherryStudi... 目录部署环境软件清单安装DeepSeek安装Cherry Studio安装RAGFlow设置知识库总

Linux搭建Mysql主从同步的教程

《Linux搭建Mysql主从同步的教程》:本文主要介绍Linux搭建Mysql主从同步的教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux搭建mysql主从同步1.启动mysql服务2.修改Mysql主库配置文件/etc/my.cnf3.重启主库my