如何通过事务消息保障抢购业务的分布式一致性?

2024-09-02 08:58

本文主要是介绍如何通过事务消息保障抢购业务的分布式一致性?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方“朱小厮的博客”,选择“设为星标”

后台回复"书",获取

后台回复“k8s”,可领取k8s资料

作者 | 山猎

出品 | 阿里巴巴中间件

前言


在电商领域,抢购和秒杀是非常普遍业务模式,抢购类业务在快速拉升用户流量并为消息者带来实惠的同时,也给电商系统带来了巨大考验。在高并发、大流量的冲击下,系统的性能和稳定性至关重要,任何一个环节出现故障,都会影响整体的购物体验,甚至造成电商系统的大面积崩溃。和电商领域抢购场景极为类似的业务模式还有很多,比如大型赛事和在线教育的报名系统,以及各类购票系统等。

针对抢购类业务在技术上带来的挑战,业界有一系列解决方案,通过不同维度来提升系统的性能与稳定性,包括动静分离、定时上架、异步处理、令牌队列、多级缓存、作弊行为侦测、流量防护、全链路压测等。

本文重点聚焦在如何确保抢购类业务的一致性上,分布式事务一直是IT界老大难的问题,而抢购业务所具备的高并发、大流量特征,更是成倍增加了分布式一致性的实现难度。以下将介绍如何通过事务消息构建满足抢购类业务要求的高性能高可用分布式一致性机制。

事务一致性原理回顾


事务是应用程序中一系列严密的操作,这一系列操作是一个不可分割的工作单位,它们要么全部执行成功,要么一个都不做。事务具有四个特征:原子性( Atomicity )、一致性( Consistency )、隔离性( Isolation )和持续性( Durability ),这四个特性简称为 ACID 特性。

在非并发状态下,保证事务的 ACID 特性是轻而易举的事情,如果某一个操作执行不成功,把前面的操作全部回滚就 OK 了。而在并发状态下,由于有多个事务同时操作同一个资源,对于事务 ACID 特性的保证就会困难一些,如果考虑得不周全,就会遇到如下几个问题:

  • 脏读:事务 A 读到了事务 B 还没有提交的数据。

  • 不可重复读:在一个事务里面对某个数据读取了两次,读出来的数据不一致。

  • 幻读:在一个事务对某个数据集用同样的方式读取了两次,数据集的条目数量不一致。

为了应对上述并发情况下出现的问题,就需要通过一定的事务隔离级别来解决。当事务的隔离级别越高的时候,上述问题发生的机会就越小,但是性能消耗也会越大。所以在实际生产过程中,要根据实际需求去确定隔离级别:

  • READ_UNCOMMITTED(读未提交):最低的隔离级别,可以读到未提交的数据,无法解决脏读、不可重复读、幻读中的任何一种。

  • READ_COMMITED (读已提交):能够防止脏读,但是无法解决不可重复读和幻读的问题。

  • REPEATABLE_READ (重复读取):对同一条数据的多次重复读取能保持一致,解决了脏读、不可重复读的问题,但是幻读的问题还是无法解决。

  • SERLALIZABLE ( 串行化):最高的事务隔离级别,避免了事务的并行执行,解决了脏读、不可重复读和幻读的问题,但性能最低。

关系型数据库提供了对于事务的支持,能够通过不同隔离级别的设置,确保并发状态下事务的ACID特性。但关系型数据库提供的能力是基于单机事务的,一旦遇到分布式事务场景,就需要通过更多其他技术手段来解决问题。


抢购业务中的分布式事务


有如下三种情况可能会产生分布式事务:

1、一个事务操作包含对两个数据库的操作:数据库所提供的事务保证仅能局限在对于自身的操作上,无法跨越到其他数据库。

2、一个事务包含对多个数据分片的操作:具体的分片规则由分库分表中间件或者分库分表 SDK 来实现,有可能跨越多个数据库或同一个数据库的多个表。对于业务逻辑而言,底层的数据分片情况是不透明的,因此也没有办法依赖于数据库提供的单机事务机制。

