SpringBoot整合Canal+RabbitMQ监听数据变更详解

本文主要是介绍SpringBoot整合Canal+RabbitMQ监听数据变更详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit...

需求

在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求。例如,在电商系统中,当订单表发生更新时,可能需要同步这些变更到搜索服务、缓存服务或者通知其他微服务。传统的解决方案包括定时轮询数据库或通过触发器将变更写入消息队列等方法,但这些方案要么效率低下,要么实现复杂。而使用 Canal + RabbitMQ 可以提供一种高效且可靠的方式来捕获 mysql 数据库的变更,并将其发送到 RabbitMQ 中供其他服务消费。

Canal 是阿里巴巴开源的一个用于增量订阅和消费 MySQL 数据库 Binlog 的工具,它模拟 MySQL 主从复制机制,无需侵入业务逻辑即可捕获数据库变更。RabbitMQ 是一个流行的开源消息代理,支持多种协议并提供了丰富的特性来确保消息传递的可靠性。结合这两者,可以构建一个强大的实时数据变更监听和处理系统。

步骤

环境搭建

整合SpringBoot与Canal实现客户端

Canal整合RabbitMQ

SpringBoot整合RabbitMQ

环境搭建

1. 安装MySQL

确保你有一个正在运行的 MySQL 实例,并启用了 binlog 日志记录功能。这是 Canal 捕获数据库变更的基础。

# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

重启 MySQL 服务使配置生效。

2. 安装Canal Server

下载最新版本的 Canal Server 并解压到合适的位置。根据官方文档进行python必要的配置,特别是 instance.properties 文件中的数据库连接信息。

3. 安装RabbitMQ

可以通过 docker 快速安装 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

访问 http://localhost:15672 登录管理界面,默认用户名/密码为 guest/guest。

整合SpringBoot与Canal实现客户端

创建SpringBoot项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,添加 Web, JPA, 和 AMQP(用于后续整合 RabbitMQ)依赖。

引入Canal依赖

在 pom.XML 中添加 Canal Client 的依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

编写Canal客户端代码

创建一个 Canal 客户端类,用来监听 MySQL 数据库的变化,并将变更事件转发给 RabbitMQ。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;

public class CanalClient {

    private final CanalConnector connector;
    private final Channel channel;

    public CanalClient(CanalConnector connector, Channel channel) {
        this.connector = connector;
        this.channel = channel;
    }

    public void start() {
        // Canal 连接配置
        connector.connect();
        connector.subscribe(".*\\..*"); // 订阅所有数据库和表
        connector.rollback();

        while (true) {
            int BATchSize = 1000;
            EntryBatch batch = connector.getWithoutAck(batchSize); // 获取一批次数据
            long batchId = batch.getId();
            int size = batch.getEntries().size();

            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                printEntry(batch.getEntries());
                connector.ack(batchId); // 提交确认
            }

            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        connector.disconnect();
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChanjavascriptge rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    sendToRabbitMQ(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    sendToRabbitMQ(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    sendToRabbitMQ(rowData.getBeforeColumnsList());

                    System.out.println("-------> after");
                    sendToRabbitMQ(rowData.getAfterColumnsList());
           China编程     }
            }
        }
    }

    private void sendToRabbitMQ(List<Column> columns) {
        StringBuilder message = new StringBuilder();
        for (Column column : columns) {
            message.append(column.getName()).append("=").append(column.getValue()).append(",");
        }
        try {
            channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrawww.chinasem.cnce();
        }
    }
}

Canal整合RabbitMQ

配置Canal Server

确保 Canal Server 已正确配置并启动,能够监听 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 实例,并设置适当的过滤规则。

配置RabbitMQ Exchange

在 RabbitMQ 中创建一个名为 canal_exchange 的 exchange,类型可以根据需要选择,如 fanout, direct, topic 或 headers。

rabbitmqadmin declare exchange name=canal_exchange type=fanout

SpringBoot整合RabbitMQ

添加依赖

确保在 pom.xml 中已经包含了 RabbitMQ 的 Spring AMQP 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ连接信息

在 application.yml 或 application.properties 中配置 RabbitMQ 的连接参数。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

创建消费者

编写一个消费者类来接收来自 RabbitMQ 的消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class CanalMessageConsumer {

    @RabbitListener(queues = "canal_queue")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}

配置队列和绑定

