使用 Redisson 实现分布式 CountDownLatch,如何使用RCountDownLatch实现内外网数据互通的超时控制?

本文主要是介绍使用 Redisson 实现分布式 CountDownLatch,如何使用RCountDownLatch实现内外网数据互通的超时控制?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

闭锁(CountDownLatch)是一种用于同步多个线程的机制,它可以让一个或多个线程等待其他线程完成某个任务后再继续执行。

在Java中,RCountDownLatch 是 Redisson 提供的分布式闭锁实现,它基于 Redis 的分布式系统,可以在分布式环境中实现多个线程的同步。

闭锁的核心概念是一个计数器,该计数器可以被初始化为一个正整数,并通过 trySetCount() 方法来设置初始计数值。每个线程在完成任务后,可以通过 countDown() 方法将计数器减一。当计数器的值达到零时,所有等待的线程将被释放,继续执行后续操作。

RCountDownLatch latch = redissonClient.getCountDownLatch(LATCH_KEY);
latch.trySetCount(1);
latch.await();// 在其他线程或JVM里
RCountDownLatch latch = redissonClient.getCountDownLatch(LATCH_KEY);
latch.countDown();

1、实现多个线程的同步

下面是一个示例代码,演示了如何使用 RCountDownLatch 实现多个线程的同步:

import org.redisson.Redisson;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;public class CountDownLatchExample {private static final int THREAD_COUNT = 5;public static void main(String[] args) throws InterruptedException {// 初始化 Redisson 客户端RedissonClient redissonClient = getRedissonClient();// 创建闭锁对象,设置初始计数值RCountDownLatch latch = redissonClient.getCountDownLatch("my-latch");latch.trySetCount(THREAD_COUNT);// 创建多个线程并启动for (int i = 0; i < THREAD_COUNT; i++) {new Thread(() -> {// 模拟线程执行任务// ...// 任务完成后,计数器减一latch.countDown();}).start();}// 等待所有线程任务完成latch.await();// 所有线程任务完成后,执行后续操作System.out.println("All threads have completed!");// 关闭 Redisson 客户端redissonClient.shutdown();}private static RedissonClient getRedissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6379");return Redisson.create(config);}
}

解析:

  1. 首先,我们通过调用 getRedissonClient() 方法初始化了 Redisson 客户端。
  2. 接下来,我们通过 redissonClient.getCountDownLatch(LATCH_KEY) 创建了一个 RCountDownLatch 对象,并将其与指定的键 LATCH_KEY 关联起来。
  3. 使用 latch.trySetCount(1) 方法设置了初始计数值为1。
  4. 然后,我们调用 latch.await() 方法来等待其他线程完成任务。如果计数器的值不为0,当前线程将被阻塞。
  5. 当其他线程完成任务后,调用 latch.countDown() 方法将计数器减一。
  6. 最后,我们关闭了 Redisson 客户端。

总结:
本文介绍了如何使用 Redisson 实现分布式的 CountDownLatch。通过初始化 Redisson 客户端、创建 RCountDownLatch 对象、设置初始计数值、等待其他线程完成任务和计数减一等步骤,我们可以在分布式环境下进行线程同步操作。

2、经典场景

【需求】:我有一个外网接口,但是它无法直接访问内网的数据。现在有一个第三方需要通过外网接口获取内网的数据。我希望在第三方请求外网接口时,外网接口能够等待内网获取数据并返回给第三方,如果等待超过30秒则直接返回超时。

为了满足这个需求,我们可以使用 RCountDownLatch 来设计实现。具体的设计方案如下:

  1. 第三方向外网接口发送请求。
  2. 外网接口接收到请求后,将请求信息发送到 Kafka。
  3. 内网的服务启动一个消费者,从 Kafka 中获取请求信息。
  4. 内网服务根据请求内容获取数据,并将数据存放在 Redis 中。
  5. 内网服务使用 RCountDownLatch 创建一个闭锁对象,并设置初始计数值为 1。
  6. 内网服务在获取并存放数据到 Redis 后,调用 countDown() 方法将闭锁计数器减一。
  7. 外网接口使用 await() 方法等待闭锁计数器达到零,即等待内网服务完成数据的获取和存放到 Redis。
  8. 当闭锁计数器达到零时,外网接口从 Redis 中获取数据,并将数据作为响应返回给第三方。
  9. 如果等待超过30秒,外网接口直接返回超时信息给第三方。

通过这个设计方案,外网接口可以等待内网服务完成数据获取并返回给第三方。 RCountDownLatch 在这里起到了同步的作用,确保外网接口在获取到数据之前能够等待。

这个设计方案中,RCountDownLatch 用于外网服务等待内网服务完成数据获取和存放到 Redis 的过程。以下是一个简单的示例代码:

import org.redisson.api.RCountDownLatch;
import org.redisson.api.RedissonClient;@Service
@RequiredArgsConstructor
public class OuterServiceImpl implements OuterService {/*** RedissonClient */private final RedissonClient redissonClient;/*** KafkaConsumer */private final KafkaConsumer kafkaConsumer;public Response processRequest(Request request) {// 发送请求到 KafkakafkaProducer.send(request);// 创建闭锁对象并设置初始计数值为 1【】每个请求或会话都需要有自己唯一的 LATCH_KEYRCountDownLatch latch = redissonClient.getCountDownLatch(LATCH_KEY);latch.trySetCount(1);try {// 等待内网服务完成数据获取和存放到 Redisif (!latch.await(30, TimeUnit.SECONDS)) {// 超时处理return new Response("Timeout");}// 从 Redis 中获取数据Object data = redissonClient.getBucket(request.getId()).get();// 返回数据给第三方return new Response("Success", data);} catch (InterruptedException e) {Thread.currentThread().interrupt();// 异常处理return new Response("Error");}}// Kafka 消费者监听处理请求private void handleRequest(Message message) {// 处理请求并将数据存放到 Redis// 计数器减一RCountDownLatch latch = redissonClient.getCountDownLatch(LATCH_KEY);latch.countDown();}
}

在上述示例代码中,OuterService 类是一个外网服务,它使用 RCountDownLatch 实现了等待内网服务完成数据获取和存放到 Redis 的功能。在 processRequest() 方法中,它发送请求到 Kafka,并创建了一个闭锁对象。然后等待闭锁计数器达到零,即等待内网服务完成数据获取和存放到 Redis。如果等待时间超过 30 秒,将返回超时信息给第三方。

handleRequest() 方法中,外网服务的 Kafka 消费者监听处理请求,并在处理完成后调用 countDown() 方法,将闭锁计数器减一。

需要注意的是,示例代码中的 Kafka 消费者和生产者等细节并未展示,你需要根据实际情况进行实现。

总结:通过使用 RCountDownLatch,我们可以实现外网接口等待内网获取数据的场景。外网服务在调用内网服务之后等待闭锁计数器达到零,然后从 Redis 中获取数据返回给第三方。如果等待时间超过指定时间,则直接返回超时信息。

这篇关于使用 Redisson 实现分布式 CountDownLatch,如何使用RCountDownLatch实现内外网数据互通的超时控制?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/825705

相关文章

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件