redis高级 使用canal进行mysql和redis的双写一致应用篇

2024-06-11 17:36

本文主要是介绍redis高级 使用canal进行mysql和redis的双写一致应用篇,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

我们昨天谈论了对应的redis和mysql进行双写一致的理论篇

我们说了五种更新策略和查看的策略

更新策略可以使用

1.先更新数据库再更新redis  (高并发可能导致数据不一致)

2.先更新redis再更新数据库  (高并发可能导致数据不一致)

上述建议加上双检加锁策略来保证mysql的负载没那么高

3.停机更新  (业务允许可以使用)

4.先删除redis再更新数据库  ----延迟双删策略(但是业务时间大概率不好估量)

5.先更新数据库再删除redis      使用canal保证数据一致

--------------------

前面我们说了canal主要是通过二进制binlog的监听来对应进行一个增量的监控

下面我们来玩一玩对应的canal

这里的canal我们可以就当做一个从机,用来做对应的同步操作

下载以及前置工作

 下载地址 : Release v1.1.6 · alibaba/canal · GitHub

对mysql进行配置,新加一个canal用户

使用以下sql即可(注:mysql5和8不同)

//mysql5.7   开通用户权限DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;SELECT * FROM mysql.user;

只要结果是这样就代表是ok的

下载之后我们放在linux 下的 /mycanal文件夹下

使用 tar -zxvf 进行对应的解压

之后我们需要修改redis下对应的配置文件  

我们只需要修改 /mycanal/conf/example/instance.properties

修改为对应的mysql的ip和端口即可     

然后我们需要配置对应的my.ini开启对应的binlog二进制文件

log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复

然后我们重启mysql即可,查看binlog是否开启

开启即可

然后我们去对应的bin目录下开启对应的脚本

对应的stop.sh就是停止对应的应用

然后我们可以去对应日志文件查看是否启动成功

分别查看logs下面文件夹的canal.log和example.log即可

然后我们创建对应的数据库和测试数据表

先创建数据库然后执行以下语句
CREATE TABLE `t_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`userName` varchar(100) NOT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

springboot整合

我们先创建一个项目

然后先配置对应的pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.canal</groupId><artifactId>canal_demo02</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.14</version><relativePath/></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><junit.version>4.12</junit.version><log4j.version>1.2.17</log4j.version><lombok.version>1.16.18</lombok.version><mysql.version>5.1.47</mysql.version><druid.version>1.1.16</druid.version><mapper.version>4.1.5</mapper.version><mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version></properties><dependencies><!--canal--><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency><!--SpringBoot通用依赖模块--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--swagger2--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--SpringBoot与Redis整合依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><!--SpringBoot与AOP--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency><!--Mysql数据库驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SpringBoot集成druid连接池--><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>${druid.version}</version></dependency><!--mybatis和springboot整合--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>${mybatis.spring.boot.version}</version></dependency><!--通用基础配置junit/devtools/test/log4j/lombok/hutool--><!--hutool--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.2.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>${junit.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><optional>true</optional></dependency><!--persistence--><dependency><groupId>javax.persistence</groupId><artifactId>persistence-api</artifactId><version>1.0.2</version></dependency><!--通用Mapper--><dependency><groupId>tk.mybatis</groupId><artifactId>mapper</artifactId><version>${mapper.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.8.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

然后是对应的properties文件,记得按照自己的设置修改

记得这里修改数据库的名字以及密码

server.port=5555# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:13306/canal_test?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=abc123
spring.datasource.druid.test-while-idle=false

接着主启动类这里用不到,无需配置

下面写业务类

首先先搞连接池

注意修改对应的配置密码以及redis的IP地址

public class RedisUtils
{public static final String  REDIS_IP_ADDR = "192.168.188.136";public static final String  REDIS_pwd = "abc123";public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!=jedisPool){return jedisPool.getResource();}throw new Exception("Jedispool is not ok");}}

最后就是业务类了

public class RedisCanalClientExample
{public static final Integer _60SECONDS = 60;public static final String  REDIS_IP_ADDR = "192.168.111.185";private static void redisInsert(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisUpdate(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}public static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {//获取变更的row数据rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);}//获取变动类型EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else if (eventType == EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());} else {//EventType.UPDATEredisUpdate(rowData.getAfterColumnsList());}}}}public static void main(String[] args){System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");//=================================// 创建链接canal服务端CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), "example", "", "");int batchSize = 1000;//空闲空转计数器int emptyCount = 0;System.out.println("---------------------canal init OK,开始监听mysql变化------");try {connector.connect();//connector.subscribe(".*\\..*");connector.subscribe("bigdata.t_user");connector.rollback();int totalEmptyCount = 10 * _60SECONDS;while (emptyCount < totalEmptyCount) {System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {//计数器重新置零emptyCount = 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");} finally {connector.disconnect();}}
}

此时我们修改mysql,redis就会自动做同步,原因就是我们在这里打日志的时候也进行了redis的回写操作,大家可以在业务类中很容易的发现

这篇关于redis高级 使用canal进行mysql和redis的双写一致应用篇的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1051833

相关文章

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

MySQL中时区参数time_zone解读

《MySQL中时区参数time_zone解读》MySQL时区参数time_zone用于控制系统函数和字段的DEFAULTCURRENT_TIMESTAMP属性,修改时区可能会影响timestamp类型... 目录前言1.时区参数影响2.如何设置3.字段类型选择总结前言mysql 时区参数 time_zon

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

使用SQL语言查询多个Excel表格的操作方法

《使用SQL语言查询多个Excel表格的操作方法》本文介绍了如何使用SQL语言查询多个Excel表格,通过将所有Excel表格放入一个.xlsx文件中,并使用pandas和pandasql库进行读取和... 目录如何用SQL语言查询多个Excel表格如何使用sql查询excel内容1. 简介2. 实现思路3

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

c# checked和unchecked关键字的使用

《c#checked和unchecked关键字的使用》C#中的checked关键字用于启用整数运算的溢出检查,可以捕获并抛出System.OverflowException异常,而unchecked... 目录在 C# 中,checked 关键字用于启用整数运算的溢出检查。默认情况下,C# 的整数运算不会自

在MyBatis的XML映射文件中<trim>元素所有场景下的完整使用示例代码

《在MyBatis的XML映射文件中<trim>元素所有场景下的完整使用示例代码》在MyBatis的XML映射文件中,trim元素用于动态添加SQL语句的一部分,处理前缀、后缀及多余的逗号或连接符,示... 在MyBATis的XML映射文件中,<trim>元素用于动态地添加SQL语句的一部分,例如SET或W

Mybatis官方生成器的使用方式

《Mybatis官方生成器的使用方式》本文详细介绍了MyBatisGenerator(MBG)的使用方法,通过实际代码示例展示了如何配置Maven插件来自动化生成MyBatis项目所需的实体类、Map... 目录1. MyBATis Generator 简介2. MyBatis Generator 的功能3