7.实现任务的rebalance

2023-12-18 07:12
文章标签 实现 任务 rebalance

本文主要是介绍7.实现任务的rebalance,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.设计

1.1 背景

系统启动后,所有任务都在被执行,如果这时某个节点宕机,那它负责的任务就不能执行了,这对有稳定性要求的任务是不能接受的,所以系统要实现rebalance的功能。

1.2 设计

下面是Job分配与执行的业务点,重分配就是在 follower下线、controller下线、节点新上线进行重分配。理清楚接下来实现就是水到渠成了

2. 实现

2.1 RebalanceJobType

定义了重平衡job的类型

public enum RebalanceJobType {FOLLOWER_OFFLINE(0), CONTROLLER_OFFLINE(1), NODE_ONLINE(2);private int code;RebalanceJobType(int code) {this.code = code;}public boolean isFollowerOffline() {return this.code == FOLLOWER_OFFLINE.code;}public boolean isControllerOffline() {return this.code == CONTROLLER_OFFLINE.code;}public boolean isNodeOnline() {return this.code == NODE_ONLINE.code;}}

2.2 AverageJobAllotStrategy

添加了 rebalanceJob的方法,只有Controller才能调用,对不同的重平衡情况进行分别处理

private Map<Long, List<DttaskJob>> getDttaskJobMap() {List<DttaskJob> allDttaskJob = getAllDttaskJob();return average(allDttaskJob);
}@Override
public void rebalanceJob(RebalanceJobContext rebalanceJobContext) {if (rebalanceJobContext.getType().isFollowerOffline()|| rebalanceJobContext.getType().isControllerOffline()) {long offlineServerId = rebalanceJobContext.getServerId();log.info("{}节点={}下线->重平衡job={}",rebalanceJobContext.getType().isFollowerOffline() ? "follower" : "controller",offlineServerId,rebalanceJobContext);List<DttaskJob> dttaskJobs = getByDttaskId(offlineServerId);List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();Map<Long, List<DttaskJob>> allotMap = new HashMap<>();int i = 0;int nodeCount = nodeInfoList.size();while (i < dttaskJobs.size()) {DttaskJob dttaskJob = dttaskJobs.get(i);NodeInfo nodeInfo = nodeInfoList.get(i % nodeCount);i++;List<DttaskJob> dttaskJobList = allotMap.getOrDefault(nodeInfo.getServerId(), new ArrayList<>());dttaskJobList.add(dttaskJob);allotMap.put(nodeInfo.getServerId(), dttaskJobList);}executeDttaskJob(new ExecuteDttaskJobContext(allotMap, true));} else if (rebalanceJobContext.getType().isNodeOnline()) {log.info("节点上线->重平衡job={}", rebalanceJobContext);long onlineServerId = rebalanceJobContext.getServerId();Map<Long, List<DttaskJob>> dttaskJobMap = BeanUseHelper.entityHelpService().queryDttaskJob();Map<Long, List<DttaskJob>> allotDttaskJobMap = getDttaskJobMap();Map<Long, List<DttaskJob>> stopDttaskJobMapOfOldNodes = new HashMap<>();Map<Long, List<DttaskJob>> startDttaskJobMapOfNewNodes = new HashMap<>();List<DttaskJob> startDttaskJobs = new ArrayList<>();dttaskJobMap.forEach((serverId, dttaskJobList) -> {int size = dttaskJobList.size();int newSize = allotDttaskJobMap.get(serverId).size();if (size > newSize) {List<DttaskJob> dttaskJobs = dttaskJobList.subList(0, size - newSize);stopDttaskJobMapOfOldNodes.put(serverId, dttaskJobs);startDttaskJobs.addAll(dttaskJobs);}});startDttaskJobMapOfNewNodes.put(onlineServerId, startDttaskJobs);executeDttaskJob(new ExecuteDttaskJobContext(stopDttaskJobMapOfOldNodes, false));executeDttaskJob(new ExecuteDttaskJobContext(startDttaskJobMapOfNewNodes, true));}
}

