SpringBoot整合Canal+RabbitMQ监听数据变更详解

本文主要是介绍SpringBoot整合Canal+RabbitMQ监听数据变更详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit...

需求

在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求。例如,在电商系统中,当订单表发生更新时,可能需要同步这些变更到搜索服务、缓存服务或者通知其他微服务。传统的解决方案包括定时轮询数据库或通过触发器将变更写入消息队列等方法,但这些方案要么效率低下,要么实现复杂。而使用 Canal + RabbitMQ 可以提供一种高效且可靠的方式来捕获 mysql 数据库的变更,并将其发送到 RabbitMQ 中供其他服务消费。

Canal 是阿里巴巴开源的一个用于增量订阅和消费 MySQL 数据库 Binlog 的工具,它模拟 MySQL 主从复制机制,无需侵入业务逻辑即可捕获数据库变更。RabbitMQ 是一个流行的开源消息代理,支持多种协议并提供了丰富的特性来确保消息传递的可靠性。结合这两者,可以构建一个强大的实时数据变更监听和处理系统。

步骤

环境搭建

整合SpringBoot与Canal实现客户端

Canal整合RabbitMQ

SpringBoot整合RabbitMQ

环境搭建

1. 安装MySQL

确保你有一个正在运行的 MySQL 实例,并启用了 binlog 日志记录功能。这是 Canal 捕获数据库变更的基础。

# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

重启 MySQL 服务使配置生效。

2. 安装Canal Server

下载最新版本的 Canal Server 并解压到合适的位置。根据官方文档进行python必要的配置,特别是 instance.properties 文件中的数据库连接信息。

3. 安装RabbitMQ

可以通过 docker 快速安装 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

访问 http://localhost:15672 登录管理界面,默认用户名/密码为 guest/guest。

整合SpringBoot与Canal实现客户端

创建SpringBoot项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,添加 Web, JPA, 和 AMQP(用于后续整合 RabbitMQ)依赖。

引入Canal依赖

在 pom.XML 中添加 Canal Client 的依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

编写Canal客户端代码

创建一个 Canal 客户端类,用来监听 MySQL 数据库的变化,并将变更事件转发给 RabbitMQ。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;

public class CanalClient {

    private final CanalConnector connector;
    private final Channel channel;

    public CanalClient(CanalConnector connector, Channel channel) {
        this.connector = connector;
        this.channel = channel;
    }

    public void start() {
        // Canal 连接配置
        connector.connect();
        connector.subscribe(".*\\..*"); // 订阅所有数据库和表
        connector.rollback();

        while (true) {
            int BATchSize = 1000;
            EntryBatch batch = connector.getWithoutAck(batchSize); // 获取一批次数据
            long batchId = batch.getId();
            int size = batch.getEntries().size();

            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                printEntry(batch.getEntries());
                connector.ack(batchId); // 提交确认
            }

            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        connector.disconnect();
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChanjavascriptge rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    sendToRabbitMQ(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    sendToRabbitMQ(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    sendToRabbitMQ(rowData.getBeforeColumnsList());

                    System.out.println("-------> after");
                    sendToRabbitMQ(rowData.getAfterColumnsList());
           China编程     }
            }
        }
    }

    private void sendToRabbitMQ(List<Column> columns) {
        StringBuilder message = new StringBuilder();
        for (Column column : columns) {
            message.append(column.getName()).append("=").append(column.getValue()).append(",");
        }
        try {
            channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrawww.chinasem.cnce();
        }
    }
}

Canal整合RabbitMQ

配置Canal Server

确保 Canal Server 已正确配置并启动,能够监听 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 实例,并设置适当的过滤规则。

配置RabbitMQ Exchange

在 RabbitMQ 中创建一个名为 canal_exchange 的 exchange,类型可以根据需要选择,如 fanout, direct, topic 或 headers。

rabbitmqadmin declare exchange name=canal_exchange type=fanout

SpringBoot整合RabbitMQ

添加依赖

确保在 pom.xml 中已经包含了 RabbitMQ 的 Spring AMQP 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ连接信息

在 application.yml 或 application.properties 中配置 RabbitMQ 的连接参数。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

创建消费者

编写一个消费者类来接收来自 RabbitMQ 的消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class CanalMessageConsumer {

    @RabbitListener(queues = "canal_queue")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}

配置队列和绑定

确保在应用程序启动时自动创建所需的队列,并将它们绑定到之前创建的 exchange 上。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue canalQueue() {
        return new Queue("canal_queue", false);
    }

    @Bean
    public TopicExchange canalExchange() {
        return new TopicExchange("canal_exchange");
    }

    @Bean
    public Binding binding(Queue canalQueue, TopicExchange canalExchange) {
        return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");
    }
}

