延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

2024-05-07 09:36

本文主要是介绍延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、接着上文

上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。

相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:

  • 如何支持分布式的多个节点部署场景
  • 应用重启会恢复延时队列
  • 冷数据如何转换为热数据
  • 如何删除延迟队列中的任务

随后,我们也将提及:

  • 保存任务至延迟队列(生产者)
  • 读取延迟队列中的任务(消费者)

二、设计概要

在这里插入图片描述

  • 冷数据:mysql表中的任务数据

  • 热数据:jdk 延迟队列中的任务

  • 广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。

  • 本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。

  • 延迟队列 DelayQueueJob, 它实现了接口Delayed

包括任务的交易流水号和过期时间(即任务的回调时间)

import lombok.Builder;
import lombok.Data;import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
@Builder
@Data
public class DelayQueueJob implements Delayed {/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public DelayQueueJob(String transNo, Date expireDate) {super();this.transNo = transNo;this.expireDate = expireDate;}/*** 用于队列中排序过期时间** @param o* @return*/@Overridepublic int compareTo(Delayed o) {return Long.valueOf(this.expireDate.getTime()).compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));}/*** 用于获取过期时间* 延迟关闭时间 = 过期时间 - 当前时间** @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return this.expireDate.getTime() - System.currentTimeMillis();}
}

三、应用启动流程

解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。

在这里插入图片描述

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {@Overridepublic void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 实现代码参考上面的流程图}
}

四、定时任务流程

解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。

在这里插入图片描述

五、如何删除延迟队列中的任务

删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。

if (!NetUtil.getLocalhostStr().equals(ipAddress)) {DelayQueueSingleton.getDelayQueue().remove(transNo);
}

DelayQueueSingletons是一个单例类,详见下:

public class DelayQueueSingleton {private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;private DelayQueueSingleton() {}public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {if (delayQueue == null) {synchronized (DelayQueueSingleton.class) {if (delayQueue == null) {delayQueue = new CustomDelayQueue<>();}}}return delayQueue;}}

这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;public class CustomDelayQueue<T extends Delayed> {private final DelayQueue<T> queue = new DelayQueue<>();private final Map<String, T> map = new ConcurrentHashMap<>();public boolean put(T task, String taskId) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);map.put(taskId, task);return queue.add(task);}public boolean remove(String taskId) {// 先删除map,再删除queueT task = map.remove(taskId);if (task != null) {return queue.remove(task);}return false;}public T take() throws InterruptedException {return queue.take();}
}

六、保存任务至延迟队列(生产者)


// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder().transNo(event.getTransNo()).expireDate(event.getNotifyDate()).build(), event.getTransNo());}

七、读取延迟队列中的任务(消费者)

作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。

String transNo = null;
Date notifyDate = null;DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {transNo = job.getTransNo();notifyDate = job.getExpireDate();
}if (null == transNo) {return;
}if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);

八、总结

作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。

它们都需要的步骤是:

  • 任务的生产
  • 任务的消费
  • 移除任务

DelayQueue额外多出来的步骤是:

  • 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
  • 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
  • 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)

由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。

    @Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")private Boolean marked;

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

这篇关于延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/966976

相关文章

使用zip4j实现Java中的ZIP文件加密压缩的操作方法

《使用zip4j实现Java中的ZIP文件加密压缩的操作方法》本文介绍如何通过Maven集成zip4j1.3.2库创建带密码保护的ZIP文件,涵盖依赖配置、代码示例及加密原理,确保数据安全性,感兴趣的... 目录1. zip4j库介绍和版本1.1 zip4j库概述1.2 zip4j的版本演变1.3 zip4

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

python生成随机唯一id的几种实现方法

《python生成随机唯一id的几种实现方法》在Python中生成随机唯一ID有多种方法,根据不同的需求场景可以选择最适合的方案,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习... 目录方法 1:使用 UUID 模块(推荐)方法 2:使用 Secrets 模块(安全敏感场景)方法

JAVA中安装多个JDK的方法

《JAVA中安装多个JDK的方法》文章介绍了在Windows系统上安装多个JDK版本的方法,包括下载、安装路径修改、环境变量配置(JAVA_HOME和Path),并说明如何通过调整JAVA_HOME在... 首先去oracle官网下载好两个版本不同的jdk(需要登录Oracle账号,没有可以免费注册)下载完

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

Spring Boot 结合 WxJava 实现文章上传微信公众号草稿箱与群发

《SpringBoot结合WxJava实现文章上传微信公众号草稿箱与群发》本文将详细介绍如何使用SpringBoot框架结合WxJava开发工具包,实现文章上传到微信公众号草稿箱以及群发功能,... 目录一、项目环境准备1.1 开发环境1.2 微信公众号准备二、Spring Boot 项目搭建2.1 创建

IntelliJ IDEA2025创建SpringBoot项目的实现步骤

《IntelliJIDEA2025创建SpringBoot项目的实现步骤》本文主要介绍了IntelliJIDEA2025创建SpringBoot项目的实现步骤,文中通过示例代码介绍的非常详细,对大家... 目录一、创建 Spring Boot 项目1. 新建项目2. 基础配置3. 选择依赖4. 生成项目5.

Linux下删除乱码文件和目录的实现方式

《Linux下删除乱码文件和目录的实现方式》:本文主要介绍Linux下删除乱码文件和目录的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下删除乱码文件和目录方法1方法2总结Linux下删除乱码文件和目录方法1使用ls -i命令找到文件或目录

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推