Java中Springboot集成Kafka实现消息发送和接收功能

2025-01-25 04:50

本文主要是介绍Java中Springboot集成Kafka实现消息发送和接收功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka...

一、Kafka 简介

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。

二、Kafka 功能

消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。

三、POM依赖

    <!-- kafka-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.11</version>
    </dependency>

四、配置文件

spring:
  # Kafka 配置
  kafka:
    # Kafka 服务器地址和端口 代理地址,可以多个
    bootstrap-servers: IP:9092
    # 生产者配置
    producer:
      # 发送失败时的重试次数
      retries: 3
      # 每次批量发送消息的数量,调整为较小值
      BATch-size: 1
      # 生产者缓冲区大小
      buffer-memory: 33554432
      # 消息 key 的序列化器,将 key 序列化为字节数组
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
 www.chinasem.cn     # 消息 value 的序列化器,将消息体序列化为字节数组
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      # 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费
      auto-offset-reset: earliest
      # 是否自动提交偏移量
      enable-auto-commit: true
      # 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔
      auto-commit-interval: 1000
      # 消息 key 的反序列化器,将字节数组反序列化为 key
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消息 value 的反序列化器,将字节数组反序列化为消息体
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

五、生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * 生产者
 *
 * @author chenlei
 */
@Slf4j
@Component
public class KafkaProducer {
    /**
     * KafkaTemplate
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    /**
     * 发送消息到指定的 Kafka 主题,并可指定分组信息
     *
     * @param topic   消息要发送到的 Kafka 主题
     * @param message 要发送的消息内容
     */
    public void sendMessage(String topic, String message) {
        // 使用 KafkaTemplate 发送消息,将消息发送到指定的主题
        ListenableFuture<SendResult<String, Strwww.chinasem.cning>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // 消息发送成功后的处理逻辑,可根据需要添加
                log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                // 消息发送失败后的处理逻辑,使用日志记录异常
                log.error("发送消息=[" + message + "] 失败", ex);
            }
        });
    }
}

六、消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stphpereotype.Component;
/**
 * @author 消费者
 * chenlei
 */
@Slf4j
@Component
public class KafkaConsumer {
    /**
     * 监听 Kafka 主题方法。
     *
     * @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对
     */
    @KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")
    public void listen(ConsumerRecord<?, ?> record) {
        // 打印接收到的消息的详细信息
        log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",
                record.topic(), China编程record.partition(), record.ohttp://www.chinasem.cnffset(), record.key(), record.value());
    }
}

到此这篇关于Java中Springboot集成Kafka实现消息发送和接收的文章就介绍到这了,更多相关Springboot Kafka 消息发送和接收内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于Java中Springboot集成Kafka实现消息发送和接收功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1153224

相关文章

Java数组初始化的五种方式

《Java数组初始化的五种方式》数组是Java中最基础且常用的数据结构之一,其初始化方式多样且各具特点,本文详细讲解Java数组初始化的五种方式,分析其适用场景、优劣势对比及注意事项,帮助避免常见陷阱... 目录1. 静态初始化:简洁但固定代码示例核心特点适用场景注意事项2. 动态初始化:灵活但需手动管理代

Java使用SLF4J记录不同级别日志的示例详解

《Java使用SLF4J记录不同级别日志的示例详解》SLF4J是一个简单的日志门面,它允许在运行时选择不同的日志实现,这篇文章主要为大家详细介绍了如何使用SLF4J记录不同级别日志,感兴趣的可以了解下... 目录一、SLF4J简介二、添加依赖三、配置Logback四、记录不同级别的日志五、总结一、SLF4J

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

使用Python实现一个优雅的异步定时器

《使用Python实现一个优雅的异步定时器》在Python中实现定时器功能是一个常见需求,尤其是在需要周期性执行任务的场景下,本文给大家介绍了基于asyncio和threading模块,可扩展的异步定... 目录需求背景代码1. 单例事件循环的实现2. 事件循环的运行与关闭3. 定时器核心逻辑4. 启动与停

基于Python实现读取嵌套压缩包下文件的方法

《基于Python实现读取嵌套压缩包下文件的方法》工作中遇到的问题,需要用Python实现嵌套压缩包下文件读取,本文给大家介绍了详细的解决方法,并有相关的代码示例供大家参考,需要的朋友可以参考下... 目录思路完整代码代码优化思路打开外层zip压缩包并遍历文件:使用with zipfile.ZipFil

Python实现word文档内容智能提取以及合成

《Python实现word文档内容智能提取以及合成》这篇文章主要为大家详细介绍了如何使用Python实现从10个左右的docx文档中抽取内容,再调整语言风格后生成新的文档,感兴趣的小伙伴可以了解一下... 目录核心思路技术路径实现步骤阶段一:准备工作阶段二:内容提取 (python 脚本)阶段三:语言风格调

SpringBoot中配置Redis连接池的完整指南

《SpringBoot中配置Redis连接池的完整指南》这篇文章主要为大家详细介绍了SpringBoot中配置Redis连接池的完整指南,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以... 目录一、添加依赖二、配置 Redis 连接池三、测试 Redis 操作四、完整示例代码(一)pom.

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

C#实现将Excel表格转换为图片(JPG/ PNG)

《C#实现将Excel表格转换为图片(JPG/PNG)》Excel表格可能会因为不同设备或字体缺失等问题,导致格式错乱或数据显示异常,转换为图片后,能确保数据的排版等保持一致,下面我们看看如何使用C... 目录通过C# 转换Excel工作表到图片通过C# 转换指定单元格区域到图片知识扩展C# 将 Excel

Java使用ANTLR4对Lua脚本语法校验详解

《Java使用ANTLR4对Lua脚本语法校验详解》ANTLR是一个强大的解析器生成器,用于读取、处理、执行或翻译结构化文本或二进制文件,下面就跟随小编一起看看Java如何使用ANTLR4对Lua脚本... 目录什么是ANTLR?第一个例子ANTLR4 的工作流程Lua脚本语法校验准备一个Lua Gramm