分布式任务调度平台SIA-TASK的架构设计与运行流程

2023-11-10 03:59

本文主要是介绍分布式任务调度平台SIA-TASK的架构设计与运行流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、分布式任务调度的背景

无论是互联网应用或者企业级应用,都充斥着大量的批处理任务。我们常常需要一些任务调度系统来帮助解决问题。随着微服务化架构的逐步演进,单体架构逐渐演变为分布式、微服务架构。在此背景下,很多原先的任务调度平台已经不能满足业务系统的需求,于是出现了一些基于分布式的任务调度平台。

1.1 分布式任务调度的演进

在实际业务开发过程中,很多时候我们无可避免地需要使用一些定时任务来解决问题。通常我们会有多种解决方案:使用 Crontab 或 SpringCron (当然这种情况可能机器很少而且任务简单又不是很多的情况下)。然而,当应用复杂度升高、定时任务数量增多且任务之间产生依赖关系时,Crontab 进行定时任务的管理配置就会非常混乱,严重影响工作效率。这时就会产生一系列问题:

  • 任务管理混乱,生命周期无法统一协调管理;
  • 任务之间如果存在依赖关系,难以编排。

随着互联网的发展,分布式服务架构势越来越流行。相应的也需要一个分布式任务调度系统来管理分布式架构中的定时任务。

1.2 分布式任务调度架构

分布式任务调度设计

当垂直应用越来越多,应用之间交互也会越来越复杂,通常我们采用分布式或者微服务架构,将核心业务抽取出来,形成单独的服务。一个独立的微服务群体逐渐形成稳定的服务中心,使得业务应用能更快地响应多变的市场需求。

此时,用于提高业务复用及整合的分布式服务框架成为关键。同时,由于服务独立,一般能做到定时任务独立的情况,任务的更改对于整体系统的影响小之又小。通常我们会采用任务与调度分离的方式(如上图所示),任务的执行逻辑无需关注调度与编排,同时可以保证执行器和调度的高可用,易于开发和维护。

1.3 分布式任务调度优势

在分布式服务架构的基础上,由于独立业务的数量可能很多,此时如果定时任务单独在该服务中实现,很可能会出现难以管理的情况,且避免不了由于定时任务的更改而导致的业务重启。因此,一个独立的分布式任务调度系统是很必要的,可以用来全局统筹管理所有的定时任务。同时,将任务的配置单独抽离出来,作为该分布式任务调度系统的功能,就能做到定时任务的更改不影响任何业务,也不影响整个系统:

  • 通过调度与任务分离的方式进行管理,大大降低了开发和维护成本;
  • 分布式部署,保证了系统的高可用性、伸缩性、负载均衡,提高了容错性;
  • 可以通过控制台部署和管理定时任务,方便灵活高效;
  • 任务都可以持久化到数据库,避免了宕机和数据丢失带来的隐患,同时有完善的任务失败重做机制和详细的任务跟踪及告警策略。

二、分布式任务调度技术选型

2.1 分布式任务调度考虑因素

sia-task-设计图

  • 任务编排:多个业务之间的定时任务存在流程次序。
  • 任务分片:对于一个大型任务,需要分片并行执行。
  • 跨平台:除了使用 Java 技术栈(SpringBoot、Spring等)的项目之外,还有使用其他语言的应用。
  • 无侵入:业务不希望与调度高耦合,只关注业务的执行逻辑。
  • 故障转移:任务执行过程中遇到问题有补偿措施,减少人工介入。
  • 高可用:调度系统自身必须保证高可用。
  • 实时监控:实时获取任务的执行状态。
  • 可视化:任务调度的操作提供可视化页面,方便使用。
  • 动态编辑:业务的任务时钟参数可能变动,不希望停机部署。

2.2 SIA-TASK与其它分布式任务调度技术比较

SIA是宜信公司基础开发平台Simple is Awesome的简称,SIA-TASK(微服务任务调度平台)是其中的一项重要产品,SIA-TASK契合当前微服务架构模式,具有跨平台、可编排、高可用、无侵入、一致性、异步并行、动态扩展、实时监控等特点。

开源地址:https://github.com/siaorg/sia-task

我们先对比市场上主流的开源分布式任务调度框架,分析其优缺点,然后再介绍我们的技术选型。

  • Quartz: Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目,完全基于 Java 实现。该项目于 2009 年被 Terracotta 收购,目前是 Terracotta 旗下的一个项目。相比于 JDK 或 Spring 提供的定时任务,Quartz 对单个任务的控制基本做到了极致,以其强大功能和应用灵活性,在企业应用中发挥了巨大的作用。然而 Quartz 并不支持任务的编排(任务之间有依赖),而且不支持任务分片。
  • TBSchedule: TBSchedule 是一个支持分布式的调度框架,能让一种批量任务或者不断变化的任务,被动态地分配到多个主机的 JVM 中,不同的线程组中并行执行。基于 ZooKeeper 的纯 Java 实现,由 Alibaba 开源。TBSchedule 侧重于任务的分发,支持任务分片,但是没有任务编排,也不是跨平台的。
  • Elastic-Job: Elastic-Job 是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。Elastic-Job 支持任务分片(作业分片一致性),但是没有任务编排,也不是跨平台的。
  • Saturn: Saturn 是唯品会开源的分布式,高可用的调度服务。Saturn 在 Elastic-Job 做二次开发,支持监控、任务分片、跨平台,但是没有任务编排。
  • Antares: Antares 是基于 Quartz 的分布式调度,支持分片、支持树形任务依赖,但不是跨平台的。
  • Uncode-Schedule: Uncode-Schedule 是基于 Zookeeper 的分布式任务调度组件。支持所有任务在集群中不重复、不遗漏的执行。支持动态添加和删除任务。但是不支持任务分片,也没有任务编排,还不是跨平台的。
  • XXL-JOB: XXL-JOB 是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。XXL-JOB 支持分片,简单支持任务依赖,支持子任务依赖,不是跨平台的。

下面我们简单对比下 SIA-TASK 与这些任务调度框架:

 任务编排任务分片跨平台高可用故障转移实时监控
SIA-TASK
Quartz××.NET×API监控
TBSchedule××
Elastic-Job××
Saturn×
Antares×
Uncode-Schedule×××
XXL-JOB子任务依赖×

可以发现,这些调度框架基本上都支持高可用、故障转移与实时监控等功能,但是对于任务编排、任务分片与跨平台等功能的支持各有侧重点。SIA-TASK 将全面支持这些功能。

三、SIA-TASK介绍

3.1 SIA-TASK技术选型

sia-task-technology

  • REST:一种软件架构风格。要求执行器暴露 Http 调用接口来达到跨平台的目的。
  • AOP:切面编程技术。在 Spring 项目扩展包 Hunter 中使用,保证 Task 被串行调用(单例单线程)。
  • Quartz:功能强大,应用灵活,对单个任务的控制基本做到了极致,用来作为调度中心时钟组件。
  • MySQL:用于元数据存储与(暂时)日志存取。
  • Elastic:基于 Lucene 的搜索服务器,提供了一个分布式多用户能力的全文搜索引擎,用于日志的存储与查询。
  • SpringCloud:社区活跃的开发框架,也是公司指定的统一开发框架。用于快速开发,快速迭代。
  • MyBatis:一款优秀的持久层框架,支持定制化 SQL,存储过程以及高级映射。用于简化持久层开发。
  • Zookeeper:久经考验的注册中心。用来解决调度中心高可用,分布式一致性等问题。

3.2 SIA-TASK设计思想

SIA-TASK借鉴微服务设计思想,获取分布在每个执行器节点上的任务(Task)元数据,进行汇报,上传注册中心。利用在线可编辑方式支持任务在线编排、动态修改任务时钟;使用 Http 协议作为交互传输协议。数据交互格式统一使用Json。用户通过编排器(下文会做介绍)进行操作,触发事件,调度器接收事件,由调度中心进行时钟解析,执行任务流程,进行任务通知。

3.3 SIA-TASK基本概念

SIA-TASK 采用任务和调度分离的方式,业务的执行任务逻辑和调度逻辑完全分离。系统组成共涉及以下几个核心概念:

  • 任务(Task): 基本执行单元,执行器对外暴露的一个HTTP调用接口。
  • 作业(Job): 由一个或者多个存在相互逻辑关系(串行/并行)的任务组成,任务调度中心调度的最小单位。
  • 计划(Plan): 由若干个顺序执行的作业组成,每个作业都有自己的执行周期,计划没有执行周期。
  • 任务调度中心(Scheduler): 根据每个的作业的执行周期进行调度,即按照计划、作业、任务的逻辑进行HTTP请求。
  • 任务编排中心(Config): 编排中心使用任务来创建计划和作业。
  • 任务执行器(Executer): 接收HTTP请求进行业务逻辑的执行。
  • Hunter:Spring项目扩展包,负责执行器中的任务抓取,上传注册中心,业务可依赖该组件进行Task编写。

3.4 SIA-TASK系统架构

SIA-TASK 可以分为三大模块(调度中心、编排中心和执行器)、两大组件(持久化存储和注册中心)。这三大模块和两大组件的作用如下:

  • 任务调度中心:负责抢占Job和任务调度以及任务迁移等,是SIA-TASK 的核心功能模块。
  • 任务编排中心:负责对在线任务进行逻辑编排,提供日志查看和实时监控功能。
  • 任务执行器:负责接收调度请求并执行任务逻辑。
  • 任务注册中心(ZK):协调Job和Task、调度器等的工作流程。
  • 持久化存储(DB):记录项目的Job和Task数据,并提供日志存储。

SIA-TASK 使用 SpringBoot 体系作为架构选型,基于Quartz及Zookeeper进行二次开发,支持相应的特性功能,SIA-TASK 的逻辑架构图如下图所示:

逻辑架构图

3.5 SIA-TASK模块说明

3.5.1 任务调度中心

任务调度中心负责任务调度,管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;支持可视化、简单且动态地管理调度信息,包括任务新建,更新,删除和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器故障恢复。

3.5.2 任务编排中心

任务编排中心是分布式调度中心支持在线任务模型编排的组件;依托于UI可进行web端任务编排。

我们可以通过上述基础模型来编排一些复杂的调度模型,例如:

调度模型

SIA-TASK的UI编排界面:

UI编排界面

编排结束后查看task的编排信息如下图所示:

编排信息

同时,编排中心还提供首页统计数据查看、调度监控、Job管理、Task管理以及日志管理功能。

3.5.3 任务执行器

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;

执行器支持两种类型:

(1) 如果使用 sia-task-hunter,支持SpringBoot项目和Spring项目, 引入 sia-task-hunter,任务(Task)抓取客户端。合规的HTTP接口(称之为Task)任务会自动被抓取并上传注册中心;

(2) 如果不使用 sia-task-hunter,只需提供任务可调用的HTTP接口,此时需要业务手动录入,且自行控制该任务的并发调用控制。

3.5.4 任务注册中心(Zookeeper)

分布式框架采用Zookeeper作为注册中心。

注册中心

(1) 任务注册

调度中心和执行集群都以Zookeeper作为注册中心,所有数据以节点及节点内容的形式注册,通过定时汇报主机状态保持存活在Zookeeper上。

(2) 元数据存储

注册中心不仅仅提供注册服务,并且存储每个执行器的信息(包括执行器实例信息,执行器上传的Task元数据,以及任务运行时的一些临时状态数据)。

(3) 事件发布

基于Zookeeper事件推送机制,进行任务的发布,通过平衡算法保证调度器任务抢占的分布均衡。

(4) 负载均衡

