Java延时订单处理(上)- - 抛砖引玉

2024-02-05 01:18

本文主要是介绍Java延时订单处理(上)- - 抛砖引玉,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

更多最新文章欢迎大家访问我的个人博客😄:豆腐别馆

一、前言

在订单系统中,我们常常有如下需求:下单N分钟内(本文以三十分钟为例)未付款订单要自动取消,同时要恢复库存。在这里,如何保证延时订单的实时性将是我们首先需要跨过的第一道坎。那么该如何处理?请往下看 ↓

二、使用Timer处理延时订单

Timer: Java util里自带的一个类,可异步延时执行。

1. 代码实现
......
// 此处略去下单逻辑
......// 下单成功后,延迟执行检查订单状态,恢复库存
Timer timer = new Timer();
final Long orderId= returnOrder.getId();
timer.schedule(new TimerTask() {@Overridepublic void run() {// 处理延时订单disposeTimeOut(orderId);}
}, (30 * 60 * 1000); // 延迟三十分钟执行disposeTimeOut()方法

◇ 关于disposeTimeOut(String orderId),里面的主要逻辑为判断当前订单是否已付款,如果:

  • **未付款:**取消订单、恢复库存
  • **已付款:**不做任何操作
2. 服务重启处理

正常来讲,当下完订单三十分钟后即去调用判断处理超时订单并不会有什么问题,但是如果在这下完单的三十分钟内,服务器宕机或者重启了怎么办?(假设只有一台服务器)
在Spring中,其为我们提供了一个InitializingBean接口,实现该接口即可实现在服务启动时执行重写的方法。那么要做的无非就是在服务器启动时,扫描全部待支付订单,如果:

  • **已超时:**取消订单、恢复库存
  • **未超时:**获取超时时间继续放入Timer中,待超过订单超时时间后继续调用上述的disposeTimeOut()方法。
/*** 处理延时订单*/
public class DisposeTimeOutOrder implements InitializingBean {protected Log log = LogFactory.getLog(this.getClass());@Resourceprivate OrderService orderService;@Overridepublic void afterPropertiesSet() throws Exception {DisposeThread thread = new DisposeThread();// 使用多线程,避免占用服务器启动时间new Thread(thread).start();}/*** 处理待支付且未超时/已超时订单*/class DisposeThread implements Runnable {@Overridepublic void run() {String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());int timeout = 0;try {log.debug("服务器启动完毕,开始扫描处理待支付已超时订单...记录时间:" + now);// 处理待支付且已超时订单timeout = orderService.disposeTimeOut();log.info("【成功】处理待支付已超时订单成功记录数:" + timeout + ",记录时间:" + now);// 处理待支付且未超时订单List<DzOrder> waitPayList = orderService.getWaitPayOrder();int waitCount = 0;if (CollectionsUtil.isNotNull(waitPayList)) {waitCount = waitPayList.size();for(DzOrder order : waitPayList) {Timer timer = new Timer();final Long orderId = order.getId();timer.schedule(new TimerTask() {@Overridepublic void run() {// 处理待支付且未超时订单orderService.disposeTimeOut(orderId);}}, (order.getPaymentTimeout().getTime()));}}log.debug("服务器启动完毕,开始扫描处理待支付未超时订单,共发现记录数为:" + waitCount + "!已推入检查队列准备到期检查...记录时间:" + now);} catch (Exception e) {log.debug("【异常】服务器启动完毕,处理待支付订单异常,记录时间:" + now);e.printStackTrace();}}}
}

至此关于Timer处理延时订单已完毕,但是等等,虽然业务逻辑没问题、甚至服务重启看似也能解决,但是我们知道Timer其本身也是存在着问题的。

3. 存在问题

1、Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间长度大于其周期时间长度,那么就会导致这一次的任务还在执行,而下一个周期的任务已经需要开始执行了,当然在一个线程内这两个任务只能顺序执行,有两种情况:对于之前需要执行但还没有执行的任务,一是当前任务执行完马上执行那些任务(按顺序来),二是干脆把那些任务丢掉,不去执行它们。至于具体采取哪种做法,需要看是调用schedule还是scheduleAtFixedRate。

2、Timer线程是不会捕获异常的,如果TimerTask抛出的了未检查异常则会导致Timer线程终止,同时Timer也不会重新恢复线程的执行,他会错误的认为整个Timer线程都会取消。同时,已经被安排单尚未执行的TimerTask也不会再执行了,新的任务也不能被调度。故如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为。

针对上述问题,我们可以使用ScheduledExecutorService替代。但是由于ScheduledExecutorService是多线程处理,即不同任务会被分放到其线程池中的不同线程,因此当订单数据量稍微增长,随着线程的消耗,就容易出现无可用线程池甚至内存溢出等异常。
因此此处,还可以使用DelayQueue进行处理。再往下看:

三、使用DelayQueue处理延时订单

**DelayQueue:**顾名思义,即延时队列,同样是Java util里自带的一个类。里面的put()、及take()方法可为我们实现队列延时抓取。

1. DelayQueue核心源码
/*** Inserts the specified element into this delay queue. As the queue is* unbounded this method will never block.** @param e the element to add* @throws NullPointerException {@inheritDoc}*/
public void put(E e) {offer(e);
}/*** Inserts the specified element into this delay queue.** @param e the element to add* @return <tt>true</tt>* @throws NullPointerException if the specified element is null*/
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(TimeUnit.NANOSECONDS);if (delay <= 0)return q.poll();else if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

相信了解过或是英文好的朋友看过源码已经知道它们的各自用处,其中最关键的便是take()方法,源码中的介绍如下:

Retrieves and removes the head of this queue, waiting if necessary until an element with an expired delay is available on this queue.

用专20的英文水平翻译后便是:

在必要时阻塞等待,直到该队列上有一个具有过期延迟的元素可用

2. 实现思路
  1. 客户下单,订单数据(如订单状态、订单过期时间等)保存进数据库的同时存进DelayedQueue延时队列
  2. 延时队列按订单超时时刻进行排序,最快过期的队列最先出队。
  3. 订单到期出队,到数据库查询订单数据,同时根据订单状态处理到期订单。如过期未支付,则修改订单状态为已过期。

好了,话不多说,上代码:

3. 代码实现

(1)首先声明一个DelayedVo实现Delayed接口

package com.yby.duanzu.service.impl.core;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** 存放延迟队列*/
public class DelayedVo<T> implements Delayed {// 到期时间,单位为毫秒,实际计算为纳秒private long activeTime;// 业务数据private T data;// activeTime:过期时长,单位为毫秒public DelayedVo(long activeTime, T data) {super();this.activeTime = activeTime;this.data = data;}public long getActiveTime() {return activeTime;}public T getData() {return data;}/*** 返回激活日期的剩余时间,时间单位由单位参数指定*/@Overridepublic long getDelay(TimeUnit unit) {long excessTime = unit.convert(this.activeTime - System.currentTimeMillis(), unit);return excessTime;}/*** Delated接口继承了Comparable接口,剩余时间排序由小到大排序(纳秒)*/@Overridepublic int compareTo(Delayed delayed) {long excessTime = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);return excessTime == 0 ? 0 : ((excessTime < 0 ? -1 : 1));}}

(2)使用延时队列处理延时订单

package com.yby.duanzu.service.impl.core;import java.util.Date;
import java.util.concurrent.DelayQueue;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;import com.yby.api.common.RandomUtil;
import com.yby.duanzu.po.DzOrder;
import com.yby.duanzu.service.core.OrderService;/*** 使用阻塞队列实现延时订单*/
@Service
@Qualifier("DelayQueueServiceImpl")
public class DelayQueueServiceImpl implements DelayQueueService {private Log logger = LogFactory.getLog(this.getClass());@Resourceprivate OrderService orderService;private Thread takeOrder;// 负责保存限时订单的队列private static DelayQueue<DelayedVo<DzOrder>> delayOrder = new DelayQueue<DelayedVo<DzOrder>>();/*** 进行延时处理的方法* * @param order*            要进行延时处理的订单* @param expireTime*            延时时长,单位为毫秒*/@Overridepublic void orderDelay(DzOrder order, long expireTime) {DelayedVo<DzOrder> delayedOrder = new DelayedVo<DzOrder>(expireTime, order);// 将订单推入队列delayOrder.put(delayedOrder);logger.info("订单[超时时长:" + expireTime / 1000 + "秒]被推入检查队列,订单详情:" + order);}/*** 处理到期的订单线程*/private class TakeOrder implements Runnable {private OrderService orderService;public TakeOrder(OrderService orderService) {super();this.orderService = orderService;}@Overridepublic void run() {logger.info("处理到期订单线程已经启动");// 检查当前线程是否中断while (!Thread.currentThread().isInterrupted()) {try {// take():获取队列,在必要时阻塞等待,直到该队列上有一个具有过期延迟的元素可用。DelayedVo<DzOrder> delayedOrder = delayOrder.take();if (delayedOrder != null) {// 处理待支付且支付超时订单orderService.disposeTimeOut(delayedOrder.getData().getId());}} catch (Exception e) {logger.error("The thread is Interrupted!");}}logger.info("处理到期订单线程准备关闭...");}}// @PostConstruct:当整个bean被初始化完成后执行@PostConstructpublic void init() {takeOrder = new Thread(new TakeOrder(orderService));takeOrder.start();}// 销毁示例之前调用@PreDestroypublic void close() {takeOrder.interrupt();}
}

(3)方法调用

......
// 此处略去下单逻辑
......// 下单成功后,将订单放入延时队列
delayQueueService.orderDelay(returnOrder, returnOrder.getPaymentTimeout().getTime());......
......
4. 服务重启处理

同样的,DelayQueue尽管特殊,可以进行延时处理,但说到底其还是一个队列,是队列,没做持久化,那么就还是得存放在内存当中,那么就一样面临服务重启后数据丢失的风险。
依旧使用InitializingBean(或可使用Spring中的@PostConstruct注解),在服务启动时扫描全部待支付订单,如果:

  • **已超时:**取消订单、恢复库存
  • **未超时:**获取超时时间继续放入DelayQueue中,待超过订单中支付超时时间后继续调用上述的disposeTimeOut()方法。
package com.yby.duanzu.service.impl.core;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;import javax.annotation.Resource;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;import com.yby.duanzu.po.DzOrder;
import com.yby.duanzu.service.core.OrderService;
import com.yby.duanzu.util.CollectionsUtil;/*** 处理支付超时订单*/
public class DisposeTimeOutOrder implements InitializingBean {protected Log log = LogFactory.getLog(this.getClass());@Resourceprivate OrderService orderService;@Autowired@Qualifier("delayQueueService")private DelayQueueService delayQueueService;@Overridepublic void afterPropertiesSet() throws Exception {DisposeThread thread = new DisposeThread();// 使用多线程,避免占用服务器启动时间new Thread(thread).start();}/*** 处理待支付且未超时/已超时订单*/class DisposeThread implements Runnable {@Overridepublic void run() {String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());int timeout = 0;try {log.debug("服务器启动完毕,开始扫描处理待支付已超时订单...记录时间:" + now);// 处理待支付且已超时订单timeout = orderService.disposeTimeOut();log.info("【成功】处理待支付已超时订单成功记录数:" + timeout + ",记录时间:" + now);// 处理待支付且未超时订单List<DzOrder> waitPayList = orderService.getWaitPayOrder();int waitCount = 0;if (CollectionsUtil.isNotNull(waitPayList)) {waitCount = waitPayList.size();for (DzOrder order : waitPayList) {long expireTime = order.getPaymentTimeout().getTime() - (new Date().getTime());// 放入延时队列delayQueueService.orderDelay(order, expireTime);}}log.debug("服务器启动完毕,开始扫描处理待支付未超时订单,共发现记录数为:" + waitCount + "!已推入检查队列准备到期检查...记录时间:" + now);} catch (Exception e) {log.debug("【异常】服务器启动完毕,处理待支付订单异常,记录时间:" + now);e.printStackTrace();}}}
}

至此关于DelayQueue处理延时订单也已完毕,但是,这样是否就已经完美了么?

5. 存在问题

尽管DelayQueue避免了Timer中关于时间问题及发生异常使其它任务一起终止的问题,但是由于DelayQueue是一个无限容量的队列容器,即只要你有足够的内存,那么就可以存放无限的数据。如果在队列失效时间内存放过多的数据,那么对内存一样是种损耗 ,且程序里放着一个死循环,就算有阻塞队列的存在,也怎么着都感觉不是个滋味。那么是否还有其它方式可以解决?答案当然必须还是肯定的,所以事情还远未结束。

四、远未结束

实际上,除了上述提到的未解决的问题,还有一个缺陷就是:如果在集群环境下,无论是Timer还是DelayQueue,如果出现应用重启,在上述的解决思路中,全局扫描订单将会把原先应该由其它服务管理的部分一起给扫描,那么这样势必将引起服务间订单数据的抢夺。
再者,除了在前言中提到的实时性、正文中一再提到的内存消耗、并发吞吐量也应该是我们所需要考虑的问题,那么如何既优雅又高效地解决?

… …

请看下回分解♪(*)

这篇关于Java延时订单处理(上)- - 抛砖引玉的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/679363

相关文章

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

Java docx4j高效处理Word文档的实战指南

《Javadocx4j高效处理Word文档的实战指南》对于需要在Java应用程序中生成、修改或处理Word文档的开发者来说,docx4j是一个强大而专业的选择,下面我们就来看看docx4j的具体使用... 目录引言一、环境准备与基础配置1.1 Maven依赖配置1.2 初始化测试类二、增强版文档操作示例2.

一文详解如何使用Java获取PDF页面信息

《一文详解如何使用Java获取PDF页面信息》了解PDF页面属性是我们在处理文档、内容提取、打印设置或页面重组等任务时不可或缺的一环,下面我们就来看看如何使用Java语言获取这些信息吧... 目录引言一、安装和引入PDF处理库引入依赖二、获取 PDF 页数三、获取页面尺寸(宽高)四、获取页面旋转角度五、判断

Spring Boot中的路径变量示例详解

《SpringBoot中的路径变量示例详解》SpringBoot中PathVariable通过@PathVariable注解实现URL参数与方法参数绑定,支持多参数接收、类型转换、可选参数、默认值及... 目录一. 基本用法与参数映射1.路径定义2.参数绑定&nhttp://www.chinasem.cnbs

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

JAVA中安装多个JDK的方法

《JAVA中安装多个JDK的方法》文章介绍了在Windows系统上安装多个JDK版本的方法,包括下载、安装路径修改、环境变量配置(JAVA_HOME和Path),并说明如何通过调整JAVA_HOME在... 首先去oracle官网下载好两个版本不同的jdk(需要登录Oracle账号,没有可以免费注册)下载完

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

Spring Boot 结合 WxJava 实现文章上传微信公众号草稿箱与群发

《SpringBoot结合WxJava实现文章上传微信公众号草稿箱与群发》本文将详细介绍如何使用SpringBoot框架结合WxJava开发工具包,实现文章上传到微信公众号草稿箱以及群发功能,... 目录一、项目环境准备1.1 开发环境1.2 微信公众号准备二、Spring Boot 项目搭建2.1 创建

Java中Integer128陷阱

《Java中Integer128陷阱》本文主要介绍了Java中Integer与int的区别及装箱拆箱机制,重点指出-128至127范围内的Integer值会复用缓存对象,导致==比较结果为true,下... 目录一、Integer和int的联系1.1 Integer和int的区别1.2 Integer和in

SpringSecurity整合redission序列化问题小结(最新整理)

《SpringSecurity整合redission序列化问题小结(最新整理)》文章详解SpringSecurity整合Redisson时的序列化问题,指出需排除官方Jackson依赖,通过自定义反序... 目录1. 前言2. Redission配置2.1 RedissonProperties2.2 Red