使用canal增量同步ES索引库数据

2024-08-24 02:44

本文主要是介绍使用canal增量同步ES索引库数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Canal增量数据同步利器

Canal介绍

canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。

github地址:https://github.com/alibaba/canal

版本下载地址:https://github.com/alibaba/canal/releases

文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart

Canal应用场景

1.电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
2.价格、库存发生变更实时同步到redis;
3.数据库异地备份、数据同步;
4.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
在这里插入图片描述

MySQL主从复制原理

1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

Canal工作原理

1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3.canal 解析 binary log 对象(原始为 byte 流)
在这里插入图片描述

Canal安装

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

MySQL Bin-log开启

1)MySQL开启bin-log

a.进入mysql容器

docker exec -it -u root mysql /bin/bash

b.开启mysql的binlog

cd /etc/mysql/mysql.conf.d在mysqld.cnf最下面添加如下配置
# 开启 binlog
log-bin=/var/lib/mysql/mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server-id=12345

c.创建账号并授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

d.重启mysql

docker restart mysql

开启bin-log后,我们可以用sql语句查看下:

show variables like '%log_bin%'

效果如下:
在这里插入图片描述

Canal安装

1)拉取镜像

docker pull canal/canal-server:v1.1.1

2)安装容器

a.安装canal-server容器

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

b.配置canal-server

修改/home/admin/canal-server/conf/canal.properties,将它的id属性修改成和mysql数据库中server-id不同的值,如下图:
在这里插入图片描述
c.修改/home/admin/canal-server/conf/example/instance.properties,配置要监听的数据库服务地址和监听数据变化的数据库以及表,修改如下:
在这里插入图片描述
在这里插入图片描述
指定监听数据库表的配置如下canal.instance.filter.regex:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

重启canal:

docker restart canal
Canal微服务

​ 我们搭建一个微服务,用于读取canal监听到的变更日志,微服务名字叫seckill-canal。该项目我们需要引入canal-spring-boot-autoconfigure包,并且需要实现EntryHandler接口,该接口中有3个方法,分别为insert、update、delete,这三个方法用于监听数据增删改变化。

参考地址:https://github.com/NormanGyllenhaal/canal-client

1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>seckill-service</artifactId><groupId>com.seckill</groupId><version>0.0.1-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>seckill-canal</artifactId><dependencies><!--web--><dependency><groupId>com.seckill</groupId><artifactId>seckill-web</artifactId><version>0.0.1-SNAPSHOT</version></dependency><!--esAPI--><dependency><groupId>com.seckill</groupId><artifactId>seckill-search-api</artifactId><version>0.0.1-SNAPSHOT</version></dependency><!--goodsAPI--><dependency><groupId>com.seckill</groupId><artifactId>seckill-goods-api</artifactId><version>0.0.1-SNAPSHOT</version></dependency><!--canal--><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-autoconfigure</artifactId><version>1.2.1-RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><!-- 指定该Main Class为全局的唯一入口 --><mainClass>com.seckill.CanalApplication</mainClass><layout>ZIP</layout></configuration><executions><execution><goals><goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中--></goals></execution></executions></plugin></plugins></build>
</project>

bootstrap.yml配置

server:port: 18088
spring:application:name: seckill-canalcloud:nacos:config:file-extension: yamlserver-addr: nacos-server:8848discovery:#Nacos的注册地址server-addr: nacos-server:8848
#超时配置
ribbon:ReadTimeout: 3000000
#Canal配置
canal:server: canal-server:11111destination: example
#日志
logging:level:root: error

2)创建com.seckill.handler.SkuHandler实现EntryHandler接口,代码如下:

@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {/**** 增加数据* @param sku*/@Overridepublic void insert(Sku sku) {System.out.println("===========insert:"+sku);}/**** 修改数据* @param before* @param after*/@Overridepublic void update(Sku before, Sku after) {System.out.println("===========update-before:"+before);System.out.println("===========update-after:"+after);}/**** 删除数据* @param sku*/@Overridepublic void delete(Sku sku) {System.out.println("===========delete:"+sku);}
}

3)创建启动类

@SpringBootApplication
public class CanalApplication {public static void main(String[] args) {SpringApplication.run(CanalApplication.class,args);}
}

程序启动后,修改tb_sku数据,可以看到控制会打印修改前后的数据:
在这里插入图片描述

索引库同步