保证调度器获取执行Job的个数均衡,避免单一节点压力。

3.5.5 持久化存储(DB)

这里采用MySQL作为数据持久化解决方案。

除了Task动态元数据保存在注册中心之外,其他相关的元数据都存入MySQL,包括但不限于:手动录入的Task、配置的Job信息、编排的Task依赖信息、调度日志、业务人员操作日志、Task执行日志等。

3.6 SIA-TASK关键运行流程

3.6.1 任务发布流程

任务发布流程

(1) 用户可以通过UI进行Job创建。可以选择Job类型,设置预警邮箱,设置Job描述。然后为创建的Job进行任务Task编排。

(2) Job创建完毕并且设置Task编排关系后可进行任务发布,通过UI对相应的Job进行操作(激活,执行一次,停止以及删除操作)。

(3) 用户的Task任务可以是通过抓取器抓取的,亦可以使用UI手动创建。

  从图中可以看到,ConcurrentHashMap离不开Segment,Segment是ConcurrentHashMap的一个静态内部类,可以看到Segment继承了重入锁ReentrantLock,要想访问Segment片段,线程必须获得同步锁,结构如下:
  
  static final class Segment<K,V> extends ReentrantLock implements Serializable {
  
  //尝试获取锁的最多尝试次数,即自旋次数
  
  static final int MAX_SCAN_RETRIES =
  
  Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
  
  //HashEntry数组,也就是键值对数组
  
  transient volatile HashEntry<K, V>[] table;
  
  //元素的个数
  
  transient int count;
  
  //segment中发生改变元素的操作的次数,如put/remove
  
  transient int modCount;
  
  //当table大小超过阈值时,对table进行扩容,值为capacity *loadFactor
  
  transient int threshold;
  
  //加载因子
  
  final float loadFactor;
  
  Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
  
  this.loadFactor = lf;
  
  this.threshold = threshold;
  
  this.table = tab;
  
  }
  
  }
  
    键值对HashEntry是ConcurrentHashMap的基本数据结构,多个HashEntry可以形成链表用于解决hash冲突。
  
  static final class HashEntry<K,V> {
  
  //hash值
  
  final int hash;
  
  //键
  
  final K key;
  
  //值
  
  volatile V value;
  
  //下一个键值对
  
  volatile HashEntry<K, V> next;
  
  HashEntry(int hash, K key, V value, HashEntry<K, V> next) {
  
  this.hash = hash;
  
  this.key = key;
  
  this.value = value;
  
  this.next = next;
  
  }
  
  }
  
    ConcurrentHashMap成员变量和构造方法如下:
  
  public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
  
  implements ConcurrentMap<K, V>, Serializable {
  
  private static final long serialVersionUID = 7249069246763182397L;
  
  //默认的初始容量
  
  static final int DEFAULT_INITIAL_CAPACITY = 16;
  
  //默认加载因子
  
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  
  //默认的并发度,也就是默认的Segment数组长度
  
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  
  //最大容量,ConcurrentMap最大容量
  
  static final int MAXIMUM_CAPACITY = 1 << 30;
  
  //每个segment中table数组的长度,必须是2^n,最小为2
  
  static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
  
  //允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n
  
  static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
  
  //非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试
  
  static final int RETRIES_BEFORE_LOCK = 2;
  
  //计算segment位置的掩码值
  
  final int segmentMask;
  
  //用于计算算segment位置时,hash参与运算的位数
  
  final int segmentShift;
  
  //Segment数组
  
  final Segment<K,V>[] segments;
  
  public ConcurrentHashMap(int initialCapacity,
  
  float loadFactor, int concurrencyLevel) {
  
  //参数校验
  
  if (!(loadFactor > 0) || initialCapacity <www.yuntianyuL.cn 0 || concurrencyLevel <= 0)
  
  throw new IllegalArgumentException();
  
  if (concurrencyLevel > MAX_SEGMENTS)
  
  concurrencyLevel = MAX_SEGMENTS;
  
  // Find power-of-two sizes best matching arguments
  
  //找到一个大于等于传入的concurrencyLevel的2^n数,且与concurrencyLevel最接近
  
  //ssize作为Segment数组
  
  int sshift = 0;
  
  int ssize = 1;
  
  while (ssize <www.yuntianyul.com concurrencyLevel) {
  
  ++sshift;
  
  ssize <<= 1;
  
  }
  
  this.segmentShift = 32 - sshift;
  
  this.segmentMask = ssize - 1;
  
  if (initialCapacity > MAXIMUM_CAPACITY)
  
  initialCapacity = MAXIMUM_CAPACITY;
  
  // 计算每个segment中table的容量
  
  int c = initialCapacity / ssize;
  
  if (c * ssize < initialCapacity)
  
  ++c;
  
  int cap = MIN_SEGMENT_TABLE_CAPACITY;
  
  // 确保cap是2^n
  
  while (cap < c)
  
  cap <<= 1;
  
  // create segments and segments[0]
  
  // 创建segments并初始化第一个segment数组,其余的segment延迟初始化
  
  Segment<K,V> s0 =
  
  new Segment<K,V>(loadFactor, (www.qilinchengdl.cn int)(cap * loadFactor),
  
  (HashEntry<K,V>[])new HashEntry[cap]);
  
  Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  
  UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
  
  this.segments = ss;
  
  }
  
  }
  
  concurrencyLevel 参数表示期望并发的修改 ConcurrentHashMap 的线程数量,用于决定 Segment 的数量,通过算法可以知道就是找到最接近传入的concurrencyLevel的2的幂次方。而segmentMask 和 segmentShift看上去有点难以理解,作用主要是根据key的hash值做计算定位在哪个Segment片段。
  
  对于哈希表而言,最重要的方法就是put和get了,下面分别来分析这两个方法的实现:
  
  put(K key, V value)
  
    put方法实际上只有两步:1.根据键的值定位键值对在那个segment片段 2.调用Segment的put方法
  
  public V put(K key, V value) {
  
  Segment<K,V> s;
  
  if (value ==www.cmylli.com null)
  
  throw new NullPointerException();
  
  //计算键的hash值
  
  int hash = hash(key);
  
  //通过hash值运算把键值对定位到segment[j]片段上
  
  int j = (hash >>> segmentShift) & segmentMask;
  
  //检查segment[j]是否已经初始化了,没有的话调用ensureSegment初始化segment[j]
  
  if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  
  (segments, (j <www.chengmyuLegw.cn< SSHIFT) + SBASE)) == null) // in ensureSegment
  
  s = ensureSegment(j);
  
  //向片段中插入键值对
  
  return s.put(key, hash, value, false);
  
  }
  
  ensureSegment(int k)
  
    我们从ConcurrentHashMap的构造函数可以发现Segment数组只初始化了Segment[0],其余的Segment是用到了在初始化,用了延迟加载的策略,而延迟加载调用的就是ensureSegment方法
  
  private Segment<K,V> ensureSegment(int k) {
  
  final Segment<K,V>[] ss = this.segments;
  
  long u = (k << SSHIFT) + SBASE; // raw offset
  
  Segment<K,V> seg;
  
  //按照segment[0]的HashEntry数组长度和加载因子初始化Segment[k]
  
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  
  Segment<K,V> proto = ss[0]; // use segment 0 as prototype
  
  int cap = proto.table.length;
  
  float lf = proto.loadFactor;
  
  int threshold = (int)(cap www.cmylept.cn* lf);
  
  HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
  
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  
  == null) { // recheck
  
  Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
  
  while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  
  == null) {
  
  if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
  
  break;
  
  }
  
  }
  
  }
  
  return seg;
  
  }
  
  put(K key, int hash, V value, boolean onlyIfAbsent)
  
    调用Segment的put方法插入键值对到Segment的HashEntry数组
  
  final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  
  //Segment继承ReentrantLock,尝试获取独占锁
  
  HashEntry<K,V> node =www.ztyLegw.cn tryLock() ? null :
  
  scanAndLockForPut(key, hash, value);
  
  V oldValue;
  
  try {
  
  HashEntry<K,V>[] tab = table;
  
  //定位键值对在HashEntry数组上的位置
  
  int index = (tab.length - 1) & hash;
  
  //获取这个位置的第一个键值对
  
  HashEntry<K,V> first = entryAt(tab, index);
  
  for (HashEntry<K,V> e = first;;) {
  
  if (e != null) {//此处有链表结构,一直循环到e==null
  
  K k;
  
  //存在与待插入键值对相同的键,则替换value
  
  if ((k = e.key) == key ||
  
  (e.hash == hash && key.equals(k))) {
  
  oldValue = e.value;
  
  if (!onlyIfAbsent) {www.tcgjgw.com//onlyIfAbsent默认为false
  
  e.value = value;
  
  ++modCount;
  
  }
  
  break;
  
  }
  
  e = e.next;
  
  }
  
  else {
  
  //node不为null,设置node的next为first,node为当前链表的头节点
  
  if (node != null)
  
  node.setNext(first);
  
  //node为null,创建头节点,指定next为first,node为当前链表的头节点
  
  else
  
  node = new HashEntry<K,V>(hash, key, value, first);
  
  int c = count + 1;
  
  //扩容条件 (www.feironggw.cn)entry数量大于阈值 (2) 当前数组tab长度小于最大容量。满足以上条件就扩容
  
  if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  
  //扩容
  
  rehash(node);
  
  else
  
  //tab的index位置设置为node,
  
  setEntryAt(tab, index, node);
  
  ++modCount;
  
  count = c;
  
  oldValue = null;

3.6.2 执行流程

执行流程

(1) Job创建完成之后,可以选择激活触发定时任务;

(2) Job到达预订时间后,调度中心触发Job,然后按照预定的Task编排逻辑通过http通知Task执行器进行执行,并异步监听任务执行结果;

(3) 若执行结果成功,则判断是否存在后置Task,若存在,则继续下一次调度,若不存在,则说明该Job执行完毕,结束本次调用;若执行结果失败,则触发故障恢复策略:立即停止、忽略本次失败、多次尝试、转到其它执行器执行。

3.6.3 状态流转

Job在整个生命周期内存在四种状态,分别是:已停止(NULL)、准备中(READY)、开始运行(RUNNING)、异常停止(STOP),状态流转及流转条件如下图所示。

状态流转

3.7 SIA-TASK模块设计

SIA-TASK 的物理网络拓扑图如下所示:

网络拓扑图

SIA-TASK 的模块间交互设计思路:

(1) 通过编排中心创建Task任务或通过Hunter自动抓取,并将 Task 信息异步保存到DB;创建Job并激活,在zookeeper中创建JobKey。

(2) 调度中心会监听zookeeper中JobKey创建事件,然后抢占创建的Job,抢占成功后加入quartz定时任务,当时间到达即触发Job运行。调度中心异步调用执行器服务执行Job中的 Task (可能存在多个 Task ,遵循 Task 失败策略),并将结果返回到调度中心。

(3) 将Job执行状态随时在zookeeper上更改,通过编排中心的查询接口可以进行查询。

(4) Job执行结束后,等待下一次执行。

3.7.1 任务编排中心设计

编排中心可以与DB和zookeeper进行数据交互,其主要功能可分为三方面:

  • 数据持久化接口服务;
  • zookeeper上元数据变更;
  • 数据可视化:查看系统各种统计数据等。

编排中心首页监控展示如下:

首页监控

3.7.2 任务调度中心设计

调度中心主要与DB、ZK和执行器进行交互,其主要功能可分为以下几个方面:

  • Job执行日志记录
  • ZK中Job状态变更
  • 调用执行器服务执行Job
  • 调度中心高可用
  • Job 调度线程池
3.7.3 任务执行器设计

执行器可以与ZK和调度中心进行交互,其主要功能可分为两个方面:

  • 接受调度中心的调度,执行定时任务,并将结果返回到调度中心;
  • 自动抓取执行器上的 Task 任务,提交到ZK。

执行器 Task示例:

@OnlineTask(description = "在线任务示例",enableSerial=true)
@RequestMapping(value = "/example", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") @CrossOrigin(methods = { RequestMethod.POST }, origins = "*") @ResponseBody public String example(@RequestBody String json) { /** * TODO:客户端业务逻辑处理 */ Map<String, String> info = new HashMap<String, String>(); info.put("status", "success"); info.put("result", "as you need"); return JSONHelper.toString(info); } 

由此可见,任务 Task 编写非常简单。

3.8 SIA-TASK高可用设计

分布式服务一般都要考虑高可用方案,同样 SIA-TASK 为了保证高可用,针对不同的服务组件进行了不同维度增强。

3.8.1 任务编排中心的高可用

SIA-TASK 通过前后端分离、服务拆分等措施实现了编排中心的高可用。当集群中某实例失效后,不会影响集群的其它实例,因此无需特殊操作即可使用集群中其它的可用编排中心。

3.8.2 任务调度中心的高可用
3.8.2.1 异常转移

如果调度中心集群中的某个实例节点服务宕机后,这个实例节点上的所有Job会平滑迁移到集群中可用的实例上,不会造成定时任务的执行缺失,同时,当崩溃后的实例修复成功重新接入该集群时,会继续抢占Job提供服务。

3.8.2.2 配置线程池

调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。程池里的线程数,默认值是10,当执行任务会并发执行多个耗时任务时,要根据业务特点选择线程池的大小。

org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 60
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

SIA-TASK 根据quartz自身提供的threadPool再次进行线程池的利用。进行线程池重新定义,针对每个Job去分配一个独有的线程池。线程池的大小可根据Job自身编排的 Task 个数的大小进行动态伸缩,从而保证每个Job的调度线程完全独立,不在会因为编排 Task 个数的陡增而耗尽线程资源。同时提供线程池资源的回收逻辑,在Job进行永久性终止时回收为期分配的线程池资源。

public static ExecutorService getExecutorService(String JobKey) { ExecutorService exec = executorPool.get(JobKey); if (exec == null) { LOGGER.info(Constants.LOG_PREFIX + "Initialize thread pool for running Jobs,Job is {}",JobKey); exec = Executors.newCachedThreadPool(); executorPool.putIfAbsent(JobKey, exec); exec = executorPool.get(JobKey); } return exec; } 
3.8.2.3 全日志跟踪

SIA-TASK 针对Job的整个调度生命周期进行全面跟踪,利用AOP进行日志增强,调度中心每触发一次Job调度就会进行日志记录。同时针对Job编排的 Task 执行也会进行记录任务日志。

日志分为Job日志和 Task 日志:

  • Job日志:包含调度器信息、调度时间、调度状态以及其他附加属性。
  • Task日志:包含执行器信息、执行时间、执行状态、返回信息以及其他附加属性。
3.8.2.4 异步封装
  • SIA-TASK 从一开始设计就考虑了任务进行远程调用对调度中心并发线程资源的损耗。对于Job封装的 Task 远程调度,全部采用异步调用,每次任务请求逻辑的耗时非常的轻量化。只仅仅一次见到的http请求。
  • 支持 Task 进行用户自定义超时设置,支持两种模式的超时:connecttimeout、readtimeout。支持用户根据业务的具体执行周期来进行超时设置。
public interface RestTemplate {/** * 异步Post方法 * @param request * @param responseType * @param uriVariables * @param <T> * @return */ <T> ListenableFuture<ResponseEntity<T>> postAsyncForEntity(Request request, Class<T> responseType, Object... uriVariables); } 
3.8.2.5 自定义调度器资源池

调度器资源池

SIA-TASK 从物理资源角度设计了调度资源池,出于一些特殊情况的考量我们针对调度器进行了池化;调度器可以通过不同的操作进行状态的转变,从而进行能力的转化。

  • 工作调度器资源池:管理具备获取任务能力并且可以实际获取任务的调度器资源。
  • 下线调度器资源池:管理具备获取任务能力但是实际不允许获取的调度器资源。
  • 离线调度器资源池:管理下线调度器资源池中已经宕机的调度器资源。
3.8.3 任务执行器的高可用
  • 考虑网络的不稳定性,SIA-TASK 针对网络的不稳定性也做出了非常重要的设计,对于节点的连通性的测试支持以及针对 Task 运行实例节点健康的预感知,保证提前感知 Task 实例节点的健康情况,保证调度 Task 高可用。

  • 同时也保证了执行器实例针对网络导致链接中断的问题,SIA-TASK 重新设计了zookeeper的重连机制,保证 Task 运行实例节点因网络问题丢失链接后还能进行恢复重试,直到恢复正常后并入执行池中正常接收任务的调度。

  • 一般来说,执行器也是集群部署的。作为 Task 的执行单元,如果在执行器集群中一台机器上执行失败,调度中心会根据失败策略来做故障转移。这里提供了两种故障转移策略:轮询转移和最大补偿转移。轮询转移为对可用的执行器列表进行轮询,若有一个执行器执行成功,则 Task 执行成功,若全部执行失败,则 Task 执行失败。最大补偿转移为首先在本执行器再次执行若干次,若执行成功,则不会转移,若还是执行失败,则执行轮询转移策略。

四、总结

至此对微服务任务调度平台 SIA-TASK 做了一个简要的介绍,包括设计背景、架构设计以及产品组件功能与特性。微服务任务调度平台 SIA-TASK 基本上解决了当前的业务需求,提供简单高效的编排调度服务。SIA-TASK 会持续迭代,提供更为完善的服务。之后也会提供相关技术文档和使用文档。

转载于:https://www.cnblogs.com/qwangxiao/p/10972288.html

这篇关于分布式任务调度平台SIA-TASK的架构设计与运行流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/380285

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

流媒体平台/视频监控/安防视频汇聚EasyCVR播放暂停后视频画面黑屏是什么原因?

视频智能分析/视频监控/安防监控综合管理系统EasyCVR视频汇聚融合平台,是TSINGSEE青犀视频垂直深耕音视频流媒体技术、AI智能技术领域的杰出成果。该平台以其强大的视频处理、汇聚与融合能力,在构建全栈视频监控系统中展现出了独特的优势。视频监控管理系统EasyCVR平台内置了强大的视频解码、转码、压缩等技术,能够处理多种视频流格式,并以多种格式(RTMP、RTSP、HTTP-FLV、WebS

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

如何解决线上平台抽佣高 线下门店客流少的痛点!

目前,许多传统零售店铺正遭遇客源下降的难题。尽管广告推广能带来一定的客流,但其费用昂贵。鉴于此,众多零售商纷纷选择加入像美团、饿了么和抖音这样的大型在线平台,但这些平台的高佣金率导致了利润的大幅缩水。在这样的市场环境下,商家之间的合作网络逐渐成为一种有效的解决方案,通过资源和客户基础的共享,实现共同的利益增长。 以最近在上海兴起的一个跨行业合作平台为例,该平台融合了环保消费积分系统,在短

Android平台播放RTSP流的几种方案探究(VLC VS ExoPlayer VS SmartPlayer)

技术背景 好多开发者需要遴选Android平台RTSP直播播放器的时候,不知道如何选的好,本文针对常用的方案,做个大概的说明: 1. 使用VLC for Android VLC Media Player(VLC多媒体播放器),最初命名为VideoLAN客户端,是VideoLAN品牌产品,是VideoLAN计划的多媒体播放器。它支持众多音频与视频解码器及文件格式,并支持DVD影音光盘,VCD影

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。