基于canal的Redis缓存双写

2024-09-08 07:12
文章标签 redis canal 缓存 双写

本文主要是介绍基于canal的Redis缓存双写,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

canal地址:alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 (github.com)icon-default.png?t=O83Ahttps://github.com/alibaba/canal

1. 准备

1.1 MySQL

查看主机二进制日志

show master status

查看binlog是否开启

show variables like 'log_bin'

授权canal连接MySQL账号

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;SELECT * FROM mysql.user;

1.2 canal 

下载canal,上传redis所在服务器

 解压

修改conf/example下instance.properties

修改mysql主机ip

修改在mysql新建的canal账户 

启动

 2. canal客户端编写

新建表

CREATE TABLE `t_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`userName` varchar(100) NOT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>
package com.example.redis.utils;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;public class RedisUtils
{public static final String  REDIS_IP_ADDR = "192.168.229.102";// public static final String  REDIS_pwd = "111111";public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000);}public static Jedis getJedis() throws Exception {if(null!=jedisPool){return jedisPool.getResource();}throw new Exception("Jedispool is not ok");}}
package com.example.redis;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.example.redis.utils.RedisUtils;
import org.springframework.boot.test.context.SpringBootTest;
import redis.clients.jedis.Jedis;import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@SpringBootTest
public class CanalTest {public static final Integer _60SECONDS = 60;public static final String  REDIS_IP_ADDR = "192.168.229.102";private static void redisInsert(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisUpdate(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}public static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {//获取变更的row数据rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);}//获取变动类型EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else if (eventType == EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());} else {//EventType.UPDATEredisUpdate(rowData.getAfterColumnsList());}}}}public static void main(String[] args){System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");//=================================// 创建链接canal服务端CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), "example", "", "");int batchSize = 1000;//空闲空转计数器int emptyCount = 0;System.out.println("---------------------canal init OK,开始监听mysql变化------");try {connector.connect();//connector.subscribe(".*\\..*");connector.subscribe("redis_canal.t_user");connector.rollback();int totalEmptyCount = 10 * _60SECONDS;while (emptyCount < totalEmptyCount) {System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {//计数器重新置零emptyCount = 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");} finally {connector.disconnect();}}}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>Redis</artifactId><version>0.0.1-SNAPSHOT</version><name>Redis</name><description>Redis</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><!--SpringBoot通用依赖模块--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version><scope>compile</scope></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><!-- 数据库相关配置启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><!-- druid启动器的依赖  --><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-3-starter</artifactId><version>1.2.18</version></dependency><!-- 驱动类--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.8.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
spring.application.name=Redis
server.port=8080# ========================redis??=====================
spring.data.redis.database=0
spring.data.redis.host=192.168.229.102
spring.data.redis.port=6379
spring.data.redis.lettuce.pool.max-active=8
spring.data.redis.lettuce.pool.max-wait=-1ms
spring.data.redis.lettuce.pool.max-idle=8
spring.data.redis.lettuce.pool.min-idle=0# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/redis_canal?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=abc123
spring.datasource.druid.test-while-idle=false

canal启动失败可能是没有durid依赖,将durid放入lib下即可

Central Repository: com/alibaba/druid/1.2.22 (maven.org)icon-default.png?t=O83Ahttps://repo1.maven.org/maven2/com/alibaba/druid/1.2.22/

这篇关于基于canal的Redis缓存双写的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1147463

相关文章

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

redis+lua实现分布式限流的示例

《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可... 目录为什么使用Redis+Lua实现分布式限流使用ZSET也可以实现限流,为什么选择lua的方式实现

Redis中管道操作pipeline的实现

《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操... 目录什么是pipeline场景一:我要向Redis新增大批量的数据分批处理事务( MULTI/EXE

Linux修改pip和conda缓存路径的几种方法

《Linux修改pip和conda缓存路径的几种方法》在Python生态中,pip和conda是两种常见的软件包管理工具,它们在安装、更新和卸载软件包时都会使用缓存来提高效率,适当地修改它们的缓存路径... 目录一、pip 和 conda 的缓存机制1. pip 的缓存机制默认缓存路径2. conda 的缓

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

Redis中的常用的五种数据类型详解

《Redis中的常用的五种数据类型详解》:本文主要介绍Redis中的常用的五种数据类型详解,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Redis常用的五种数据类型一、字符串(String)简介常用命令应用场景二、哈希(Hash)简介常用命令应用场景三、列表(L

Redis解决缓存击穿问题的两种方法

《Redis解决缓存击穿问题的两种方法》缓存击穿问题也叫热点Key问题,就是⼀个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击,本文给大家介绍了Re... 目录引言解决办法互斥锁(强一致,性能差)逻辑过期(高可用,性能优)设计逻辑过期时间引言缓存击穿:给

Redis中如何实现商品秒杀

《Redis中如何实现商品秒杀》:本文主要介绍Redis中如何实现商品秒杀问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录技术栈功能实现步骤步骤一:准备商品库存数据步骤二:实现商品秒杀步骤三:优化Redis性能技术讲解Redis的List类型Redis的Set

Redis如何实现刷票过滤

《Redis如何实现刷票过滤》:本文主要介绍Redis如何实现刷票过滤问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录引言一、概述二、技术选型三、搭建开发环境四、使用Redis存储数据四、使用SpringBoot开发应用五、 实现同一IP每天刷票不得超过次数六

如何通过Golang的container/list实现LRU缓存算法

《如何通过Golang的container/list实现LRU缓存算法》文章介绍了Go语言中container/list包实现的双向链表,并探讨了如何使用链表实现LRU缓存,LRU缓存通过维护一个双向... 目录力扣:146. LRU 缓存主要结构 List 和 Element常用方法1. 初始化链表2.