3、一个事务包括对多个服务的调用:在微服务领域,这是极为常见的场景,不同的服务使用不同的数据资源,甚至涉及到更为复杂的调用链路。在这种情况下,数据库提供的单机事务机制,仅仅能保证其中单一环节的 ACID 特性,没有办法延伸到全局。

微服务技术在电商领域的普及程度是非常高的,比较大型的电商应用还会通过中台思想将共性业务能力进行沉淀,因此抢购业务中的很多环境都属于跨服务的分布式调用,会涉及到上述第三种分布式事务形态。比如在订单支付成功后,交易中心会调用订单中心的服务把订单状态更新,并调用物流中心的服务通知商品发货,同时还要调用积分中心的服务为用户增加相应的积分。如何保障分布式事务一致性,成为了确保抢购业务稳定运行的核心诉求之一。

分布式事务的实现方式


传统分布式事务

传统的分布式事务通过 XA 模型实现,通过一个事务协调者,站在全局的角度将多个子事务合并成一个分布式事务。XA 模型之所以能在分布式事务领域得到广泛使用,是因为其具有如下两个方面的优势:

  • 提供了强一致性保证,在业务执行的任何时间点都能确保事务一致性。

  • 使用简单。常见的关系型数据库都提供了对XA协议的支持,通过引入事务协调器,业务代码跟使用单机事务相比基本上没有差别。

但是在互联网领域,XA 模型的分布式事务实现存在很多局限性,在抢购业务这样的高并发大流量场景中更是被完全弃用。我们拿 XA 分布式协议中最普遍的两阶式提交方案,来说明为什么 XA 模型并不适合互联网场景。

1、性能问题。在两段式提交的执行过程中,所有参与节点都是事务阻塞型的,需要长时间锁定资源。这会导致系统整体的并发吞吐量变低,在抢购业务中是不可接受的。

2、单点故障问题。事务协调者在链路中有着至关重要的作用,一旦协调者发生故障,参与者会一直阻塞下去,整个系统将无法工作,因此需要投入巨大的精力来保障事务协调者的高可用性。

3、数据不一致问题。在阶段二中,如果协调者向参与者发送 commit 请求之后,发生了网络异常,会导致只有一部分参与者接收到了 commit 请求,没有接收到 commit 请求的参与者最终会执行回滚操作,从而造成数据不一致现象。在抢购业务中,这样的数据不一致有可能会对企业或消费者造成巨大的经济损失。

因此 XA 模型是一个理想化的分布式事务模型,并没有考虑到高并发、网络故障等实际因素,即便是在两阶段提交的基础上,诞生了三阶段提交这样的实现方式,也没有办法从根本上解决性能和数据不一致的问题。


柔性事务

针对传统分布式事务方案在互联网领域的局限性,业界提出了 CAP 理论以及 BASE 理论,在此基础上诞生了在大型互联网应用中广泛使用的柔性事务。柔性事务的核心思想是放弃传统分布式事务中对于严格一致性的要求,允许在事务执行过程中存在数据不一致的中间状态,在业务上需要容忍中间状态的存在。柔性事务会提供完善的机制,保证在一段时间的中间状态后,系统能走向最终一致状态。

遵循 BASE 理论的柔性事务放弃了隔离性,减小了事务中锁的粒度,使得应用能够更好的利用数据库的并发性能,实现吞吐量的线性扩展。异步执行方式可以更好地适应分布式环境,在网络抖动、节点故障的情况下能够尽量保障服务的可用性。因此在高并发、大流量的抢购业务中,柔性事务是最佳的选择。


传统分布式事务

柔性事务

业务改造

一致性

强一致性

最终一致

回滚

支持    

实现回退接口

隔离性

支持    

放弃隔离性或实现资源锁定接口

并发性能    

低    

适合场景    

低并发、短事务

高并发、长事务

