还在为数据同步而苦恼吗?手把手教你实现canal实现数据同步

2023-12-08 00:18

本文主要是介绍还在为数据同步而苦恼吗?手把手教你实现canal实现数据同步,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. canal简介

阿里巴巴 MySQL binlog 增量订阅&消费组件

Canal是基于MySQL二进制日志的高性能数据同步系统。Canal在阿里巴巴集团(包括https://www.taobao.com)中被广泛使用,以提供可靠的低延迟增量数据管道。

Canal Server能够解析MySQL Binlog并订阅数据更改,而Canal Client可以实现将更改广播到任何地方,例如数据库和Apache Kafka。

具有以下特点:

  1. 支持所有平台。
  2. 支持由Prometheus支持的细粒度系统监视。
  3. 支持通过不同方式(例如通过GTID)解析和预订MySQL Binlog。
  4. 支持高性能,实时数据同步。(查看更多的性能)
  5. Canal Server和Canal Client均支持由Apache ZooKeeper支持的HA /可伸缩性
  6. Docker支持。

1.1. 认识canal

在这里插入图片描述

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2. 工作原理

MySQL主备复制原理

在这里插入图片描述

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

在这里插入图片描述

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

2. 安装

2.1. 准备

  • 修改/etc/my.cnf(linux)或者 mysql根目录下的my.ini(windows)

    需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式, 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    修改完成之后重启mysql服务

  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

2.2. 启动

  • 下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例

    wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
    
  • 解压缩

    mkdir /opt/canal
    tar -zxvf canal.deployer-$version.tar.gz  -C /opt/canal
    

    解压完成后,进入 /opt/canal 目录,可以看到如下结构
    在这里插入图片描述

  • 配置修改

    vi conf/example/instance.properties
    
    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306 
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    #username/password,需要改成自己的数据库信息
    canal.instance.dbUsername = canal  
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .\*\\\\..\*
    
    • canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
    • 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
  • 启动

    sh bin/startup.sh
    
  • 查看 server 日志

    vi logs/canal/canal.log
    
    2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
    2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    
  • 查看 instance 的日志

    vi logs/example/example.log
    
    2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
    2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
    
  • 关闭

    sh bin/stop.sh
    

至此canal一切都已安装成功

3. java客户端

canal启动成功后,就可以通过java客户端读取binlog日志中的数据,并进行解析

从头创建工程,过程略。。。。

3.1. 入门代码

  1. 添加依赖

    <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
    </dependency>
    
  2. ClientSample代码

    package com.atguigu.canal.demo;
    import java.net.InetSocketAddress;
    import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接,connector也是canal数据操作客户端CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("172.16.116.100",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {// 链接对应的canal serverconnector.connect();// 客户端订阅,重复订阅时会更新对应的filter信息,这里订阅所有库的所有表connector.subscribe(".*\\..*");// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿connector.rollback();int totalEmptyCount = 120;// 循环遍历120次while (emptyCount < totalEmptyCount) {// 尝试拿batchSize条记录,有多少取多少,不会阻塞等待Message message = connector.getWithoutAck(batchSize);// 消息idlong batchId = message.getId();// 实际获取记录数int size = message.getEntries().size();// 如果没有获取到消息if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {// 如果消息不为空,重置遍历。从0开始重新遍历emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}// 进行 batch id 的确认。connector.ack(batchId); // 提交确认// 回滚到未进行 ack 的地方,指定回滚具体的batchId;如果不指定batchId,回滚到未进行ack的地方// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {// 释放链接connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {// 如果是事务操作,直接忽略。 EntryType常见取值:事务头BEGIN/事务尾END/数据ROWDATAif (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChange = null;try {// 获取byte数据,并反序列化rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChange.getEventType();System.out.println("====================================begin========================================");System.out.println(String.format("基本信息 binlog[%s:%s] , 表[%s.%s] , 操作: %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));// 如果是ddl或者是查询操作,直接打印sqlSystem.out.println(rowChange.getSql() + ";");// 如果是删除、更新、新增操作解析出数据for (RowData rowData : rowChange.getRowDatasList()) {if (eventType == EventType.DELETE) {// 删除操作,只有删除前的数据printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {// 新增数据,只有新增后的数据printColumn(rowData.getAfterColumnsList());} else {// 更新数据:获取更新前后内容System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}System.out.println("------------------------------------end------------------------------------------");}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
    }
    
  3. 运行Client

    启动Canal Client后,可以从控制台从看到类似消息:

    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4
    ......
    

    此时代表当前数据库无变更数据

  4. 触发数据库变更

    use test;CREATE TABLE `xdual` (`ID` int(11) NOT NULL AUTO_INCREMENT,`X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`ID`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;insert into xdual(id,x) values(null,now());
    

    可以从控制台中看到:

    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4
    ====================================begin========================================
    基本信息 binlog[mysql-bin.000001:15153] , 表[test.xdual] , 操作: CREATE
    CREATE TABLE `xdual` (`ID` int(11) NOT NULL AUTO_INCREMENT,`X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`ID`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    ------------------------------------end------------------------------------------
    ====================================begin========================================
    基本信息 binlog[mysql-bin.000001:15614] , 表[test.xdual] , 操作: INSERT
    ID : 1    update=true
    X : 2020-04-21 22:52:40    update=true
    ------------------------------------end------------------------------------------
    

3.2. 模型设计

在了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal.

3.2.1. CanalConnector

javadoc查看:http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/CanalConnector.html

server/client交互协议:
在这里插入图片描述

get/ack/rollback协议介绍:

  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
    a. batch id 唯一标识
    b. entries 具体的数据对象,可参见下面的数据介绍
  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
    a. 拿够batchSize条记录或者超过timeout时间
    b. timeout=0,阻塞等到足够的batchSize
  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

3.2.2. 流式模型

流式api带来的异步响应模型:
在这里插入图片描述

流式api设计:

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cursor
  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

3.2.3. 数据对象Entry

数据对象格式简单介绍:https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

Entry  	[每一条代表一条binlog数据]Header  logfileName [binlog文件名]  logfileOffset [binlog position]  executeTime [binlog里记录变更发生的时间戳,精确到秒]  schemaName   tableName  eventType [insert/update/delete类型]  entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  storeValue  [byte数据,可展开,对应的类型为RowChange]  
RowChangeisDdl       [是否是ddl变更操作,比如create table/drop table]sql         [具体的ddl sql]rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]beforeColumns [Column类型的数组,变更前的数据字段]afterColumns [Column类型的数组,变更后的数据字段]
ColumnindexsqlType     [jdbc type]name        [column name]isKey       [是否为主键]updated     [是否发生过变更]isNull      [值是否为null]value       [具体的内容,注意为string文本]  

说明:

  • 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
  • 可以提供ddl的变更语句
  • insert只有after columns, delete只有before columns,而update则会有before / after columns数据.

3.2.4. 黑白名单配置

# table regex 设置白名单,如果在instance.properties配置文件中进行该项配置,则在代码中不应该再配置
# connector.subscribe(".*\\..*");,如果还在代码中配置,则配置文件将会失效!!!
canal.instance.filter.regex = .*\\..*# table black regex 设置黑名单
canal.instance.filter.black.regex =

所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".\…"),不然等于没设置canal.instance.filter.regex。

如果一定要调用CanalConnector.subscribe(".\…"),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。

========================================================

mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:

  1. 所有表:.* or .*\\..*
  2. canal schema下所有表:canal\\..*
  3. canal下的以canal打头的表:canal\\.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

3.3. 以redis为例数据同步

@SpringBootTest
class CanalDemoApplicationTests {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String KEY_PREFIX = "canal:test:";@Testvoid contextLoads() {// 创建链接,connector也是canal数据操作客户端,默认端口号:11111CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("172.16.116.100",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {// 链接对应的canal serverconnector.connect();// 客户端订阅,重复订阅时会更新对应的filter信息,这里订阅所有库的所有表connector.subscribe(".*\\..*");// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿connector.rollback();while (true) {// 尝试拿batchSize条记录,有多少取多少,不会阻塞等待Message message = connector.getWithoutAck(batchSize);// 消息idlong batchId = message.getId();// 实际获取记录数int size = message.getEntries().size();// 如果没有获取到消息if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {// 如果消息不为空,重置遍历。从0开始重新遍历emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}// 进行 batch id 的确认。connector.ack(batchId); // 提交确认// 回滚到未进行 ack 的地方,指定回滚具体的batchId;如果不指定batchId,回滚到未进行ack的地方// connector.rollback(batchId); // 处理失败, 回滚数据}} finally {// 释放链接connector.disconnect();}}private void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {// 如果是事务操作,直接忽略。 EntryType常见取值:事务头BEGIN/事务尾END/数据ROWDATAif (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}// 如果不是需要数据同步的表,直接忽略。if (!StringUtils.equals(entry.getHeader().getSchemaName(), "test") || !StringUtils.equals(entry.getHeader().getTableName(), "user")){continue;}CanalEntry.RowChange rowChange = null;try {// 获取byte数据,并反序列化rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChange.getEventType();// 如果是删除、更新、新增操作解析出数据for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {// 操作前数据List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();// 操作后数据List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();if (eventType == CanalEntry.EventType.DELETE) {// 删除操作,只有删除前的数据if(beforeColumnsList.size() <= 0){continue;}for (CanalEntry.Column column : beforeColumnsList) {// 取主键作为key删除对应的缓存if (column.getIsKey()){this.redisTemplate.delete(KEY_PREFIX + column.getValue());}}} else {// 新增/更新数据,取操作后的数据。组装成json数据if(afterColumnsList.size() <= 0){continue;}JSONObject json=new JSONObject();// 主键String key = null;for (CanalEntry.Column column : afterColumnsList) {// 遍历字段放入jsonjson.put(underscoreToCamel(column.getName()), column.getValue());// 如果是该字段是主键,取出该字段if (column.getIsKey()){key = column.getValue();}}this.redisTemplate.opsForValue().set(KEY_PREFIX + key, json.toJSONString());}}}}/*** 下划线 转 驼峰* @param param* @return*/private String underscoreToCamel(String param){if (param==null||"".equals(param.trim())){return "";}int len=param.length();StringBuilder sb=new StringBuilder(len);for (int i = 0; i < len; i++) {char c = Character.toLowerCase(param.charAt(i));if (c == '_'){if (++i<len){sb.append(Character.toUpperCase(param.charAt(i)));}}else{sb.append(c);}}return sb.toString();}
}

4. 客户端适配器(ClientAdapter)

canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能, 目前支持功能:

  • 客户端启动器
  • 同步管理REST接口
  • 日志适配器, 作为DEMO
  • 关系型数据库的数据同步(表对表同步), ETL功能
  • HBase的数据同步(表对表同步), ETL功能
  • ElasticSearch多表数据同步,ETL功能(新)
  • 后续支持redis、mongodb

4.1. 适配器整体结构

client-adapter分为适配器启动器两部分, 适配器为多个fat jar, 每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载, 目前所有支持的适配器都放置在plugin目录下

启动器为 SpringBoot 项目, 运行目录结构为:
在这里插入图片描述

详细结构如下:

- binrestart.shstartup.batstartup.shstop.sh
- confbootstrap.ymlapplication.yml- esbiz_order.ymlcustomer.ymlmytest_user.yml- hbasemytest_person2.yml- rdbmytest_user.yml
- lib...
- logs
- pluginclient-adapter.elasticsearch-1.1.4-jar-with-dependencies.jarclient-adapter.hbase-1.1.4-jar-with-dependencies.jarclient-adapter.logger-1.1.4-jar-with-dependencies.jarclient-adapter.rdb-1.1.4-jar-with-dependencies.jar

4.2. 适配器配置介绍

总配置文件 application.yml

canal.conf:canalServerHost: 127.0.0.1:11111     # 对应单机模式下的canal server的ip:portzookeeperHosts: slave1:2181          # 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准mqServers: slave1:6667 #or rocketmq  # kafka或rocketMQ地址, 与canalServerHost不能并存flatMessage: true                    # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效batchSize: 50                        # 每次获取数据的批大小, 单位为KsyncBatchSize: 1000                  # 每次同步的批数量retries: 0                           # 重试次数, -1为无限重试timeout:                             # 同步超时时间, 单位毫秒mode: tcp # kafka rocketMQ           # canal client的模式: tcp直连 kafka rocketMQsrcDataSources:                      # 源数据库defaultDS:                         # 自定义名称url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url username: root                                            # jdbc 账号password: 121212                                          # jdbc 密码canalAdapters:                       # 适配器列表- instance: example                  # canal 实例名或者 MQ topic 名groups:                            # 分组列表- groupId: g1                      # 分组id, 如果是MQ模式将用到该值outerAdapters:                   # 分组内适配器列表- name: logger                   # 日志打印适配器
......           

说明:

  1. 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
  2. 目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息

4.3. 适配器启动

前提:启动canal server (单机模式)

上传canal.adapter-1.1.4.tar.gz到/opt目录下

  1. 在/opt目录下创建canal-adapter目录:mkdir canal-adapter
  2. 解压压缩包:tar -zxvf canal.adapter-1.1.4.tar.gz -C canal-adapter
  3. 修改conf/application.yml,如下
  4. 启动:bin/startup.sh
server:port: 8081
logging:level:com.alibaba.otter.canal.client.adapter.hbase: DEBUG
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:canalServerHost: 127.0.0.1:11111batchSize: 500                            syncBatchSize: 1000                       retries: 0                               timeout:                                 mode: tcp canalAdapters:                            - instance: example                       groups:                                 - groupId: g1                           outerAdapters:                        - name: logger                        

logger适配器:

最简单的处理, 将受到的变更事件通过日志打印的方式进行输出, 如配置所示, 只需要定义name: logger即可

4.4. adapter管理REST接口

查询所有订阅同步的canal instance或MQ topic

curl http://127.0.0.1:8081/destinations

在这里插入图片描述

数据同步开关

curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT

针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

在这里插入图片描述
数据同步开关状态

curl http://127.0.0.1:8081/syncSwitch/example

查看指定 canal instance/MQ topic 的数据同步开关状态
在这里插入图片描述

这篇关于还在为数据同步而苦恼吗?手把手教你实现canal实现数据同步的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/467827

相关文章

Python pyinstaller实现图形化打包工具

《Pythonpyinstaller实现图形化打包工具》:本文主要介绍一个使用PythonPYQT5制作的关于pyinstaller打包工具,代替传统的cmd黑窗口模式打包页面,实现更快捷方便的... 目录1.简介2.运行效果3.相关源码1.简介一个使用python PYQT5制作的关于pyinstall

使用Python实现大文件切片上传及断点续传的方法

《使用Python实现大文件切片上传及断点续传的方法》本文介绍了使用Python实现大文件切片上传及断点续传的方法,包括功能模块划分(获取上传文件接口状态、临时文件夹状态信息、切片上传、切片合并)、整... 目录概要整体架构流程技术细节获取上传文件状态接口获取临时文件夹状态信息接口切片上传功能文件合并功能小

python实现自动登录12306自动抢票功能

《python实现自动登录12306自动抢票功能》随着互联网技术的发展,越来越多的人选择通过网络平台购票,特别是在中国,12306作为官方火车票预订平台,承担了巨大的访问量,对于热门线路或者节假日出行... 目录一、遇到的问题?二、改进三、进阶–展望总结一、遇到的问题?1.url-正确的表头:就是首先ur

C#实现文件读写到SQLite数据库

《C#实现文件读写到SQLite数据库》这篇文章主要为大家详细介绍了使用C#将文件读写到SQLite数据库的几种方法,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下... 目录1. 使用 BLOB 存储文件2. 存储文件路径3. 分块存储文件《文件读写到SQLite数据库China编程的方法》博客中,介绍了文

Oracle Expdp按条件导出指定表数据的方法实例

《OracleExpdp按条件导出指定表数据的方法实例》:本文主要介绍Oracle的expdp数据泵方式导出特定机构和时间范围的数据,并通过parfile文件进行条件限制和配置,文中通过代码介绍... 目录1.场景描述 2.方案分析3.实验验证 3.1 parfile文件3.2 expdp命令导出4.总结

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

JAVA利用顺序表实现“杨辉三角”的思路及代码示例

《JAVA利用顺序表实现“杨辉三角”的思路及代码示例》杨辉三角形是中国古代数学的杰出研究成果之一,是我国北宋数学家贾宪于1050年首先发现并使用的,:本文主要介绍JAVA利用顺序表实现杨辉三角的思... 目录一:“杨辉三角”题目链接二:题解代码:三:题解思路:总结一:“杨辉三角”题目链接题目链接:点击这里

基于Python实现PDF动画翻页效果的阅读器

《基于Python实现PDF动画翻页效果的阅读器》在这篇博客中,我们将深入分析一个基于wxPython实现的PDF阅读器程序,该程序支持加载PDF文件并显示页面内容,同时支持页面切换动画效果,文中有详... 目录全部代码代码结构初始化 UI 界面加载 PDF 文件显示 PDF 页面页面切换动画运行效果总结主

更改docker默认数据目录的方法步骤

《更改docker默认数据目录的方法步骤》本文主要介绍了更改docker默认数据目录的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1.查看docker是否存在并停止该服务2.挂载镜像并安装rsync便于备份3.取消挂载备份和迁

SpringBoot实现基于URL和IP的访问频率限制

《SpringBoot实现基于URL和IP的访问频率限制》在现代Web应用中,接口被恶意刷新或暴力请求是一种常见的攻击手段,为了保护系统资源,需要对接口的访问频率进行限制,下面我们就来看看如何使用... 目录1. 引言2. 项目依赖3. 配置 Redis4. 创建拦截器5. 注册拦截器6. 创建控制器8.