canal +RocketMQ实现MySQL与ElasticSearch数据同步

2024-05-04 15:18

本文主要是介绍canal +RocketMQ实现MySQL与ElasticSearch数据同步,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.引言

在很多业务情况下,我们都会在系统中引入ElasticSearch搜索引擎作为做全文检索的优化方案。

如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新ElasticSearch的代码。

这种数据同步的代码跟业务代码耦合性非常高,并且使得代码的可读性降低,于是乎,我们能不能把这些数据同步的代码抽出来形成一个独立的模块呢?肯定是可以的。

下面我会以一个CMS文章管理为例来演示canal+RocketMQ实现MySQLElasticSearch数据同步。

2.技术栈

如果你还对SpringBootcanalRocketMQMySQLElasticSearch 不是很了解的话,这里我为大家整理个它们的官网网站,如下

  • SpringBoot:https://spring.io/projects/spring-boot

  • canal :https://github.com/alibaba/canal

  • RocketMQ:http://rocketmq.apache.org/

  • MySQL:https://www.mysql.com/

  • ElasticSearch:https://www.elastic.co/cn/elasticsearch/

这里主要介绍一下canal,其他的自行学习。

2.1 canal定义

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费.。

2.2 canal工作原理

在这里插入图片描述

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
2.3 架构

在这里插入图片描述
说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

到这里我们对canal有了一个初步的认识,接下我们就进入实战环节。

3.环境准备

3.1 MySQL 配置

对于自建 MySQL , 需要先开启 Binlog写入功能,配置binlog-formatROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

**注意:**针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

授权canal 连接 MySQL 账号具有作为 MySQL slave的权限, 如果已有账户可直接 使用grant 命令授权。

#创建用户名和密码都为canal
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3.2 canal的安装和配置

3.2.1 canal.admin安装和配置

canal提供web ui 进行Server管理、Instance管理。

3.2.1.1 下载 canal.admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

在这里插入图片描述

3.2.1.2 解压完成可以看到如下结构:

在这里插入图片描述
我们先配置canal.admin之后。通过web ui来配置 cancal server,这样使用界面操作非常的方便。

3.2.1.3 配置修改
vi conf/application.yml
server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: canalpassword: canaldriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin
3.2.1.4 初始化元数据库

初始化元数据库

mysql -h127.1 -uroot -p# 导入初始化SQL
> source conf/canal_manager.sql
  • 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化

  • canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql

3.2.1.5 启动
sh bin/startup.sh
3.2.1.6 启动成功,使用浏览器输入http://ip:8089/ 会跳转到登录界面

在这里插入图片描述
使用用户名:admin 密码为:123456 登录
登录成功,会自动跳转到如下界面。这时候我们的canal.admin就搭建成功了。
在这里插入图片描述

3.2.2 下载 canal.deployer, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

在这里插入图片描述
解压完成可以看到如下结构:
在这里插入图片描述
进入conf 目录。可以看到如下的配置文件。
在这里插入图片描述

我们先对canal.properties 不做任何修改。

使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

使用如下命令启动canal server

sh bin/startup.sh local

启动成功。同时我们在canal.admin web ui中刷新 server 管理,可以到canal server 已经启动成功。

在这里插入图片描述
这时候我们的canal.server 搭建已经成功。

3.2.3 在canal admin ui 中配置Instance管理
3.2.3.1 新建 Instance

选择Instance 管理-> 新建Instance
填写 Instance名称:cms_article
选择 选择所属主机集群
选择 载入模板
修改默认信息

#mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
#改成自己的数据库信息(需要监听的数据库)
canal.instance.defaultDatabaseName = cms-manage
canal.instance.connectionCharset = UTF-8
#table regex 需要过滤的表 这里数据库的中所有表
canal.instance.filter.regex = .\*\\..\*# MQ 配置 日志数据会发送到cms_article这个topic上
canal.mq.topic=cms_article
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#单分区处理消息
canal.mq.partition=0

我们这里为了演示之创建一张表。
在这里插入图片描述

配置好之后,我需要点击保存。此时在Instances 管理中就可以看到此时的实例信息。
在这里插入图片描述

3.2.4 修改canal server 的配置文件,选择消息队列处理binlog

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

  • kafka: https://github.com/apache/kafka
  • RocketMQ : https://github.com/apache/rocketmq

本案例以RocketMQ为例

我们仍然使用web ui 界面操作。点击 server 管理 - > 点击配置
在这里插入图片描述
修改配置文件

# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = RocketMQ
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
canal.mq.servers = 192.168.0.200:9078
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

修改好之后保存。会自动重启。

此时我们就可以在rocketmq的控制台看到一个cms_article topic已经自动创建了。

在这里插入图片描述

3.2.5 配置ElasticSearch启动

这里我使用的ElasticSearch 6.6.1
在这里插入图片描述
es 启动成功了。

我们使用 elasticsearch-head 连接是可以看到节点信息。一会我们就使用 elasticsearch-head 查询es中数据。

在这里插入图片描述

4.代码实战

4.1 创建一个springboot 项目

项目结构如下:
在这里插入图片描述

4.2 pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rockmq-samples</artifactId><groupId>com.lidong.rocketmq</groupId><version>1.0.0</version></parent><modelVersion>4.0.0</modelVersion><artifactId>springboot-canal-rocketmq-es</artifactId><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.2.5.RELEASE</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.0.RELEASE</version><configuration><mainClass>com.lidong.RocketmqSyncSamplesApplication</mainClass></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>
  • spring-boot-starter-data-elasticsearch:操作es依赖库
  • rocketmq-spring-boot-starter:操作rocketmq依赖库

其他就不过多介绍了。大家一看就明白了。

4.3 application的配置
server:port: 8085
rocketmq:name-server: localhost:9876
spring:data:elasticsearch:cluster-nodes: localhost:9300cluster-name: my-applicationrepositories:enabled: true
  • rocketmq.name-server : rocketmq的namesver
  • spring.data.elasticsearch.cluster-nodes :es 节点地址
  • spring.data.elasticsearch.cluster-name: es节点集群名称
  • spring.data.elasticsearch.repositories.enabled :开启es仓库使用