柔性事务有多种实现方式,包括TCC、Saga、事务消息、最大努力通知等,本文将重点介绍通过事务消息实现柔性事务。

事务消息原理分析


抢购业务场景拆解

我们结合抢购业务的真实场景,分析如何通过事务消息实现分布式一致性。在抢购业务中,有两个非常关键的阶段,需要引入分布式事务机制,分别是订单创建阶段和付款成功阶段。

从字面含义来看,抢购业务就隐含了一个重要的前提:库存是有限的。因此在订单创建的时候,需要预先检查库存情况,并相对应的库存进行锁定,以防止商品超卖。如果库存锁定操作失败,代表库存不足,必须确保订单不能被成功创建。在锁定库存后,如果因为某种异常情况导致订单创建失败,必须及时将之前锁定的库存进行释放操作,以便让其他用户可以重新争夺对应的商品。

如果抢购系统实现了购物车机制,在订单创建的同时,则需要从购买车中将相应的条目删除。

基于微服务架构的业务拆分,订单创建阶段的 3 个行为很有可能通过 3 个不同的微服务应用完成,因此需要通过分布式事务来保证数据一致性。

订单创建完成后,会等待用户付款,一旦付款成功,就会触发付款成功阶段的执行逻辑。这个阶段同样是通过分布式事务来完成,包含修改订单状态、扣减库存、通知发货、增加积分这4个子事务,它们要么全部不执行,要么全部执行成功。


当然,在真实的抢购业务中,情况有可能会更加的复杂,本文列出的只是其中最具代表性的几类业务行为。

引入消息异步通知机制

传统的分布式事务存在一个很大的弊端是参与节点都是事务阻塞型的,需要长时间锁定资源。以锁定库存 ->创建订单这个流程为例,借助于 Redis 等缓存系统,单纯锁定库存的操作只需要花费毫秒级的时间,可以承载非常高的并发量。但如果把创建订单的操作也考虑进来,加上不同微服务应用之间相互通讯的时候,整体耗时有可能超过1秒,导致性能急剧下降。

假设存在一种异步消息机制,让分布式事务的第一个参与方在执行完本地事务后,通过触发一笔消息通知事务的其他参与方完成后续的操作,就能将大事务拆解为多个本地子事务来分开执行。在这种模式下,事务的多个参与方之间之间并不需要彼此阻塞和等待,就能极大程度地提升并发吞吐能力。

对于库存中心而言,在高并发场景下,只需要不断的执行锁定库存记录操作,并不断通过异步消息通知订单中心创建订单,只要异步消息机制能确保消息一定送达,并得到正确处理,就能够实现分布式最终一致性。

先执行本地事务,还是先发送异步消息?

在这个模型中,异步消息的发送交给了分布式事务的第一个参与方来完成,这个参与方就拥有了两个职责:执行本地事务发送异步消息。到底应该先执行本地事务,还是先发异步消息呢?

第一种方案是先发送异步消息,再执行本地事务。这样做肯定是不行的,如果本地事务没有执行成功,异步消息已经发出去了,其他事务参与方收到消息后会执行对应的远程事务,造成数据不一致。

第二种方案是先执行本地事务,再发送异步消息。这样做能够解决本地事务执行失败导致的数据不一致问题,因为只有在本地事务执行成功的情况下,才会发送异步消息。但如果事务的参与方在执行本地事务成功后,自己宕机了,就再有没有机会发送异步消息了,因此这样做同样会造成数据不一致的问题。记住:在真实场景中,任何一个应用节点都不是 100% 可靠的,都存在宕机的可能性。

一个可行的方案是引入可以处理事务消息的消息队列集群,用于异步消息的中转。一个事务消息包含两种形态:

1、首先,事务的参与方发送一笔半事务消息到消息队列,表示自己即将执行本地事务,消息队列集群在收到这个半事务消息后,不会马上进行投递,而是进行暂存。

