数字化转型--基于事件驱动的中台架构实践(一)

2024-08-22 00:18

本文主要是介绍数字化转型--基于事件驱动的中台架构实践(一),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.背景

2.什么是业务流程驱动引擎

3.核心概念

4.技术实现

5.思考及后续规划


引言:在方舟(业务能力研发协作平台)、水滴(业务能力扩展框架)、业务能力共享模型(业务侧可视化的自定义业务对象)等数字中台基础设施的帮助下,我们的技术体系从幼儿园升入了小学,而事件可视化配置平台和业务流程驱动引擎则让我们完成了小升初。

1.背景

目前我司的业务环境和数据环境基于政府的监管要求而日趋复杂,因业务合规和数据合规的要求导致我们对业务和数据的隔离格外慎重。在这样的政策限定条件下,如何在面对不确定性的复杂业务和环境下环境,如何以安全、高效的方式进行业务复杂度的讲解及业务能力沉淀和复用是我们当前面临的课题。为什么需要业务流程驱动引擎?从业务的流程和业务活动的组织层面来讲,企业的业务实质上是一系列的业务活动基于特定的业务场景或业务规则在时间维度的时序性组合表达。在传统的研发模式中,开发人员在应用内部基于上帝之手般的代码组织聚合业务代码,但是在跨应用的消息触达和通知的场景上帝之手也难免难堪。

例如在业务的抽象和设计中,对于订单创建这个场景,在业务系统中的关联业务需要对订单创建的事件进行感知。如用户购买了一张从杭州飞往背景的机票,在支付环节因网络或某种因素导致支付失败,客服系统需要感知这个订单创建的事件然后基于订单的状态进行相关的障碍进行预测和行程预测等用户体验服务的提升,产品运营人员需要及时感知该机票支付失败后某段时间内依旧没有尝试重新支付,我们需要热线进行跟进。 在这种流程式聚合的场景上帝之手也不好使了。

2.什么是业务流程驱动引擎

业务流程驱动引擎是我们基于业务中台的业务场景打造的无服务器事件总线服务,解决跨应用之间的路由事件,消息注册、订阅、管理等问题,简化接入配合配置,帮助研发人员轻松构建松耦合、分布式的事件驱动架构的中台化的基础设施组件。

3.核心概念

事件业务中数据变化的记录或动作。

生产应用产生数据变化并发送事件消息的应用。

订阅应用:监听或消费事件消息的应用。

业务线数据产生业务部门或业务实体如新零售事业群、客服体验事业群。

消息投递通道:数据产生消息发送的通道如RockMateq或kafk。

事件名称:数据产生数据变化所产生的业务活动如创建订单/修改收货地址等。

消息主题:消息队列 RocketMQ的Topic。

事件编码:消息队列 RocketMQ的Tag。

运行模式:消息投递后消息消费的模式有自主消费模式(订阅方自己监听并处理消息)和托管消费模式(业务流程驱动引擎统一监听并处理消息)。

触发类型:消息触发处理的通道。

系统服务:业务提供的消息触达后调用的处理程序或服务接口。

4.技术实现

为实现事件驱动架构,我们从消息通道统一、云服务配置简化、业务系统接入集成、业务配置可视、业务流程可编排五个方面入手,来建设基于事件消息驱动的业务流程驱动引擎引擎系统。

4.1 统一消息通道

目前的业务中使用了多种消息中间件,在中台化建设中我们统一了消息中间件的选型,统一使用阿里云的消息队列 RocketMQ进行消息生产和消费,通过提供统一的消息总线服务,消息的接入可以简化到只需要调用消息发送的接口即可;系统无需额外进行配置,无需关心业务环境的隔离,我们通过SpringBoot规范提供了我司内部的Starter集成。

案列一:使用消息总线发送事件消息

  1 引入 依赖包:

            <dependency><groupId>com.xxxx.event</groupId><artifactId>business-driven-engine-client</artifactId><version>${business-driven-engine-version}</version></dependency>