4.4 创建es操作的实体类和仓库类
4.4.1 EsCmsArticle实体类
package com.lidong.canal.es.entity;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import java.io.Serializable;
import java.util.Date;/*** 文章详情** String indexName();//索引库的名称,个人建议以项目的名称命名* String type() default "";//类型,个人建议以实体的名称命名* short shards() default 5;//默认分区数* short replicas() default 1;//每个分区默认的备份数* String refreshInterval() default "1s";//刷新间隔* String indexStoreType() default "fs";//索引文件存储类型***/@Document(indexName = "canal-rocketmq-es", type = "cms-article")
public class EsCmsArticle implements  Serializable {@Idprivate Long courseId;/** 标题 */private String title;/** 摘要 */private String abstractX;/** 内容 */private String content;/** 年龄段 */private String ageRange;/** 图片 */private String image;/** 查看次数 */private Long viewNumber;/** 作者 */private String author;/** 来源 */private String source;/** 所属分类 */private Long classId;/** 关键字 */private String keyWords;/** 描述 */private String description;/** 文章url */private String url;/*** 文章状态*/private Integer status;/*** 创建时间*/private Date createTime;/*** 修改时间*/private Date updateTime;public void setCourseId(Long courseId){this.courseId = courseId;}public Long getCourseId(){return courseId;}public void setTitle(String title){this.title = title;}public String getTitle(){return title;}public void setAbstractX(String abstractX){this.abstractX = abstractX;}public String getAbstractX(){return abstractX;}public void setContent(String content){this.content = content;}public String getContent(){return content;}public void setAgeRange(String ageRange){this.ageRange = ageRange;}public String getAgeRange(){return ageRange;}public void setImage(String image){this.image = image;}public String getImage(){return image;}public void setViewNumber(Long viewNumber){this.viewNumber = viewNumber;}public Long getViewNumber(){return viewNumber;}public void setAuthor(String author){this.author = author;}public String getAuthor(){return author;}public void setSource(String source){this.source = source;}public String getSource(){return source;}public void setClassId(Long classId){this.classId = classId;}public Long getClassId(){return classId;}public void setKeyWords(String keyWords){this.keyWords = keyWords;}public String getKeyWords(){return keyWords;}public void setDescription(String description){this.description = description;}public String getDescription(){return description;}public void setUrl(String url){this.url = url;}public String getUrl(){return url;}public Integer getStatus() {return status;}public void setStatus(Integer status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public Date getUpdateTime() {return updateTime;}public void setUpdateTime(Date updateTime) {this.updateTime = updateTime;}@Overridepublic String toString() {return "CmsArticle{" +"courseId=" + courseId +", title='" + title + '\'' +", abstractX='" + abstractX + '\'' +", content='" + content + '\'' +", ageRange='" + ageRange + '\'' +", image='" + image + '\'' +", viewNumber=" + viewNumber +", author='" + author + '\'' +", source='" + source + '\'' +", classId=" + classId +", keyWords='" + keyWords + '\'' +", description='" + description + '\'' +", url='" + url + '\'' +", status=" + status +", createTime=" + createTime +", updateTime=" + updateTime +'}';}
}
4.4.2 CmsArticleRepository 仓库类
package com.lidong.canal.rocketmq;import com.alibaba.fastjson.JSON;
import com.lidong.canal.bean.CanalBean;
import com.lidong.canal.bean.CmsArticle;
import com.lidong.canal.es.entity.EsCmsArticle;
import com.lidong.canal.es.repository.CmsArticleRepository;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Optional;@Component
@RocketMQMessageListener(topic = "cms_article",consumerGroup = "cms-article",selectorExpression = "*",consumeMode = ConsumeMode.ORDERLY,messageModel = MessageModel.CLUSTERING,consumeThreadMax = 1
)
public class SpringConsumer implements RocketMQListener<String> {private Logger logger = LoggerFactory.getLogger(SpringConsumer.class.getSimpleName());@AutowiredCmsArticleRepository cmsArticleRepository;/*** 实现方式很简单吧,但是你也看见了代码中就没有消息能够消费是否成功后的确认方式,因为实现的onMessage()方法是个void的,还好看过原始的rocketmq的消费者实现方式,也就是rocketmq-client.jar的实现,它是MessageListener.java类来实现消息监听接收的,而它有2个继承接口类MessageListenerConcurrently.java和MessageListenerOrderly.java,这样就好找了,直接收一下这2个接口的实现类,乖乖,果然找到了在rocket-spring-boot的jar里面,就是DefaultRocketMQListenerContainer.java这个类,看下其中一个实现*** @param msg*/@Overridepublic void onMessage(String msg) {System.out.println("接收到消息 -> " + msg);CanalBean canalBean = JSON.parseObject(msg, CanalBean.class);String table = canalBean.getTable();System.out.println(table.toString());String type = canalBean.getType();System.out.println(type);List<CmsArticle> data = canalBean.getData();data.stream().forEach(tbTest -> {EsCmsArticle esCmsArticle = new EsCmsArticle();System.out.println(tbTest.toString());if ("UPDATE".equals(type) && "cms_article".equals(table)) {Optional<EsCmsArticle> article = cmsArticleRepository.findById(tbTest.getCourseId());if (article.isPresent()) {EsCmsArticle cmsArticle = article.get();BeanUtils.copyProperties(tbTest, cmsArticle);cmsArticleRepository.save(cmsArticle);logger.info("id = {} 编辑es成功", cmsArticle.getCourseId());} else {BeanUtils.copyProperties(tbTest, esCmsArticle);cmsArticleRepository.save(esCmsArticle);logger.info("id = {} 添加es成功", esCmsArticle.getCourseId());}} else if ("INSERT".equals(type) && "cms_article".equals(table)) {BeanUtils.copyProperties(tbTest, esCmsArticle);cmsArticleRepository.save(esCmsArticle);logger.info("id = {} 添加es成功", esCmsArticle.getCourseId());}});}
}
4.6 SpringBootApplication启动类
package com.lidong.canal;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RocketmqToEsSamplesApplication {public static void main(String[] args) {SpringApplication.run(RocketmqToEsSamplesApplication.class, args);}}
4.7 CanalBean类 接收mq的数据实体
public class CanalBean implements Serializable {//数据private List<CmsArticle> data;//数据库名称private String database;private long es;//递增,从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private List<CmsArticle> old;//主键名称private List<String> pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//get set ...

public class MysqlType implements Serializable {private String id;private String commodity_name;private String commodity_price;private String number;private String description;
//get set..
public class SqlType implements Serializable {private int id;private int commodity_name;private int commodity_price;private int number;private int description;//get set..
}
package com.lidong.canal.bean;import java.io.Serializable;
import java.util.Date;public class CmsArticle implements Serializable {/** $column.columnComment */private Long courseId;/** 标题 */private String title;/** 摘要 */private String abstractX;/** 内容 */private String content;/** 年龄段 */private String ageRange;/** 图片 */private String image;/** 查看次数 */private Long viewNumber;/** 作者 */private String author;/** 来源 */private String source;/** 所属分类 */private Long classId;/** 关键字 */private String keyWords;/** 描述 */private String description;/** 文章url */private String url;/*** 文章状态*/private Integer status;/*** 创建时间*/private Date createTime;/*** 修改时间*/private Date updateTime;
}

5测试

5.1准备两条sql语句(一条添加 一条修改)
INSERT INTO `cms-manage`.`cms_article` (`title`, `abstract_x`, `content`, `age_range`, `image`, `create_time`, `update_time`, `view_number`, `author`, `source`, `class_id`, `description`, `key_words`, `url`, `status`) VALUES ( '​​2018腾讯网”', '2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”', '<p><img src=\"http://www.myhellobaby.cn/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg\" data-filename=\"/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg\" style=\"width: 318px;\"><br></p><p><span style=\"font-size: 13px;\"></span><br></p>', NULL, 'http://www.myhellobaby.cn/profile/upload/2019/08/14/2fbfa6316289d8d8373ac38b7096465f.jpg', '2019-08-12 12:08:23', '2019-08-16 13:56:47', NULL, '腾讯网', '腾讯网', '3', NULL, NULL, '/news/news_detail_5.html', '0');
UPDATE  cms_article SET title='​​​​2018腾讯网年度教育盛典|哈喽贝比荣获' WHERE course_id=11;

这时候,我查看sql执行的结果
在这里插入图片描述

在查看idea 运行的控制台日志

2020-08-20 16:31:58.688  INFO 2028 --- [           main] o.elasticsearch.plugins.PluginsService   : no modules loaded
2020-08-20 16:31:58.689  INFO 2028 --- [           main] o.elasticsearch.plugins.PluginsService   : loaded plugin [org.elasticsearch.index.reindex.ReindexPlugin]
2020-08-20 16:31:58.689  INFO 2028 --- [           main] o.elasticsearch.plugins.PluginsService   : loaded plugin [org.elasticsearch.join.ParentJoinPlugin]
2020-08-20 16:31:58.689  INFO 2028 --- [           main] o.elasticsearch.plugins.PluginsService   : loaded plugin [org.elasticsearch.percolator.PercolatorPlugin]
2020-08-20 16:31:58.689  INFO 2028 --- [           main] o.elasticsearch.plugins.PluginsService   : loaded plugin [org.elasticsearch.script.mustache.MustachePlugin]
2020-08-20 16:31:58.690  INFO 2028 --- [           main] o.elasticsearch.plugins.PluginsService   : loaded plugin [org.elasticsearch.transport.Netty4Plugin]
2020-08-20 16:31:59.588  INFO 2028 --- [           main] o.s.d.e.c.TransportClientFactoryBean     : Adding transport node : 127.0.0.1:9300
2020-08-20 16:32:00.228  INFO 2028 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-20 16:32:01.366  INFO 2028 --- [           main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='cms-article', nameServer='192.168.0.200:9876', topic='cms_article', consumeMode=ORDERLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
2020-08-20 16:32:01.366  INFO 2028 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:springConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
2020-08-20 16:32:01.397  INFO 2028 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8085 (http) with context path ''
2020-08-20 16:32:01.399  INFO 2028 --- [           main] c.l.c.RocketmqToEsSamplesApplication     : Started RocketmqToEsSamplesApplication in 4.594 seconds (JVM running for 5.346)
接收到消息 -> {"data":[{"course_id":"17","title":"​​2018腾讯网”","abstract_x":"2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”","content":"<p><img src=\"http://www.myhellobaby.cn/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg\" data-filename=\"/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg\" style=\"width: 318px;\"><br></p><p><span style=\"font-size: 13px;\"></span><br></p>","age_range":null,"image":"http://www.myhellobaby.cn/profile/upload/2019/08/14/2fbfa6316289d8d8373ac38b7096465f.jpg","create_time":"2019-08-12 12:08:23","update_time":"2019-08-16 13:56:47","view_number":null,"author":"腾讯网","source":"腾讯网","class_id":"3","description":null,"key_words":null,"url":"/news/news_detail_5.html","status":"0"}],"database":"cms-manage","es":1597912365000,"id":24,"isDdl":false,"mysqlType":{"course_id":"int(11)","title":"varchar(255)","abstract_x":"varchar(255)","content":"text","age_range":"varchar(255)","image":"varchar(255)","create_time":"datetime","update_time":"datetime","view_number":"int(11)","author":"varchar(255)","source":"varchar(255)","class_id":"int(11)","description":"varchar(255)","key_words":"varchar(255)","url":"varchar(255)","status":"int(11)"},"old":null,"pkNames":["course_id"],"sql":"","sqlType":{"course_id":4,"title":12,"abstract_x":12,"content":2005,"age_range":12,"image":12,"create_time":93,"update_time":93,"view_number":4,"author":12,"source":12,"class_id":4,"description":12,"key_words":12,"url":12,"status":4},"table":"cms_article","ts":1597912365169,"type":"INSERT"}
cms_article
INSERT
CmsArticle{courseId=17, title='​​2018腾讯网”', abstractX='2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”', content='<p><img src="http://www.myhellobaby.cn/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg" data-filename="/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg" style="width: 318px;"><br></p><p><span style="font-size: 13px;"></span><br></p>', ageRange='null', image='http://www.myhellobaby.cn/profile/upload/2019/08/14/2fbfa6316289d8d8373ac38b7096465f.jpg', viewNumber=null, author='腾讯网', source='腾讯网', classId=3, keyWords='null', description='null', url='/news/news_detail_5.html', status=0, createTime=Mon Aug 12 12:08:23 CST 2019, updateTime=Fri Aug 16 13:56:47 CST 2019}
2020-08-20 16:32:44.617  INFO 2028 --- [MessageThread_1] SpringConsumer                           : id = 17 添加es成功
2020-08-20 16:32:44.618  INFO 2028 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A800C86B106F496D9F6565B8713098 cost: 149 ms
接收到消息 -> {"data":[{"course_id":"11","title":"​​​​2018腾讯网年度教育盛典|哈喽贝比荣获","abstract_x":"2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”","content":"<p><img src=\"http://www.myhellobaby.cn/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg\" data-filename=\"/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg\" style=\"width: 318px;\"><br></p><p><span style=\"font-size: 13px;\">腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网</span><br></p>","age_range":null,"image":"http://www.myhellobaby.cn/profile/upload/2019/08/14/2fbfa6316289d8d8373ac38b7096465f.jpg","create_time":"2019-08-12 12:08:23","update_time":"2019-08-16 13:56:47","view_number":null,"author":"腾讯网","source":"腾讯网6666","class_id":"3","description":null,"key_words":null,"url":"/news/news_detail_5.html","status":"0"}],"database":"cms-manage","es":1597912365000,"id":25,"isDdl":false,"mysqlType":{"course_id":"int(11)","title":"varchar(255)","abstract_x":"varchar(255)","content":"text","age_range":"varchar(255)","image":"varchar(255)","create_time":"datetime","update_time":"datetime","view_number":"int(11)","author":"varchar(255)","source":"varchar(255)","class_id":"int(11)","description":"varchar(255)","key_words":"varchar(255)","url":"varchar(255)","status":"int(11)"},"old":[{"title":"​​2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”"}],"pkNames":["course_id"],"sql":"","sqlType":{"course_id":4,"title":12,"abstract_x":12,"content":2005,"age_range":12,"image":12,"create_time":93,"update_time":93,"view_number":4,"author":12,"source":12,"class_id":4,"description":12,"key_words":12,"url":12,"status":4},"table":"cms_article","ts":1597912365270,"type":"UPDATE"}
cms_article
UPDATE
CmsArticle{courseId=11, title='​​​​2018腾讯网年度教育盛典|哈喽贝比荣获', abstractX='2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”2018腾讯网年度教育盛典|哈喽贝比荣获“2018年度社会影响力儿童教育品牌”', content='<p><img src="http://www.myhellobaby.cn/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg" data-filename="/profile/upload/2019/08/14/27c24d88a577c3fb20725324be926178.jpg" style="width: 318px;"><br></p><p><span style="font-size: 13px;">腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网腾讯网</span><br></p>', ageRange='null', image='http://www.myhellobaby.cn/profile/upload/2019/08/14/2fbfa6316289d8d8373ac38b7096465f.jpg', viewNumber=null, author='腾讯网', source='腾讯网6666', classId=3, keyWords='null', description='null', url='/news/news_detail_5.html', status=0, createTime=Mon Aug 12 12:08:23 CST 2019, updateTime=Fri Aug 16 13:56:47 CST 2019}
2020-08-20 16:32:44.740  INFO 2028 --- [MessageThread_1] SpringConsumer                           : id = 11 编辑es成功
2020-08-20 16:32:44.740  INFO 2028 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A800C86B106F496D9F6565B8D6309B cost: 121 ms

数据库表中的数据和es数据都变化了,并且一致。

mysql
在这里插入图片描述
es
在这里插入图片描述
到这里canal +Rocketmq实现MySQL与ElasticSearch数据同步整合就完成,感谢你的学习。有问题欢迎在留言区互动解决。

这篇关于canal +RocketMQ实现MySQL与ElasticSearch数据同步的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/959507

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分