2、在执行完本地事务后,事务的参考方再发送一笔确认消息到消息队列集群,告知本地事务的执行状态。如果本地事务执行成功,消息队列集群会将之前收到的半事务消息进行投递;如果本地事务执行失败,消息队列集群直接删除之前收到的半事务消息,这样远程事务就不会被执行,从而保证了最终一致性。

同样,如果事务参与方在执行完本地事务后宕机了怎么办呢?这就需要消息队列集群具备回查机制:如果收到半事务消息后,在特定时间内没有再收到确认消息,会反过来请求事务参与方查询本地事务的执行状态,并给予反馈。这样,即便错过了确认消息,消息队列集群也有能力了解到本地事务的执行状态,从而决定是否将消息进行投递。

在一个微服务应用中,会存在多个对等的应用实例,这也就代表着即便一个事务参与方的实例在执行完本地事务后宕机了,消息队列集群依然可以通过这个实例的兄弟实例了解到本地事务执行的最终状态。

如何确保远程事务能执行成功?

如果一切本地事务的执行,以及异步消息的投递都一切顺利的话,接下来还会存在另外两种数据不一致的可能性:

  • 消息队列集群在将异步消息投递到远程事务参与方的时候,由于网络不稳定,消息没能投递成功。

  • 消息投递成功了,但远程事务参与方还没来得及执行远程事务,就宕机了。

这两种情况都会导致远程事务执行失败,所以需要建立一种消息重试机制,让远程事务参与方在完成任务后(实际上对远程事务参与方而言,这个任务是它要执行的本地任务),给予消息队列集群一个反馈,告知异步消息已经得到了正确的处理。否则,消息队列会在一定时间后,周期性的重复投递消息,直到它收到了来自远程事务参与方的反馈,以确保远程事务一定能执行成功。

和事务回查机制类似,远程事务参与方也有多个对等的微服务实例,即便某个实例在没来得及执行远程事务的时候宕机,消息队列也可以将任务交给这个实例的兄弟实例来完成。

完整流程

事务消息实战


了解到事务消息的原理后,我们不难得出一个结论:消息队列集群在整个流程中起着至关重要的作用,如果消息队列集群不可用,所有涉及到分布式事务的业务都将中止!因此,我们需要一个高可用的消息队列集群,能够始终保持在工作状态,即便其某个组件出现故障,也能够在短时间内自动恢复,不会影响业务,还能确保接收到的消息不丢失。

阿里云消息队列 RocketMQ 提供了对于事务消息机制最完整实现,包括半事务消息、确认消息、事务回查机制、消息重试等重要功能。此外,消息队列 RocketMQ 还提供了极强的高可用能力以及数据可靠性,可以确保在各种极端场景下都能提供稳定的服务,并确保消息不丢失。

本地事务参与方的业务代码

本文将通过 Java 代码介绍如何实现事务消息相关的业务逻辑,为了简化业务逻辑,我们继续基于 锁定库存 - > 创建订单 这个流程来演示,在这个流程中,仅有 2 个事务参与方。


1、初始化 TransactionProducer

我们先通过 Maven 引入消息队列 RocketMQ 的 SDK,优先使用阿里云官方提供的 TCP 版 SDK。

<dependency>  <groupId>com.aliyun.openservices</groupId>  <artifactId>ons-client</artifactId>  <version>1.8.7.2.Final</version></dependency>

顺利引入 Log4j2 用于日志输出。

<dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-api</artifactId>  <version>1.7.7</version></dependency><dependency>  <groupId>org.apache.logging.log4j</groupId>  <artifactId>log4j-slf4j-impl</artifactId>  <version>2.13.1</version></dependency>

在库存中心的代码中,我们需要初始化一个TransactionProducer,用于异步消息的发送,需要填入如下信息:

  • Group ID:之前创建的用于本地事务参与方的 Group ID。

  • Access key和Secret Key:RAM 用户对应的密钥信息,从 RAM 用户控制台获得。

  • Nameserver Address:RocketMQ 实例的接入点信息,从 RocketMQ 控制台获得。

Properties properties = new Properties();// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。properties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, "XXX");// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");// LocalTransactionCheckerImpl本地事务回查类的实现TransactionProducer producer = ONSFactory.createTransactionProducer(properties,    new LocalTransactionCheckerImpl());producer.start();

TransactionProducer 是线程安全的,启动后能在多线程环境中复用。


2、获取全局唯一的交易流水号

在发送半事务消息以及执行本地事务之前,我们需要先获取一个全局唯一的交易流水号,订单与交易流水号一一对应,接下来的事务消息机制都会依赖于这个这个交易流水号。我们可以通过引入第三方 ID 生成组件,或者在本地通过 Snowflake 算法实现。

3、实现本地事务回查逻辑

创建一个实现了LocalTransactionChecker接口的LocalTransactionCheckerImpl类,实现其中的check(Message)方法,该方法返回本地事务的最终状态。至于具体的业务逻辑如何实现,不在本文讨论的范围之前,我们将其封装在BusinessService类中。

package transaction;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class LocalTransactionCheckerImpl implements LocalTransactionChecker {    private static Logger LOGGER = LoggerFactory.getLogger(LocalTransactionCheckerImpl.class);    private static BusinessService businessService = new BusinessService();    @Override    public TransactionStatus check(Message msg) {        // 从消息体中获得的交易ID        String transactionKey = msg.getKey();        TransactionStatus transactionStatus = TransactionStatus.Unknow;        try {            boolean isCommit = businessService.checkbusinessService(transactionKey);            if (isCommit) {                transactionStatus = TransactionStatus.CommitTransaction;            } else {                transactionStatus = TransactionStatus.RollbackTransaction;            }        } catch (Exception e) {            LOGGER.error("Transaction Key:{}", transactionKey, e);        }        LOGGER.warn("Transaction Key:{}transactionStatus:{}", transactionKey, transactionStatus.name());        return transactionStatus;    }}

4、执行本地事务并发送异步消息

我们先组装一条异步消息,其中包含了全局交易 ID,消息将要发往的 Topic,以及消息体。远程事务参与方将通过这个消息体中获取执行远程事务所必须的数据信息。

接下来,将这条异步消息连同一个实现了LocalTransactionExecuter接口的匿名类,通过send方法进行发送,这就是本地事务参与方所需要实现的所有业务代码了。当然,这个匿名类实现了TransactionStatus execute.execute()方法,其中包含了对于本地事务的执行。完整代码如下:

package transaction;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.ONSFactory;import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.transaction.TransactionProducer;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Properties;import java.util.concurrent.TimeUnit;public class TransactionProducerClient {    private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);    private static final BusinessService businessService = new BusinessService();    private static final String TOPIC = "create_order";    private static final TransactionProducer producer = null;    static {        Properties properties = new Properties();        // 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。        properties.put(PropertyKeyConst.GROUP_ID, "XXX");        // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。        properties.put(PropertyKeyConst.AccessKey, "XXX");        // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。        properties.put(PropertyKeyConst.SecretKey, "XXX");        // 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");        // LocalTransactionCheckerImpl本地事务回查类的实现        TransactionProducer producer = ONSFactory.createTransactionProducer(properties,                new LocalTransactionCheckerImpl());        producer.start();    }    public static void main(String[] args) throws InterruptedException {        String transactionKey = getGlobalTransactionKey();        String messageContent = String.format("lock inventory for: %s", transactionKey);        Message message = new Message(TOPIC, null, transactionKey, messageContent.getBytes());        try {            SendResult sendResult = producer.send(message, (msg, arg) -> {                // 此处用Lambda表示,实际是实现TransactionStatus execute(final Message msg, final Object arg)方法                TransactionStatus transactionStatus = TransactionStatus.Unknow;                try {                    boolean localTransactionOK = businessService.execbusinessService(transactionKey);                    if (localTransactionOK) {                        transactionStatus = TransactionStatus.CommitTransaction;                    } else {                        transactionStatus = TransactionStatus.RollbackTransaction;                    }                } catch (Exception e) {                    LOGGER.error("Transaction Key:{}", transactionKey, e);                }                LOGGER.warn("Transaction Key:{}", transactionKey);                return transactionStatus;            }, null);            LOGGER.info("send message OK, Transaction Key:{}, result:{}", message.getKey(), sendResult);        } catch (Exception e) {            LOGGER.info("send message failed, Transaction Key:{}", message.getKey());        }        // demo example防止进程退出        TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);    }    private static String getGlobalTransactionKey() {        // TODO        return "";    }}

得益于 RocketMQ SDK 优秀的封装,发送半事务消息、发送确认消息、事务回查等重要步骤都已经完整实现,不需要开发者再编写代码了,这将为用户带来特别顺畅开发体验。

远程事务参与方的业务代码

相对本地事务参与方而言,远程事务参与方的代码更加简单,只需要从异步消息中提取出对应信息,完成对远程事务的执行即可。

package transaction;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class TransactionConsumerClient {private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);private static final BusinessService businessService = new BusinessService();private static final String TOPIC = "create_order";private static final Consumer consumer = null;static {Properties properties = new Properties();// 在控制台创建的Group ID,不同于本地事务参与方使用的Group IDproperties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, "XXX");// Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");Consumer consumer = ONSFactory.createConsumer(properties);consumer.start();}public static void main(String[] args) {consumer.subscribe(TOPIC, "*", (message, context) -> {LOGGER.info("Receive: " + message);businessService.doBusiness(message);// 返回CommitMessage,代表给予消息队列集群异步消息已经得到正常处理的回馈return Action.CommitMessage;});}
}

事务回滚

是否存在这样的情况:当本地事务执行成功后,因为远程事务没有办法执行,而导致本地事务需要进行回滚操作呢?在事务消息原理分析一节,我们已经介绍过如何通过消息重试,确保远程事务能够执行成功,这是不是已经说明只要异步消息被确认,远程事务就一定可以执行成功,从而不存在对本地事务的回滚呢?

实际生产情况下,确实存在远程事务无法正常执行的情况。比如在付款成功阶段,当本地事务“修改订单状态”执行完成后,在执行远程事务“通知发货”的时,因为订单地址有误而被物流公司拒绝,这种情况下就必须对订单状态进行回退操作,并发起退款流程。

所以在执行远程事务的时候,我们有必要区分如下两种完全不同的异常:

  • 技术异常:远程事务参与方宕机、网络故障、数据库故障等。

  • 业务异常:远程逻辑在业务上无法执行、代码业务逻辑错误等。

简单来讲,当远程事务执行失败的时候,能够通过消息重试的方式解决问题的,属于技术异常;否则,属于业务异常。基于事务消息的分布式事务机制不能实现自动回滚,当业务异常发生的时候,必须通过回退流程确保已经完成的本地事务得到恢复。比如在修改订单状态 -> 通知发货这个场景中,如果由于业务异常导致无法发货的时候,需要通过额外的回退流程,将订单状态设置为“已取消”,并执行退款流程。


在事务消息机制中,回退流程相当于远程事务参与方和本地事务参与方调换了角色,和正常流程一样,同样也可以通过事务消息来完成分布式事务。由于正常流程和回退流程的业务逻辑是完全不一样的,所以最理想的方式是建立另外一个 Topic 来实现。这也就说明,我们在创建事务消息 Topic 的时候,要充分考虑到这个 Topic 背后的业务含义,并在 Topic 命名上尽可能的与真实业务相匹配

多个事务参与方

本节展示的示例中,都只涉及到 2 个事务参与方,但在真实世界中,分布式事务往往涉及到更多的事务参与方,比如之前提到的付款成功环节,有修改订单状态->扣减库存->通知发货->增加积分这 4 个需要同时进行的操作,涉及到 4 个事务参与方。这种情况下如何通过事务消息来实现分布式事务呢?

我们依然可以继续使用之前的架构,只需要加入多个远程事务参与方就行了。可以通过 RocketMQ 的多消费组关联多个远程事务参与方,每一个参与方对应一个 Group ID,在这种情况下,同一个异步消息会复制成多份投递给不同的事务参与方。

需要特别引起注意的是,当某个远程事务参与方遇到业务异常的时候,需要通知其他所有事务参与方执行回退流程,这无疑会增加业务逻辑的整体复杂度。为了简化事务消息的执行流程,我们可以对业务逻辑预先进行梳理,将子事务分为如下两类:

  • 有可能发生业务异常的:比如锁定库存的操作,有可能因为库存不足而执行失败。又比如扣除积分的操作,有可能因为用户积分不足而无法扣除。

  • 不太可能发生业务异常的:比如删除购物车条目的操作,除非是技术类故障,一定可以执行成功,即便对应的条目并不存在,也没有关系。又比如积分增加的操作,只要对应的用户没有注销,是不可能遇到业务异常的。

我们尽量将第一类事务作为本地事务而实现,将第二类事务作为远程事务而实现,这样就可以最大程度避免回退流程。

其他注意事项


消息幂等

RocketMQ 能保证消息不丢失,但不能保证消息不重复,所以消费者在接收到消息后,有必要根据业务上的唯一 Key 对消息做幂等处理。在抢购业务中,唯一 Key 当然就是全局唯一的交易流水号,具体幂等处理方法在互联网上有很多文章供读者参考。

当然,不是每一种业务远程事务都需要确保消息的幂等性,比如删除购物车指定条目这样的操作,在业务上是可以容忍多次反复执行的,就没有必要引入额外的幂等处理了。

每日对账

不同于传统事务的强一致性保证,柔性事务需要经历一个中间状态,才到达成事务的最终一致性。有某些特殊情况下,这个中间状态会持续非常长的时间,甚至需要人工主动介入才能实现最终一致性:

1、消息重试多次后,依然不成功:当消费者完全无法正常工作的时候,RocketMQ 不可能永无止境地重试消息,事实上,如果16次重试后异步消息依然没有办法被正常处理,RocketMQ 会停止尝试,将消息放到一个特殊的队列中。

2、未处理的业务异常:比如给某个账号加积分的时候,发现此账号被注销了,这是一个非常罕见的业务现象,有可能事先对此并没有健壮的处理机制。

3、幂等校验失败:处理幂等所依赖的系统比如 Redis 发生了故障,导致某些消息被重复处理。

4、其他严重的系统故障:比如网络长时间中断,留下了大量执行到一半的事务。

5、其他漏网之鱼。

这些情况下,我们都有需要通过定期对账机制来进行排查,在必要的时候发起人工主动介入流程,修复不一致的数据。事实上,在任何柔性事务的实现中,每日对账都是必不可少的数据安全保障性手段。


总结


在柔性事务的多种实现中,事务消息是最为优雅易用的一种。基于阿里云 RocketMQ 高性能、高可用的特点,完全可以胜任抢购业务这类高并发大流量的场景。在阿里巴巴自身的业务中,事务消息也广泛使用于双 11 这样的大型营销活动中,有着非常高的通用性。

但在IT领域,没有任何一种技术是银弹,引入事务消息机制需要针对性的修改业务逻辑,还需要借助于每日对账等额外的手段确保数据安全,在实现高性能的同时,也增加了整体的业务复杂度。我们需要对业务场景进行充分评估,对比多种不同的技术实现方案,从中挑选与自身业务特点最为匹配的一种,才能更好地发挥柔性事务的价值。

想知道更多?描下面的二维码关注我

后台回复"技术",加入技术群

后台回复“k8s”,可领取k8s资料

【精彩推荐】

  • 原创|OpenAPI标准规范

  • 中台不是万能药,关于中台的思考和尝试

  • ClickHouse到底是什么?为什么如此牛逼!

  • 原来ElasticSearch还可以这么理解

  • 面试官:InnoDB中一棵B+树可以存放多少行数据?

  • 微服务下如何解耦?对于已经紧耦合下如何重构?

  • 如何构建一套高性能、高可用、低成本的视频处理系统?

  • 架构之道:分离业务逻辑和技术细节

  • 星巴克不使用两阶段提交

点个赞+在看,少个 bug ????

这篇关于如何通过事务消息保障抢购业务的分布式一致性?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1129538

相关文章

业务中14个需要进行A/B测试的时刻[信息图]

在本指南中,我们将全面了解有关 A/B测试 的所有内容。 我们将介绍不同类型的A/B测试,如何有效地规划和启动测试,如何评估测试是否成功,您应该关注哪些指标,多年来我们发现的常见错误等等。 什么是A/B测试? A/B测试(有时称为“分割测试”)是一种实验类型,其中您创建两种或多种内容变体——如登录页面、电子邮件或广告——并将它们显示给不同的受众群体,以查看哪一种效果最好。 本质上,A/B测

业务协同平台--简介

一、使用场景         1.多个系统统一在业务协同平台定义协同策略,由业务协同平台代替人工完成一系列的单据录入         2.同时业务协同平台将执行任务推送给pda、pad等执行终端,通知各人员、设备进行作业执行         3.作业过程中,可设置完成时间预警、作业节点通知,时刻了解作业进程         4.做完再给你做过程分析,给出优化建议         就问你这一套下

集中式版本控制与分布式版本控制——Git 学习笔记01

什么是版本控制 如果你用 Microsoft Word 写过东西,那你八成会有这样的经历: 想删除一段文字,又怕将来这段文字有用,怎么办呢?有一个办法,先把当前文件“另存为”一个文件,然后继续改,改到某个程度,再“另存为”一个文件。就这样改着、存着……最后你的 Word 文档变成了这样: 过了几天,你想找回被删除的文字,但是已经记不清保存在哪个文件了,只能挨个去找。真麻烦,眼睛都花了。看

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询

开源分布式数据库中间件

转自:https://www.csdn.net/article/2015-07-16/2825228 MyCat:开源分布式数据库中间件 为什么需要MyCat? 虽然云计算时代,传统数据库存在着先天性的弊端,但是NoSQL数据库又无法将其替代。如果传统数据易于扩展,可切分,就可以避免单机(单库)的性能缺陷。 MyCat的目标就是:低成本地将现有的单机数据库和应用平滑迁移到“云”端

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理 秒杀系统是应对高并发、高压力下的典型业务场景,涉及到并发控制、库存管理、事务管理等多个关键技术点。本文将深入剖析秒杀商品业务中常见的几个核心问题,包括 AOP 事务管理、同步锁机制、乐观锁、CAS 操作,以及用户限购策略。通过这些技术的结合,确保秒杀系统在高并发场景下的稳定性和一致性。 1. AOP 代理对象与事务管理 在秒杀商品

MySQL中一致性非锁定读

一致性非锁定读(consistent nonlocking read)是指InnoDB存储引擎通过多版本控制(multi versionning)的方式来读取当前执行时间数据库中行的数据,如果读取的行正在执行DELETE或UPDATE操作,这是读取操作不会因此等待行上锁的释放。相反的,InnoDB会去读取行的一个快照数据 上面展示了InnoDB存储引擎一致性的非锁定读。之所以称为非锁定读,因

InnoDB的多版本一致性读的实现

InnoDB是支持MVCC多版本一致性读的,因此和其他实现了MVCC的系统如Oracle,PostgreSQL一样,读不会阻塞写,写也不会阻塞读。虽然同样是MVCC,各家的实现是不太一样的。Oracle通过在block头部的事务列表,和记录中的锁标志位,加上回滚段,个人认为实现上是最优雅的方式。 而PostgreSQL则更是将多个版本的数据都放在表中,而没有单独的回滚段,导致的一个结果是回滚非