2.3 ServerClientChannelHandler

对节点下线进行重平衡处理

2.4 NodeOnlineMessageService

3. 测试

启动三个节点,节点完成选举,每个节点执行2个任务

  • 3号节点下线

1 2 节点各分配了一个任务继续执行

  • 3号节点上线

新上线的3号节点,重新得到2个任务,1 2节点各停止一个任务

至此,节点上下线的任务重平衡完成

这篇关于7.实现任务的rebalance的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/507519

相关文章

Python如何实现读取csv文件时忽略文件的编码格式

《Python如何实现读取csv文件时忽略文件的编码格式》我们再日常读取csv文件的时候经常会发现csv文件的格式有多种,所以这篇文章为大家介绍了Python如何实现读取csv文件时忽略文件的编码格式... 目录1、背景介绍2、库的安装3、核心代码4、完整代码1、背景介绍我们再日常读取csv文件的时候经常

Golang中map缩容的实现

《Golang中map缩容的实现》本文主要介绍了Go语言中map的扩缩容机制,包括grow和hashGrow方法的处理,具有一定的参考价值,感兴趣的可以了解一下... 目录基本分析带来的隐患为什么不支持缩容基本分析在 Go 底层源码 src/runtime/map.go 中,扩缩容的处理方法是 grow

Go 1.23中Timer无buffer的实现方式详解

《Go1.23中Timer无buffer的实现方式详解》在Go1.23中,Timer的实现通常是通过time包提供的time.Timer类型来实现的,本文主要介绍了Go1.23中Timer无buff... 目录Timer 的基本实现无缓冲区的实现自定义无缓冲 Timer 实现更复杂的 Timer 实现总结在

基于Python实现多语言朗读与单词选择测验

《基于Python实现多语言朗读与单词选择测验》在数字化教育日益普及的今天,开发一款能够支持多语言朗读和单词选择测验的程序,对于语言学习者来说无疑是一个巨大的福音,下面我们就来用Python实现一个这... 目录一、项目概述二、环境准备三、实现朗读功能四、实现单词选择测验五、创建图形用户界面六、运行程序七、

Vue中动态权限到按钮的完整实现方案详解

《Vue中动态权限到按钮的完整实现方案详解》这篇文章主要为大家详细介绍了Vue如何在现有方案的基础上加入对路由的增、删、改、查权限控制,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、数据库设计扩展1.1 修改路由表(routes)1.2 修改角色与路由权限表(role_routes)二、后端接口设计

C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)

《C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)》本文主要介绍了C#集成DeepSeek模型实现AI私有化的方法,包括搭建基础环境,如安装Ollama和下载DeepS... 目录前言搭建基础环境1、安装 Ollama2、下载 DeepSeek R1 模型客户端 ChatBo

Qt实现发送HTTP请求的示例详解

《Qt实现发送HTTP请求的示例详解》这篇文章主要为大家详细介绍了如何通过Qt实现发送HTTP请求,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1、添加network模块2、包含改头文件3、创建网络访问管理器4、创建接口5、创建网络请求对象6、创建一个回复对

C++实现回文串判断的两种高效方法

《C++实现回文串判断的两种高效方法》文章介绍了两种判断回文串的方法:解法一通过创建新字符串来处理,解法二在原字符串上直接筛选判断,两种方法都使用了双指针法,文中通过代码示例讲解的非常详细,需要的朋友... 目录一、问题描述示例二、解法一:将字母数字连接到新的 string思路代码实现代码解释复杂度分析三、

grom设置全局日志实现执行并打印sql语句

《grom设置全局日志实现执行并打印sql语句》本文主要介绍了grom设置全局日志实现执行并打印sql语句,包括设置日志级别、实现自定义Logger接口以及如何使用GORM的默认logger,通过这些... 目录gorm中的自定义日志gorm中日志的其他操作日志级别Debug自定义 Loggergorm中的

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring