ES MQ canal同步mysql

2024-05-09 04:38
文章标签 mysql es canal 同步 database mq

本文主要是介绍ES MQ canal同步mysql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

转载来源:https://juejin.cn/post/6844904073213247496

大约两年以前,笔者在一个项目中遇到了数据同步的难题。
当时,系统部署了几十个实例,分为1个中心平台和N个分中心平台,而每一个系统都对应一个单独的数据库实例。
在数据库层面,有这样一个需求:

中心平台数据库要包含所有系统平台的数据。
分中心数据库只包含本系统平台的数据。
在中心平台可以新增或修改 分 中心平台的数据,但要讲数据实时同步到对应的分中心平台数据库。

这几十个数据库实例之间,没有明确的主从关系,是否同步还要看数据的来源,所以并不能用MySQL的主从同步来做。
当时,笔者实验了几种方式,最后采用的方式是基于Mybatis拦截器机制 + 消息队列的方式来做的。
大概原理是通过Mybatis拦截器,拦截到事务操作,比如新增、修改和删除,根据自定义的数据主键(标识数据来源和去向),封装成对象,投递到消息队列对应的topic中去。然后,每个系统监听不同的topic,消费数据并同步到数据库。
在此后的一段时间里,知道了canal这个开源组件。发现它更直接,它可以从MySQL的binlog中解析数据,投递到消息队列或其它地方。
一、canal简介
说起canal,也是阿里巴巴存在数据同步的业务需求。所以从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
基于日志增量订阅&消费支持的业务:

数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息

我们正可以基于canal的机制,来完成一系列如数据同步、缓存刷新等业务。
二、启动canal
1、修改MySQL配置
对于自建的MySQL服务, 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
复制代码然后创建一个账户,用来链接MySQL,作为 MySQL slave 的权限。
CREATE USER canal IDENTIFIED BY ‘canal’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
– GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ;
FLUSH PRIVILEGES;
复制代码2、下载
下载canal非常简单,访问 releases页面选择需要的包下载,然后将下载的包解压到指定的目录即可。
tar -zxvf canal.deployer-1.1.4.tar.gz -C /canal
解压完成后,我们可以看到这样一个目录:

3、修改配置
在启动之前,还需要修改一些配置信息。
首先,定位到canal/conf/example ,编辑instance.properties配置文件,重点有几项:
canal.instance.mysql.slaveId=1234 # canal模拟slaveid
canal.instance.master.address=127.0.0.1:3306 # MySQL数据库地址
canal.instance.dbUsername=canal # 作为slave角色的账户
canal.instance.dbPassword=canal # 作为slave角色的账户密码
canal.instance.connectionCharset = UTF-8 # 数据库编码方式对应Java中的编码类型
canal.instance.filter.regex=.\… # 表过滤的表达式
canal.mq.topic=example # MQ 主题名称
复制代码我们希望canal监听到的数据,要发送到消息队列中,还需要修改canal.properties文件,在这里主要是MQ的配置。在这里笔者使用的是阿里云版RocketMQ,参数如下:

配置ak/sk

canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX

配置topic

canal.mq.accessChannel = cloud
canal.mq.servers = 内网接入点
canal.mq.producerGroup = GID_**group(在后台创建)
canal.mq.namespace = rocketmq实例id
canal.mq.topic=(在后台创建)
复制代码4、启动
直接运行启动脚本即可运行:./canal/bin/startup.sh 。 然后打开logs/canal/canal.log文件,可以看到启动效果。
2020-02-26 21:12:36.715 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-02-26 21:12:36.746 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.44.128(192.168.44.128):11111]
2020-02-26 21:12:37.406 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now …
复制代码三、启动MQ监听
我们把canal监听到的数据,投送到了消息队列中,那么接下来就是写个监听程序来消费其中的数据。
为了方便,笔者直接使用的是阿里云版RocketMQ,测试代码如下:
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, “GID_CANAL”);
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, “accessKey”);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, “secretKey”);
// 设置 TCP 接入域名,到控制台的实例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,“http://MQ_INST_xxx.mq-internet.aliyuncs.com:80”);
// 集群订阅方式(默认)
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(“example”,"*",new CanalListener());
consumer.start();
logger.info(“Consumer Started”);
}
复制代码四、测试
把环境都部署好之后,我们进入测试阶段来看一看实际效果。
我们以一张t_account表为例,这里面记录着账户id和账户余额。
首先,我们新增一条记录,insert into t_account (id,user_id,amount) values (4,4,200);
此时,MQ消费到数据如下:
{
“data”: [{
“id”: “4”,
“user_id”: “4”,
“amount”: “200.0”
}],
“database”: “seata”,
“es”: 1582723607000,
“id”: 2,
“isDdl”: false,
“mysqlType”: {
“id”: “int(11)”,
“user_id”: “varchar(255)”,
“amount”: “double(14,2)”
},
“old”: null,
“pkNames”: [“id”],
“sql”: “”,
“sqlType”: {
“id”: 4,
“user_id”: 12,
“amount”: 8
},
“table”: “t_account”,
“ts”: 1582723607656,
“type”: “INSERT”
}
复制代码通过数据可以看到,这里面详细记录了数据库的名称、表的名称、表的字段和新增数据的内容等。
然后,我们还可以把这条数据修改一下:update t_account set amount = 150 where id = 4;
此时,MQ消费到数据如下:
{
“data”: [{
“id”: “4”,
“user_id”: “4”,
“amount”: “150.0”
}],
“database”: “seata”,
“es”: 1582724016000,
“id”: 3,
“isDdl”: false,
“mysqlType”: {
“id”: “int(11)”,
“user_id”: “varchar(255)”,
“amount”: “double(14,2)”
},
“old”: [{
“amount”: “200.0”
}],
“pkNames”: [“id”],
“sql”: “”,
“sqlType”: {
“id”: 4,
“user_id”: 12,
“amount”: 8
},
“table”: “t_account”,
“ts”: 1582724016353,
“type”: “UPDATE”
}
复制代码可以看到,除了修改后的内容,canal还用old字段记录了修改前字段的值。
最后,我们删除这条数据:delete from t_account where id = 4;
相应的,MQ消费到数据如下:
{
“data”: [{
“id”: “4”,
“user_id”: “4”,
“amount”: “150.0”
}],
“database”: “seata”,
“es”: 1582724155000,
“id”: 4,
“isDdl”: false,
“mysqlType”: {
“id”: “int(11)”,
“user_id”: “varchar(255)”,
“amount”: “double(14,2)”
},
“old”: null,
“pkNames”: [“id”],
“sql”: “”,
“sqlType”: {
“id”: 4,
“user_id”: 12,
“amount”: 8
},
“table”: “t_account”,
“ts”: 1582724155370,
“type”: “DELETE”
}
复制代码监听到数据库表的变化之后,就可以根据自己的业务场景,对这些数据进行业务上的处理啦。
五、总结
可以看到,利用canal组件可以很方便的完成对数据变化的监听。如果利用消息队列来做数据同步的话,只有一点需要格外注意,即消息顺序性的问题。
binlog本身是有序的,但写入到mq之后如何保障顺序是值得关注的问题。
在mq顺序性问题这里,可以看到canal的消费顺序性相关解答。

作者:清幽之地
链接:https://juejin.cn/post/6844904073213247496
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

这篇关于ES MQ canal同步mysql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/972411

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

[MySQL表的增删改查-进阶]

🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 💻💻💻数据库约束 🔭🔭🔭约束类型 not null: 指示某列不能存储 NULL 值unique: 保证某列的每行必须有唯一的值default: 规定没有给列赋值时的默认值.primary key:

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们的MySQL服务安装好了之后, 会有一个配置文件, 也就

Java 连接Sql sever 2008

Java 连接Sql sever 2008 /Sql sever 2008 R2 import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; public class TestJDBC