总结

通过以上步骤,我们成功地将 Canal 与 RabbitMQ 整合到了 Spring Boot 应用程序中。这使得我们可以实时监听 MySQL 数据库的变更,并将这些变更作为消息发布到 RabbitMQ 中供其他微服务消费。这种方法不仅提高了系统的响应速度,也简化了数据同步的过程,降低了开发和维护成本。

到此这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这了,更多相关SpringBoot监听数据变更内容请搜索编程China编程(www.chinasem.cn)以前的文章或继www.chinasem.cn续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1152860

相关文章

Flutter监听当前页面可见与隐藏状态的代码详解

《Flutter监听当前页面可见与隐藏状态的代码详解》文章介绍了如何在Flutter中使用路由观察者来监听应用进入前台或后台状态以及页面的显示和隐藏,并通过代码示例讲解的非常详细,需要的朋友可以参考下... flutter 可以监听 app 进入前台还是后台状态,也可以监听当http://www.cppcn

C++ Primer 标准库vector示例详解

《C++Primer标准库vector示例详解》该文章主要介绍了C++标准库中的vector类型,包括其定义、初始化、成员函数以及常见操作,文章详细解释了如何使用vector来存储和操作对象集合,... 目录3.3标准库Vector定义和初始化vector对象通列表初始化vector对象创建指定数量的元素值

使用Java发送邮件到QQ邮箱的完整指南

《使用Java发送邮件到QQ邮箱的完整指南》在现代软件开发中,邮件发送功能是一个常见的需求,无论是用户注册验证、密码重置,还是系统通知,邮件都是一种重要的通信方式,本文将详细介绍如何使用Java编写程... 目录引言1. 准备工作1.1 获取QQ邮箱的SMTP授权码1.2 添加JavaMail依赖2. 实现

MyBatis与其使用方法示例详解

《MyBatis与其使用方法示例详解》MyBatis是一个支持自定义SQL的持久层框架,通过XML文件实现SQL配置和数据映射,简化了JDBC代码的编写,本文给大家介绍MyBatis与其使用方法讲解,... 目录ORM缺优分析MyBATisMyBatis的工作流程MyBatis的基本使用环境准备MyBati

Java嵌套for循环优化方案分享

《Java嵌套for循环优化方案分享》介绍了Java中嵌套for循环的优化方法,包括减少循环次数、合并循环、使用更高效的数据结构、并行处理、预处理和缓存、算法优化、尽量减少对象创建以及本地变量优化,通... 目录Java 嵌套 for 循环优化方案1. 减少循环次数2. 合并循环3. 使用更高效的数据结构4

java两个List的交集,并集方式

《java两个List的交集,并集方式》文章主要介绍了Java中两个List的交集和并集的处理方法,推荐使用Apache的CollectionUtils工具类,因为它简单且不会改变原有集合,同时,文章... 目录Java两个List的交集,并集方法一方法二方法三总结java两个List的交集,并集方法一

Spring AI集成DeepSeek三步搞定Java智能应用的详细过程

《SpringAI集成DeepSeek三步搞定Java智能应用的详细过程》本文介绍了如何使用SpringAI集成DeepSeek,一个国内顶尖的多模态大模型,SpringAI提供了一套统一的接口,简... 目录DeepSeek 介绍Spring AI 是什么?Spring AI 的主要功能包括1、环境准备2

Spring AI集成DeepSeek实现流式输出的操作方法

《SpringAI集成DeepSeek实现流式输出的操作方法》本文介绍了如何在SpringBoot中使用Sse(Server-SentEvents)技术实现流式输出,后端使用SpringMVC中的S... 目录一、后端代码二、前端代码三、运行项目小天有话说题外话参考资料前面一篇文章我们实现了《Spring

Spring AI与DeepSeek实战一之快速打造智能对话应用

《SpringAI与DeepSeek实战一之快速打造智能对话应用》本文详细介绍了如何通过SpringAI框架集成DeepSeek大模型,实现普通对话和流式对话功能,步骤包括申请API-KEY、项目搭... 目录一、概述二、申请DeepSeek的API-KEY三、项目搭建3.1. 开发环境要求3.2. mav

Springboot的自动配置是什么及注意事项

《Springboot的自动配置是什么及注意事项》SpringBoot的自动配置(Auto-configuration)是指框架根据项目的依赖和应用程序的环境自动配置Spring应用上下文中的Bean... 目录核心概念:自动配置的关键特点:自动配置工作原理:示例:需要注意的点1.默认配置可能不适合所有场景