当tb_sku秒杀商品发生变化时,我们应该同时变更索引库中的索引数据,比如秒杀商品增加,则需要同步增加秒杀商品的索引,如果有秒杀商品删除,则需要同步移除秒杀商品。

修改seckill-canal中的com.seckill.handler.SkuHandler的增删改方法,代码如下:

@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {@Autowiredprivate SkuInfoFeign skuInfoFeign;/**** 增加数据* @param sku*/@Overridepublic void insert(Sku sku) {//将Sku转换成SkuInfoSkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);//同步索引skuInfoFeign.modify(1,skuInfo);}/**** 修改数据* @param before* @param after*/@Overridepublic void update(Sku before, Sku after) {int type=2;//将Sku转换成SkuInfoSkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(after) ,SkuInfo.class);if(skuInfo.getStatus()==1 || after.getSeckillNum()<=0){//商品变成了普通商品,或者商品库存为0,则需要删除索引数据type=3;}//同步索引skuInfoFeign.modify(type,skuInfo);}/**** 删除数据* @param sku*/@Overridepublic void delete(Sku sku) {//将Sku转换成SkuInfoSkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);//同步索引skuInfoFeign.modify(3,skuInfo);}
}

开启Feign功能:@EnableFeignClients(basePackages = {“com.seckill.search.feign”})
在这里插入图片描述
此时对数据库中tb_sku表进行增删改的时候,会同步到索引库中。

这篇关于使用canal增量同步ES索引库数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101225

相关文章

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

使用SQL语言查询多个Excel表格的操作方法

《使用SQL语言查询多个Excel表格的操作方法》本文介绍了如何使用SQL语言查询多个Excel表格,通过将所有Excel表格放入一个.xlsx文件中,并使用pandas和pandasql库进行读取和... 目录如何用SQL语言查询多个Excel表格如何使用sql查询excel内容1. 简介2. 实现思路3

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

c# checked和unchecked关键字的使用

《c#checked和unchecked关键字的使用》C#中的checked关键字用于启用整数运算的溢出检查,可以捕获并抛出System.OverflowException异常,而unchecked... 目录在 C# 中,checked 关键字用于启用整数运算的溢出检查。默认情况下,C# 的整数运算不会自

在MyBatis的XML映射文件中<trim>元素所有场景下的完整使用示例代码

《在MyBatis的XML映射文件中<trim>元素所有场景下的完整使用示例代码》在MyBatis的XML映射文件中,trim元素用于动态添加SQL语句的一部分,处理前缀、后缀及多余的逗号或连接符,示... 在MyBATis的XML映射文件中,<trim>元素用于动态地添加SQL语句的一部分,例如SET或W

Mybatis官方生成器的使用方式

《Mybatis官方生成器的使用方式》本文详细介绍了MyBatisGenerator(MBG)的使用方法,通过实际代码示例展示了如何配置Maven插件来自动化生成MyBatis项目所需的实体类、Map... 目录1. MyBATis Generator 简介2. MyBatis Generator 的功能3

Python中使用defaultdict和Counter的方法

《Python中使用defaultdict和Counter的方法》本文深入探讨了Python中的两个强大工具——defaultdict和Counter,并详细介绍了它们的工作原理、应用场景以及在实际编... 目录引言defaultdict的深入应用什么是defaultdictdefaultdict的工作原理

使用Python进行文件读写操作的基本方法

《使用Python进行文件读写操作的基本方法》今天的内容来介绍Python中进行文件读写操作的方法,这在学习Python时是必不可少的技术点,希望可以帮助到正在学习python的小伙伴,以下是Pyth... 目录一、文件读取:二、文件写入:三、文件追加:四、文件读写的二进制模式:五、使用 json 模块读写

Python使用qrcode库实现生成二维码的操作指南

《Python使用qrcode库实现生成二维码的操作指南》二维码是一种广泛使用的二维条码,因其高效的数据存储能力和易于扫描的特点,广泛应用于支付、身份验证、营销推广等领域,Pythonqrcode库是... 目录一、安装 python qrcode 库二、基本使用方法1. 生成简单二维码2. 生成带 Log

Python如何使用seleniumwire接管Chrome查看控制台中参数

《Python如何使用seleniumwire接管Chrome查看控制台中参数》文章介绍了如何使用Python的seleniumwire库来接管Chrome浏览器,并通过控制台查看接口参数,本文给大家... 1、cmd打开控制台,启动谷歌并制定端口号,找不到文件的加环境变量chrome.exe --rem