本文主要是介绍Redis中管道操作pipeline的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操...
什么是pipeline
在 Redis 中,Pipeline(管道)是一种客户端与服务器间通信的优化机制,旨在减少网络往返时间和提高命令执行效率。以下是 Redis Pipeline 的具体定义和特点:
1.批量发送与接收:
- 使用 Pipeline 时,客户端不再逐条发送命令,而是将多个命令一次性打包成一个请求包发送给 Redis 服务器。相应地,服务器在接收到这个请求包后,不是立即返回每条命令的执行结果,而是先将所有命令依次执行完毕,然后将所有结果打包成一个响应包返回给客户端。
- 这种做法显著减少了客户端与服务器之间网络通信的次数,尤其是对于需要执行大量命令的场景,能够极大地降低网络延迟带来的影响。
2.异步执行
- 尽管 Redis Pipeline 中的所有命令是在服务器端按顺序执行的,但由于客户端与服务器之间的通信是批量进行的,客户端可以在发送完一批命令后立刻开始处理其他任务,而无需等待每个命令的单独响应。这种异步处理方式可以更好地利用客户端的计算资源,提高整体应用程序的并发性能。
3.命令隔离
- 在 Pipeline 中,每个命令的执行互不影响,即一个命令的执行结果不会影响后续命令的执行。这意味着即使某条命令执行失败,也不会阻止后续命令的执行。客户端在解析响应包时,可以根据响应内容判断每条命令的执行结果。
4.使用场景
- Pipeline 主要适用于需要对 Redis 执行大量命令的操作,如数据批量导入、大规模数据更新、复杂查询等。这些操作若不使用 Pipeline,可能会因为网络延迟导致整体执行时间显著增加。
- 对于涉及事务(transaction)的操作,虽然也可以使用 Pipeline 来打包命令,但需要注意的是,Pipeline 不提供事务的原子性和一致性保证。如果需要确保一组命令作为一个原子单位执行,应使用 Redis 的 MULTI/EXEC 命令来开启事务。
5.注意事项
- 虽然 Pipeline 能够显著提高命令执行效率,但一次性发送的命令数量不宜过大,否则可能导致数据包过大,增加网络传输压力,甚至超过 Redis 服务器或客户端的缓冲区限制,引发错误。合理的命令打包大小需要根据实际环境和网络状况进行调整
- 在使用 Pipeline 时,由于命令的响应是延迟返回的,客户端需要做好错误处理和重试策略,尤其是在网络不稳定或服务器负载较高的情况下。
总结来说,Redis Pipeline 是一种客户端与服务器间高效通信的技术,通过批量发送和接收命令,减少网络往返次数,提高命令执行效率,尤其适用于大量命令操作的场景。在使用时需注意命令打包大小的控制以及错误处理。
场景一:我要向redis新增大批量的数据
Redis Pipeline允许一次性发送多个命令到Redis服务器,而无需等待每个命令的响应,显著减少了网络往返时间和潜在的延迟。在Spring Boot应用中,可以使用RedisTemplate的executePipelined()方法实现:
@Autowired private StringRedisTemplate redisTemplate public void batchInsertUsersWithPipeline(List<User> users, String keyPrefix, long ttlSeconds) { redisTemplate.executePipelined((RedisCallback<Object>) connection -> { for (User user : users) { String key = generateKey(keyPrefix, user.getId()); String value = objectwww.chinasem.cnMapper.writeValueAsString(user); connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes()); } return null; }); }
分批处理
尽管Pipeline提高了效率,但对于千万级数据,一次性发送所有命令可能导致内存溢出或网络阻塞。因此,建议将数据分批处理,每批包含适量的记录(如1000条),逐批发送至Redis:
public void insertUsersInBatches(List<User> users, String keyPrefix, long ttlSeconds, int batchSize) { int start = 0; while (start < users.size()) { int end = Math.min(start + batchSize, users.size()); List<User> batch = users.subList(start, end); batchInsertUsersWithPipeline(batch, keyPrefix, ttlSeconds); start = end; } }
batchInsertUsersWithPipeline方法利用Redis Pipeline机制发送批量命令,可以在一定程度上提高插入操作的并发性,减少网络往返时间和整体耗时。然而,Pipeline本身并不能严格保证所有命令同时成功或失败,其主要特性如下:
1.原子性:
- Redis命令在Pipeline内部是原子性的,即单个命令的执行不会被其他命令中断。
- 注意:这并不意味着整个Pipeline的所有命令作为一个整体具有原子性。Pipeline中的命令仍然是依次执行的,只是客户端与服务器之间的通信过程被优化了。
2.响应顺序
- Redis服务器会按接收到命令的顺序返回结果。即使在Pipeline中并发发送多个命令,客户端接收到的响应也将按照命令发送的顺序排列。
3.故障处理
- 如果Pipeline中的某个命令执行失败(如语法错误、key不存在等),后续命令通常仍会继续执行。
- 错误信息会包含在相应命令的响应中,客户端可以根据这些信息判断哪些命令执行成功,哪些失败。
综上所述,batchInsertUsersWithPipeline方法不能严格保证所有命令同时成功或失败。在实际使用中,如果需要确保一批数据要么全部成功插入,要么全部失败回滚,可以采取以下策略:
事务( MULTI/EXEC/DISCARD ):
- redis提供了事务(Transaction)功能,通过MULTI、EXEC和DISCARD等命令,可以将一组命令打包在一起执行,只有当所有命令都能成功执行时,整个事务才会提交;否则,任何命令失败都将导致整个事务回滚。
- 尽管Redis事务不支持回滚到某一特定状态(即不保证隔离性),但在批量插入场景下,它可以满足“全有或全无”的要求。
- 使用Lua脚本编写批量插入逻辑,脚本在Redis服务器端执行,具备原子性。即使在网络中断或服务器重启等异常情况下,脚本要么完全执行,要么完全不执行,不会出现部分成功部分失败的情况。
batchInsertUsersWithPipelinChina编程e方法中的connection中各个方法的区别是什么?
1.connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes());
这一行调用了RedisConnection的setEx方法,用于设置一个带有过期时间(Time To Live,TTL)的键值对。参数说明如下:
- key.getBytes(): 将给定的键(字符串)转换为字节数组,这是Redis底层通信协议所要求的格式。
- (int) ttlSeconds: 将过期时间(以秒为单位)转换为整数类型,表示键值对在指定秒数后自动过期并被删除。
- value.getBytes(): 同样将给定的值(用户对象序列化后的jsON字符串)转换为字节数组
setEx方法确保在设置键值对的同时为其设定一个过期时间。如果键已经存在,该方法会更新键的值和过期时间。这个操作在Pipeline模式下是原子的,即在同一时刻只有一个setEx命令被执行。
2.connection.multi(); 和 connection.exec();
这两个方法涉及Redis的事务(Transaction)功能。在Pipeline模式下,由于我们希望保持较高的性能,一般不会使用这两个方法。但如果确实需要保证一批命令的原子性,可以使用如下方式:
- connection.multi(): 开启一个事务块,后续的所有命令都会被放入这个事务中,直到调用exec方法。在Pipeline模式下,调用multi方法可能会破坏原有的性能优化效果。
- connection.exec(): 提交并执行事务中的所有命令。如果事务中有任何一个命令执行失败,其他命令也会被取消执行,整个事务被视为失败。
在您的batchInsertUsersWithPipeline方法中并没有使用multi和exec,因为Pipeline已经提供了高效的批量执行机制,而且这里的目的是提高插入性能,而不是实现严格的事务行为。
综上所述,batchInsertUsersWithPipeline方法中直接使用了setEx方法,利用Pipeline来高效地批量插入带有过期时间的键值对。如果需要实现更严格的事务控制,应考虑使用Redis的事务(MULTI/EXEC)或Lua脚本,但这通常会牺牲一定的性能,并且与Pipeline机制不完全兼容。在实际应用场景中,应根据业务需求权衡选择合适的操作方式。
3.connection.set()和connection.setNx有什么区别
connection.set() 和 connection.setNx() 都是Redis的键值对设置方法,它们的主要区别在于是否存在条件以及对已有键的处理方式:
1.connection.set(key, value)
这是最基础的设置键值对的方法,无论键是否存在,都会直接覆盖(或创建)对应的键值对。参数说明如下:
- key: 要设置的键。
- value: 要关联的值。
行为特点:
- 无条件设置:不论键是否存在,都会执行设置操作。
- 覆盖已有键:如果键已存在,其原有值会被新的值覆盖。
- 创建新键:如果键不存在,会创建一个新的键值对。
2.connection.setNx(key, value)
这是带有条件的设置键值对方法,仅当键不存在时才会设置键值对。参数与set()相同:
- key: 要设置的键
- value: 要关联的值。
行为特点
- 有条件设置:仅在键不存在的情况下执行设置操作。
- 不覆盖已有键:如果键已存在,该方法不会有任何动作,既不会改变键的值,也不会抛出错误。
- 创建新键:如果键不存在,会创建一个新的键值对。
总结来说,connection.set()无条件地设置或更新键值对,而connection.setNx()则是在键不存在时才设置键值对,如果键已存在,则不会执行任何操作。前者适用于常规的键值更新或插入,后者常用于实现锁机制、唯一性检查等场景,确保某个键的值只在首次设置时有效。在您的batchInsertUsersWithPipeline方法中,由于目标是批量插入新数据,所以使用了setEx方法(带有过期时间的set),确保每个用户数据作为一个新的键值对被添加到Redis中。如果您需要在插入前检查键的唯一性,可以考虑使用setNx方法。不过,对于批量插入场景,通常假设数据是新的且键不存在,因此直接使用setEx更为常见。
场景二:大批量删除redis中的数据
public void batchDeleteKeysWithPipeline(List<String> keys) { redisTemplate.executePipelined((RedisCallback<Object>) connection -> { for (String key : keys) { connection.del(key.getBytes()); } return null; }); }
- redisTemplate.executePipelined() 方法创建了一个Pipeline上下文,允许您在回调函数内发送多个命令而不等待响应。
- 回调函数遍历要删除的键列表,对每个键调用 connection.del(key.getBytes())。del 方法用于删除指定键,将键名转换为字节数组后传递给Redis。
- 所有del命令在Pipeline中被连续发送至Redis服务器,期间客户端不会等待任何响应。
- 当回调函数执行完毕并返回时,Pipeline中的命令会被一次性发送至Redis,并接收所有命令的响应。由于命令是在一次网络往返中批量发送的,因此比单独执行每个删除命令效率更高。
场景三:删除redis中千万级别的数据
1.批量删除策略
- 使用 SCAN 命令结合 DEL 命令实现批量删除。
SCAN 命令用于增量式地迭代数据集,避免一次性获取所有键导致内存溢出。
DEL 命令用于删除单个或多个键。
2.并行处理
- 利用多线程或异步任务将批量删除操作分散到多个工作线程中,提高删除效率。
3.Redis 客户端优化:
- 选择高性能、支持批量操作和管道(Pipeline)功能的 Redis 客户端库,如 Jedis 或 Lettuce。
4.监控与故障恢复:
- 在执行大规模删除操作时,密切关注 Redis 的性能指标(如 CPU、内存、网络带宽等)以及客户端程序的状态。
- 准备应对可能的异常情况,如断连重试、数据一致性检查等。
基于Jedis客户端实现
import redis.clients.jedis.Jedis; import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; public class RedisDataDeleter { private static final int SCAN_BATCH_SIZE = 1000; // 可根据实际情况调整 private static final String MATCH_PATTERN = "*"; // 匹配所有键 public void deleteAllKeys(Jedis jedis) { ScanParams scanParams = new ScanParams().count(SCAN_BATCH_SIZE).match(MATCH_PATTERN); String cursor = "0"; while (true) { ScanResult<String> scanResult = jedis.scan(cursor, scanParams); cursor = scanResult.getCursor(); List<String> keysToDelete = scanResult.getResult(); if (!keysToDelete.isEmpty()) { // 使用 Pipeline 批量删除键 Pipeline pipeline = jedis.pipelined(); for (String key : keysToDelete) { pipeline.del(key); } pipeline.sync(); // 执行批量命令 } if ("0".equals(cursor)) { break; // 扫描完成 } } } }
注意
- 请确保在生产环境中适当调整 SCAN_BATCH_SIZE 参数,使其既能充分利用系统资源,又不会对 Redis 服务器造成过大压力。
- 在执行大规模删除操作前,最好先备份重要数据,并在非高峰期进行操作,以减少对业务的影响。
如果条件允许,建议升级到 Redis 6.x 版本,并启用 activedefrag 配置项,有助于在删除大量数据后及时进行碎片整理,保持 Redis 内存的高效利用。同时,监控 Redis 的内存使用情况和碎片率,必要时手动触发 BGREWRITEAOF 或 BGSAVE 操作。
maven
<dependencies>javascript <!-- ... 其他依赖 ... --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.7.0</version> <!-- 根据实际版本号调整 --> </dependency> </dependencies>
jedis连接池配置
spring.redis.host=192.168.1.100 spring.redis.port=6379 spring.redis.password=mysecretpassword # 如果有密码,请填写 # Jedis 连接池配置 spring.redis.jedis.pool.max-active=10 spring.redis.jedis.pool.max-idle=6 spring.redis.jedis.pool.min-idle=2 spring.redis.jedis.pool.max-wait=2000ms
jedisConfig
@Configuration public class JedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.password}") private String password; @Bean public JedisPool jedisPool() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-active"))); poolConfig.setMaxIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-idle"))); poolConfig.setMinIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.min-idle"))); poolConfig.setMaxWaitMillis(Long.parseLong(env.getProperty("spring.redis.jedis.pool.max-wait"))); return new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, password); } }
实现 Redis 数据删除服务
@Service public class RedisDataDeleterService { @Autowired private JedisPool jedisPool; public void deleteAllKeys() { try (Jedis jedis = jedisPool.getResource()) { ScanParams scanParams = new ScanParams().match("*").count(1000); String cursor = "0"; while (true) { ScanResult<String> scanResult = jedis.scan(cursor, scanParams); cursor = scanResult.getCursor(); List<String> keysToDelete = scanResult.getResult(); if (!keysToDelete.isEmpty()) { Pipandroideline pipeline = jedis.pipelined(); for (String key : keysToDelete) { pipeline.del(key); } pipeline.sync(); } if ("0".equals(cursor)) { break; } } } } }
调用删除服务
@RestController @RequestMapping("/redis") public class RedisController { @Autowired private RedisDataDeleterService redisDataDeleterService; @GetMapping("/delete-all-keys") public ResponseEntity<?> deleteAllKeys() { redisDataDeleterService.deleteAllKeys(); return ResponseEntity.ok().build(); } }
基于Lettuce
maven
<dependencies> <!-- ... 其他依赖 ... --> <dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>6.2.</version> <!-- 根据实际版本号调整 --> </dependency> </dependencies>
配置Lettuce
Spring Boot 自动配置会为 Lettuce 提供连接池支持。在 application.properties 或 application.yml 中配置 Redis 连接信息:
http://www.chinasem.cnspring.redis.host=192.168.1.100 spring.redis.port=6379 spring.redis.password=mysecretpassword # 如果有密码,请填写
使用 Lettuce 客户端执行批量删除操作:
@Service public class RedisDataDeleterService { @Autowired private RedisConnectionFactory connectionFactory; public void deleteAllKeys() { RedisAsyncCommands<String, String> asyncCommands = connectionFactory.getConnection().async(); ScanArgs scanArgs = ScanArgs.Builder.matches("*").count(1000); RedisFuture<ScanResult<String>> scanFuture = asyncCommands.scan(ScanCursor.INITIAL, scanArgs); AtomicBoolean isRunning = new AtomicBoolean(true); AtomicReference<ScanCursor> lastCursor = new AtomicReference<>(ScanCursor.INITIAL); // 异步处理扫描结果 scanFuture.thenAccept(scanResult -> { lastCursor.set(scanResult.getCursor()); List<String> keysToDelete = scanResult.getKeys(); if (!keysToDelete.isEmpty()) { RedisFuture<Long> delFuture = asyncCommands.del(keysToDelete.toArray(new String[0])); delFuture.thenAccept(count -> { if (isRunning.get()) { // 如果仍在运行,继续扫描 deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning); } }); } else { isRunning.set(false); } }); // 设置超时时间(可根据实际情况调整) CompletableFuture.runAsync(() -> { try { Thread.sleep(120000); // 2分钟超时 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } isRunning.set(false); }); } private void deleteAllKeysRecursive(RedisAsyncCommands<String, String> asyncCommands, ScanArgs scanArgs, AtomicReference<ScanCursor> lastCursor, AtomicBoolean isRunning) { if (isRunning.get()) { asyncCommands.scan(lastCursor.get(), scanArgs).thenAccept(scanResult -> { lastCursor.set(scanResult.getCursor()); List<String> keysToDelete = scanResult.getKeys(); if (!keysToDelete.isEmpty()) { asyncCommands.del(keysToDelete.toArray(new String[0])).thenAccept(count -> { if (isRunning.get()) { deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning); } }); } else { isRunning.set(false); } }); } } }
调用
@RestController @RequestMapping("/redis") public class RedisController { @Autowired private RedisDataDeleterService redisDataDeleterService; @GetMapping("/delete-all-keys") public ResponseEntity<?> deleteAllKeys() { redisDataDeleterService.deleteAllKeys(); return ResponseEntity.ok().build(); } }
到此这篇关于Redis中管道操作pipeline的实现的文章就介绍到这了,更多相关Redis管道操作pipeline内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于Redis中管道操作pipeline的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!