本文主要是介绍elastic-job(通过zookeeper管理元数据),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
任务名称:
namespace
/elastic-job-demo
|---------demoSimpleJob1
|-------------leader
|-----election
|--------instance[领导者]
|
|--------latch[选举]
|-----sharding
|--------processing
|
|--------necessary[是否需要分片标识]
|-----failover
|
|--------latch
|
|--------items(failover分片)
|
|-------0 failover分片1
|
|-------1 failover分片2
|-------------servers(机器ip相关节点)
|
|---------ip1
|
|---------ip2
|
|---------ip3
|
|---------ip4
|-------------config(作业配置信息,json数据)
|
|
|-------------instances(运行实例节点)
|
|
|
|----------------------ip1+进程号
|
|
|----------------------ip2+进程号
|-------------sharding
|----------1(分片1)
|
|
|---------running:运行标识,临时节点
|
|---------instance:该分片执行的instaceid
|
|---------misfire:错过运行标识
|
|---------failover:failover标识,存储运行该failover分片的instanceid
|
|---------disable:分片是否有效
|----------2(分片2)
|---------demoSimpleJob2
|---------demoSimpleJob3
任务重新分片只是解决了下次任务执行时,所有的分片任务都是分布到各个实例中,但是当前失效的任务是如何处理的。
失效转移。Failover:在执行任务的过程中遇见异常的情况,这个分片任务可以在其他节点再次执行。
首先,在某个任务实例elastic-job会在leader节点下面创建failover节点以及items节点。
items节点下会有失效任务实例的原本应该做好的分片。
处理步骤:
1、每个分片id写到/jobName/leader/failover/items
条件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效转移的作业分片项。
条件二:当前作业不在运行中。此条件即是上文提交的作业节点空闲的定义。
失效转移: 运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。
启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器【空闲】,抓取未完成的孤儿分片项执行。
在FailoverLeaderExecutionCallback中回调逻辑如下:
(1)也会首先判断是否需要失效转移,
(2)从注册中心获得一个 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作业分片项,
(3)在注册中心注册节点`${JOB_NAME}/sharding/${ITEM_ID}/failover` 作业分片项为当前作业节点,
(4)然后移除任务转移分片项,【${JOB_NAME}/leader/failover/items/${ITEM_ID}】
(5)最后调用执行,提交任务
// 获得一个 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作业分片项
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
// 设置这个 `${JOB_NAME}/sharding/${ITEM_ID}/failover` 作业分片项 为 当前作业节点
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
// 移除这个 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作业分片项
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不应使用triggerJob, 而是使用executor统一调度 疑问:为什么要用executor统一,后面研究下
// 触发作业执行
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
jobScheduleController.triggerJob();
}
这篇关于elastic-job(通过zookeeper管理元数据)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!