确保在应用程序启动时自动创建所需的队列,并将它们绑定到之前创建的 exchange 上。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue canalQueue() {
        return new Queue("canal_queue", false);
    }

    @Bean
    public TopicExchange canalExchange() {
        return new TopicExchange("canal_exchange");
    }

    @Bean
    public Binding binding(Queue canalQueue, TopicExchange canalExchange) {
        return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");
    }
}

总结

通过以上步骤,我们成功地将 Canal 与 RabbitMQ 整合到了 Spring Boot 应用程序中。这使得我们可以实时监听 MySQL 数据库的变更,并将这些变更作为消息发布到 RabbitMQ 中供其他微服务消费。这种方法不仅提高了系统的响应速度,也简化了数据同步的过程,降低了开发和维护成本。

到此这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这了,更多相关SpringBoot监听数据变更内容请搜索编程China编程(www.chinasem.cn)以前的文章或继www.chinasem.cn续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1152860

相关文章

IDEA运行spring项目时,控制台未出现的解决方案

《IDEA运行spring项目时,控制台未出现的解决方案》文章总结了在使用IDEA运行代码时,控制台未出现的问题和解决方案,问题可能是由于点击图标或重启IDEA后控制台仍未显示,解决方案提供了解决方法... 目录问题分析解决方案总结问题js使用IDEA,点击运行按钮,运行结束,但控制台未出现http://

解决Spring运行时报错:Consider defining a bean of type ‘xxx.xxx.xxx.Xxx‘ in your configuration

《解决Spring运行时报错:Considerdefiningabeanoftype‘xxx.xxx.xxx.Xxx‘inyourconfiguration》该文章主要讲述了在使用S... 目录问题分析解决方案总结问题Description:Parameter 0 of constructor in x

解决IDEA使用springBoot创建项目,lombok标注实体类后编译无报错,但是运行时报错问题

《解决IDEA使用springBoot创建项目,lombok标注实体类后编译无报错,但是运行时报错问题》文章详细描述了在使用lombok的@Data注解标注实体类时遇到编译无误但运行时报错的问题,分析... 目录问题分析问题解决方案步骤一步骤二步骤三总结问题使用lombok注解@Data标注实体类,编译时

JSON字符串转成java的Map对象详细步骤

《JSON字符串转成java的Map对象详细步骤》:本文主要介绍如何将JSON字符串转换为Java对象的步骤,包括定义Element类、使用Jackson库解析JSON和添加依赖,文中通过代码介绍... 目录步骤 1: 定义 Element 类步骤 2: 使用 Jackson 库解析 jsON步骤 3: 添

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内

将sqlserver数据迁移到mysql的详细步骤记录

《将sqlserver数据迁移到mysql的详细步骤记录》:本文主要介绍将SQLServer数据迁移到MySQL的步骤,包括导出数据、转换数据格式和导入数据,通过示例和工具说明,帮助大家顺利完成... 目录前言一、导出SQL Server 数据二、转换数据格式为mysql兼容格式三、导入数据到MySQL数据

Java中使用Java Mail实现邮件服务功能示例

《Java中使用JavaMail实现邮件服务功能示例》:本文主要介绍Java中使用JavaMail实现邮件服务功能的相关资料,文章还提供了一个发送邮件的示例代码,包括创建参数类、邮件类和执行结... 目录前言一、历史背景二编程、pom依赖三、API说明(一)Session (会话)(二)Message编程客

Java中List转Map的几种具体实现方式和特点

《Java中List转Map的几种具体实现方式和特点》:本文主要介绍几种常用的List转Map的方式,包括使用for循环遍历、Java8StreamAPI、ApacheCommonsCollect... 目录前言1、使用for循环遍历:2、Java8 Stream API:3、Apache Commons

C++中使用vector存储并遍历数据的基本步骤

《C++中使用vector存储并遍历数据的基本步骤》C++标准模板库(STL)提供了多种容器类型,包括顺序容器、关联容器、无序关联容器和容器适配器,每种容器都有其特定的用途和特性,:本文主要介绍C... 目录(1)容器及简要描述‌php顺序容器‌‌关联容器‌‌无序关联容器‌(基于哈希表):‌容器适配器‌:(

JavaScript中的isTrusted属性及其应用场景详解

《JavaScript中的isTrusted属性及其应用场景详解》在现代Web开发中,JavaScript是构建交互式应用的核心语言,随着前端技术的不断发展,开发者需要处理越来越多的复杂场景,例如事件... 目录引言一、问题背景二、isTrusted 属性的来源与作用1. isTrusted 的定义2. 为