使用示例代码:

@Reference(version = "1.0.0", timeout = 3000, check = false, interfaceClass = EventSourceService.class)private EventSourceService eventSourceService;/*** 同步发送事件消息* @param eventSourceMessage 事件消息* @return*/public  Result<Boolean>  publish(EventSourceMessage eventSourceMessage) {try {Result<Boolean> result = eventSourceService.publish(eventSourceMessage);logger.info("EventSourceService.publish### 同步发送信息到事件中心. eventSourceMessage:{}, result={}", eventSourceMessage.toString(), result);return result;} catch (Exception e) {logger.error("EventSourceService.publish### 同步消息发送异常, eventSourceMessage={} msg={}", eventSourceMessage.toString(), e.getMessage());return  Result.failure(ErrorCode.DATA_COLLECT_MESSAGE_SEND_EXCEPTION.getErrCode(), e.getMessage());}}/*** 异步发送事件消息* @param eventSourceMessage 事件消息* @return*/public  Result<Boolean>  asyncPublish(EventSourceMessage eventSourceMessage) {try {Result<Boolean> result = eventSourceService.asyncPublish(eventSourceMessage);logger.info("EventSourceService.asyncPublish### 异步发送信息到事件中心. eventSourceMessage:{}, result={}", eventSourceMessage.toString(), result);return result;} catch (Exception e) {logger.error("EventSourceService.asyncPublish### 异步消息发送异常, eventSourceMessage={} msg={}", eventSourceMessage.toString(), e.getMessage());return  Result.failure(ErrorCode.DATA_COLLECT_MESSAGE_SEND_EXCEPTION.getErrCode(), e.getMessage());}}

案列二:使用集成Starter发送事件消息

  1 引入依赖包:

            <dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 引用消息生产者

@Autowired
private ProducerBean producerBean;

 ProducerBean已经在应用启动的时候构建完成。

 通过nacos配置中心对项目环境、日常环境、预发环境、生产环境进行了隔离,配置统一在配置中心配置,业务方无需关注技术细节。

    private static final Logger logger = LoggerFactory.getLogger(ProducerRepository.class);/*** Job处理线程池*/ExecutorService callbackExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,Runtime.getRuntime().availableProcessors() + 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactoryBuilder().setNameFormat("event-driven-engine-callback-executor-pool-%d").build());@Autowiredprivate ProducerBean producerBean;/*** 发布事件消息** @param eventSourceMessage*/public SendResult rocketPublish(EventSourceMessage eventSourceMessage, EventDO event) {//消息结构化MessageBody messageBody = new MessageBody(JSONObject.toJSONString(eventSourceMessage));Message message = new Message(event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), messageBody.getMessage().getBytes());return producerBean.send(message);}/*** 异步发送事件消息,只要不抛异常就是成功* @param eventSourceMessage* @param event*/public void asyncPublish(EventSourceMessage eventSourceMessage, EventDO event) {producerBean.setCallbackExecutor(callbackExecutor);//消息结构化MessageBody messageBody = new MessageBody(JSONObject.toJSONString(eventSourceMessage));Message message = new Message(event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), messageBody.getMessage().getBytes());producerBean.sendAsync(message, new SendCallback() {@Overridepublic void onSuccess(final SendResult sendResult) {if (StringUtils.isNotBlank(sendResult.getMessageId())) {logger.info("异步消息发送成功: topic={}, eventCode={}, bizKey={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey());} else {logger.info("异步消息发送失败: topic={}, eventCode={}, bizKey={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey());}}@Overridepublic void onException(final OnExceptionContext context) {logger.info("异步消息发送异常: topic={}, eventCode={}, bizKey={}, messageId={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), context.getMessageId());}});}

案列三:使用消息托管模式处理相关业务(业务驱动引擎统一消费消息并调用绑定的业务处理逻辑)

1 引入二方包

            <dependency><groupId>com.xxxx.event</groupId><artifactId>business-driven-engine-client</artifactId><version>${business-driven-engine-version}</version></dependency>

2 实现EventHandleService接口

基于便捷和统一的考量,我们统一提供泛化调用接口,使用Dubbo的泛化调用和版本隔离机制。具体实现如下:

                                     

托管消费会在消息触达后通过泛化调用工厂调用这个接口进行相关的业务处理。

案列四:使用自主消费模式处理相关业务(业务方监听消息并业务处理逻辑)

1 引入二方包

<dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 在yaml文件中配置订阅分组

spring:event:rocketmqConfig:groupId: GID_RMQ_TOPIC_EVENT_BUS

3 编写自主消费的订阅处理程序

@Slf4j
@Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")
@Component
public class BusinessDrivenEngineMessageHandler implements MessageHandler {@Overridepublic Action handle(String topic, String tag, Message message) {if(null == message){throw new EventBizException(EventEngineErrorCode.EVENT_MESSAGE_IS_EMPTY.getErrCode());}String appName =  DomainApplicationConstant.APPLICATION.toUpperCase();Subscribe subscribeAnn = this.getClass().getDeclaredAnnotation(Subscribe.class);String eventCode = subscribeAnn.tag();String subscribeCode = CommonConstant.SUBSCRIBE_PREFIX + eventCode + CommonConstant.SEPARATOR +  appName;log.info("EventDrivenEngineListener.handle#  messageId={}, topic={}, tag={}, key={}", message.getMsgID(), message.getTopic(), message.getTag(), message.getKey());Entry entry = null;try{// 定义资源。为了便于标识,资源名称定义为Group ID和Topic的组合。Group ID和Topic可以通过消息队列RocketMQ控制台获得。entry = SphU.entry("EventDrivenEngineListener.consume:" + topic);int reconsume = message.getReconsumeTimes();/** 消息已经重试了3次,如果不需要再次消费,则返回成功*/if (reconsume == CommonConstant.RETRIES_NUMBER) {log.info("EventTopicConsumeHandler.consume# 消息已经重试了{}次, 设置为消费成功", reconsume);return Action.CommitMessage;}return Action.CommitMessage;} catch(BlockException e){log.error("EventDrivenEngineListener.consume# 消息消费异常. messages={}, msg={}, exception={}", JSONObject.toJSONString(message), e.getMessage(), e);return Action.ReconsumeLater;} finally {if (entry != null) {entry.exit();}}}
}

4 注册订阅处理服务

    @Autowiredprivate BusinessDrivenEngineMessageHandler businessDrivenEngineMessageHandler;@Beanpublic EventBusRocketmqConsumeListener busRocketmqConsumeListener(){EventBusRocketmqConsumeListener eventBusRocketmqConsumeListener = new EventBusRocketmqConsumeListener();eventBusRocketmqConsumeListener.registerHandler(businessDrivenEngineMessageHandler);return eventBusRocketmqConsumeListener;}

备注:使用该注解的时候需要设置topic和tag(tag = "*"表示定约该消息下的所有Tag)

Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")

案列五:使用自主消费模式触发业务驱动引擎

              此方式适用于业务方自主监听消息,然后通过触发器触发在配置平台配置好的任务清单,详请请下下文。

1 引入二方包

<dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 在yaml文件中配置订阅分组

spring:event:rocketmqConfig:groupId: GID_RMQ_TOPIC_EVENT_BUS

3 编写自主消费的订阅处理程序

@Slf4j
@Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")
@Component
public class BusinessDrivenEngineMessageHandler implements MessageHandler {@Reference(version = "1.0.0", timeout = 3000, check = false, interfaceClass = EventSourceService.class)private EventSourceService eventSourceService;@Overridepublic Action handle(String topic, String tag, Message message) {if(null == message){throw new EventBizException(EventEngineErrorCode.EVENT_MESSAGE_IS_EMPTY.getErrCode());}log.info("EventDrivenEngineListener.handle#  messageId={}, topic={}, tag={}, key={}", message.getMsgID(), message.getTopic(), message.getTag(), message.getKey());try{Result<Boolean> result = eventSourceService.trigger(new EventMessageTriggerParam(message.getMsgID(), message.getTopic(), message.getTag(), message.getKey(), new String(message.getBody(), StandardCharsets.UTF_8)));return result.isSuccess() && result.getData() ? Action.CommitMessage : Action.ReconsumeLater;} catch(EventBizException e){log.error("EventDrivenEngineListener.consume# 消息消费异常. messages={}, msg={}, exception={}", JSONObject.toJSONString(message), e.getMessage(), e);return Action.ReconsumeLater;}}
}

4 注册订阅处理服务

    @Autowiredprivate BusinessDrivenEngineMessageHandler businessDrivenEngineMessageHandler;@Beanpublic EventBusRocketmqConsumeListener busRocketmqConsumeListener(){EventBusRocketmqConsumeListener eventBusRocketmqConsumeListener = new EventBusRocketmqConsumeListener();eventBusRocketmqConsumeListener.registerHandler(businessDrivenEngineMessageHandler);return eventBusRocketmqConsumeListener;}

4.2 阿里云服务配置简化

在业务调研中发现业务使用了多种消息通道,大量的Topic和队列不知道用在了什么业务场景、Topic是那也业务那个应用创建的、有哪些业务和应用订阅消费Topic、这些生产者和订阅者是为什么业务服务的,更有甚者在同一实例中跑着测试的和生产的Topic。这里的乱象我不过多叙述,基于这样的乱象,我们决定对事件的生产者和订阅者的关系进行维护,使其能够结构化的展现其组织关系,同时保证事件业务语义的场景化使其可以被复用。

4.2.1. 事件配置管理

为寻求事件治理的可视、可控、可管理,我们决定在研发协同层面做文章,具体实现方案如下:

  • 事件定义场景化:每个事件都有唯一的标识,每个事件都有业务化的语义描述;

  • 统一配置管理:提供统一的配合操作界面;

  • 集成消息通道:屏蔽底层的消息队列,对事件的消息投递业务可以选择多种消息投递通道;

  • 确定生产和消费的关系:通过结构化的方式展现事件原宿关系,如这个事件是那个业务线的哪个应用生产的、事件的业务场景是什么、使用什么消息通道、关联那个Topic以及这个事件被哪些应用订阅、订阅方订阅消息的用途、运行的模式等信息。

     

                                                         

 

4.2.2 订阅配置管理

为管理业务中的订阅者我们对订阅信息使用配置界面进行配置,具体方案如下:

  • 订阅关系层次化:每个订阅者都有唯一的标识及订阅分组;

  • 订阅数据同步:通过阿里云接口同步创建订阅者的分组信息;

  • 业务可配置化:订阅者消息消费、触达运行等都通过界面进行配置下发;

                                                                                   

                                                                                                                                     订阅配置界面

                                                                     

4.2.3. 订阅消费服务可配置可编排

为实现一个业务事件触发后,多个订阅者的消息触发任务的编排,我们对订阅者的消费处理逻辑做了抽象和封装,具体方案如下:

  • 任务可编排:事件订阅者可以同时挂载多个触发处理任务,任务之间的执行顺序可以通过拖拽实现调整;

  • 任务可复用:挂载的任务是独立的系统服务接口,是封装的细粒度的业务逻辑,可以复用;

  • 调用通道泛化:提供统一的接口,通过实现接口和版本进行泛化调用;                                                                     

                                                                 

5.思考及后续规划

业务驱动引擎一期上线后,彻底解决了依靠手工维护数据和配置的困境,使数据归属更加明确,使业务场景和业务语义更清晰,使业务关系更明确,使事件驱动和复用变得可能。但随着业务的发展,驱动任务的串行和并行的场景快速增加,我们又面临着新的课题。归纳起来有以下几点:

  • 如何保证并行网关和串行网关之间的输入和输出可以管道式的执行

  • 如何构建企业级的业务流程编排引擎实现地代码的业务开发?

这篇关于数字化转型--基于事件驱动的中台架构实践(一)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1094698

相关文章

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke

mybatis的整体架构

mybatis的整体架构分为三层: 1.基础支持层 该层包括:数据源模块、事务管理模块、缓存模块、Binding模块、反射模块、类型转换模块、日志模块、资源加载模块、解析器模块 2.核心处理层 该层包括:配置解析、参数映射、SQL解析、SQL执行、结果集映射、插件 3.接口层 该层包括:SqlSession 基础支持层 该层保护mybatis的基础模块,它们为核心处理层提供了良好的支撑。

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

百度/小米/滴滴/京东,中台架构比较

小米中台建设实践 01 小米的三大中台建设:业务+数据+技术 业务中台--从业务说起 在中台建设中,需要规范化的服务接口、一致整合化的数据、容器化的技术组件以及弹性的基础设施。并结合业务情况,判定是否真的需要中台。 小米参考了业界优秀的案例包括移动中台、数据中台、业务中台、技术中台等,再结合其业务发展历程及业务现状,整理了中台架构的核心方法论,一是企业如何共享服务,二是如何为业务提供便利。

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

系统架构设计师: 信息安全技术

简简单单 Online zuozuo: 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo :本心、输入输出、结果 简简单单 Online zuozuo : 文章目录 系统架构设计师: 信息安全技术前言信息安全的基本要素:信息安全的范围:安全措施的目标:访问控制技术要素:访问控制包括:等保

利用命令模式构建高效的手游后端架构

在现代手游开发中,后端架构的设计对于支持高并发、快速迭代和复杂游戏逻辑至关重要。命令模式作为一种行为设计模式,可以有效地解耦请求的发起者与接收者,提升系统的可维护性和扩展性。本文将深入探讨如何利用命令模式构建一个强大且灵活的手游后端架构。 1. 命令模式的概念与优势 命令模式通过将请求封装为对象,使得请求的发起者和接收者之间的耦合度降低。这种模式的主要优势包括: 解耦请求发起者与处理者

Prometheus与Grafana在DevOps中的应用与最佳实践

Prometheus 与 Grafana 在 DevOps 中的应用与最佳实践 随着 DevOps 文化和实践的普及,监控和可视化工具已成为 DevOps 工具链中不可或缺的部分。Prometheus 和 Grafana 是其中最受欢迎的开源监控解决方案之一,它们的结合能够为系统和应用程序提供全面的监控、告警和可视化展示。本篇文章将详细探讨 Prometheus 和 Grafana 在 DevO

springboot整合swagger2之最佳实践

来源:https://blog.lqdev.cn/2018/07/21/springboot/chapter-ten/ Swagger是一款RESTful接口的文档在线自动生成、功能测试功能框架。 一个规范和完整的框架,用于生成、描述、调用和可视化RESTful风格的Web服务,加上swagger-ui,可以有很好的呈现。 SpringBoot集成 pom <!--swagge

创业者该如何设计公司的股权架构

本文来自七八点联合IT橘子和车库咖啡的一系列关于设计公司股权结构的讲座。 主讲人何德文: 在公司发展的不同阶段,创业者都会面临公司股权架构设计问题: 1.合伙人合伙创业第一天,就会面临股权架构设计问题(合伙人股权设计); 2.公司早期要引入天使资金,会面临股权架构设计问题(天使融资); 3.公司有三五十号人,要激励中层管理与重要技术人员和公司长期走下去,会面临股权架构设计问题(员工股权激