kafka 集群 KRaft 模式搭建

2023-11-27 12:01
文章标签 集群 模式 搭建 kafka kraft

本文主要是介绍kafka 集群 KRaft 模式搭建,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:https://kafka.apache.org/

Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群

本文讲解 Kafka KRaft 模式集群搭建

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、创建 KRaft 集群

4、启动 Kafka KRaft 集群

5、关闭 Kafka KRaft 集群

6、测试 KRaft 集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

下载完成

将kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft

编辑配置文件

vi server.properties

server.properties 配置说明

node.id 是kafka的broker节点id

controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址

advertised.listeners 是节点自己的监听地址

192.168.3.232 节点配置

node.id = 1

192.168.2.90 节点配置

node.id = 2

192.168.2.11节点配置

node.id = 3

3、创建 KRaft 集群

生成集群id

在任意一个节点上执行就行,笔者使用 192.168.3.232 节点

进入bin 目录

cd /usr/local/kafka/kafka_2.13-3.6.0/bin

执行生成集群 id 命令

./kafka-storage.sh random-uuid

生成后保存生成的字符串    82vqfbdSTO2QzS_M0Su1Bw

然后分别在3台机器上执行下面命令

为方便执行命令,先回到 kafka安装目录

cd /usr/local/kafka/kafka_2.13-3.6.0

再执行命令,完成集群元数据配置

bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties

192.168.3.232 节点

192.168.2.90 节点

192.168.2.11节点

上面命令执行完成后,开放防火墙端口

kafka 需要开放 9092 端口和 9093 端口

3台机器上分别开放 9092 和 9093 端口

查看开放端口

firewall-cmd --zone=public --list-ports

 开放9092 端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

  开放9093 端口

firewall-cmd --zone=public --add-port=9093/tcp --permanent

更新防火墙规则(无需断开连接,动态添加规则)

firewall-cmd --reload

4、启动 Kafka KRaft 集群

在3台机器上分别启动

下面2个命令均可启动

bin/kafka-server-start.sh -daemon config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties

笔者使用第二个启动命令 启动,效果看下图

当 3 个节点都出现 Kafka Server started,集群启动成功

5、关闭 Kafka KRaft 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

6、测试 KRaft 集群

新建 maven 项目,添加 Kafka 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名称是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消费分组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消费者订阅主题consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}

运行测试

效果图

消息成功发送并成功消费

至此完

这篇关于kafka 集群 KRaft 模式搭建的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/427612

相关文章

Mycat搭建分库分表方式

《Mycat搭建分库分表方式》文章介绍了如何使用分库分表架构来解决单表数据量过大带来的性能和存储容量限制的问题,通过在一对主从复制节点上配置数据源,并使用分片算法将数据分配到不同的数据库表中,可以有效... 目录分库分表解决的问题分库分表架构添加数据验证结果 总结分库分表解决的问题单表数据量过大带来的性能

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)

《Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)》:本文主要介绍Python基于火山引擎豆包大模型搭建QQ机器人详细的相关资料,包括开通模型、配置APIKEY鉴权和SD... 目录豆包大模型概述开通模型付费安装 SDK 环境配置 API KEY 鉴权Ark 模型接口Prompt

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

鸿蒙开发搭建flutter适配的开发环境

《鸿蒙开发搭建flutter适配的开发环境》文章详细介绍了在Windows系统上如何创建和运行鸿蒙Flutter项目,包括使用flutterdoctor检测环境、创建项目、编译HAP包以及在真机上运... 目录环境搭建创建运行项目打包项目总结环境搭建1.安装 DevEco Studio NEXT IDE

Nacos集群数据同步方式

《Nacos集群数据同步方式》文章主要介绍了Nacos集群中服务注册信息的同步机制,涉及到负责节点和非负责节点之间的数据同步过程,以及DistroProtocol协议在同步中的应用... 目录引言负责节点(发起同步)DistroProtocolDistroSyncChangeTask获取同步数据getDis

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd