聊聊PowerJob Server的高可用

2024-02-08 15:04
文章标签 聊聊 可用 server powerjob

本文主要是介绍聊聊PowerJob Server的高可用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob Server的高可用

PowerJobSpringWorker

tech/powerjob/worker/PowerJobSpringWorker.java

public class PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, DisposableBean {/*** 组合优于继承,持有 PowerJobWorker,内部重新设置 ProcessorFactory 更优雅*/private PowerJobWorker powerJobWorker;private final PowerJobWorkerConfig config;public PowerJobSpringWorker(PowerJobWorkerConfig config) {this.config = config;}@Overridepublic void afterPropertiesSet() throws Exception {powerJobWorker = new PowerJobWorker(config);powerJobWorker.init();}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext);// append BuiltInSpringProcessorFactoryList<ProcessorFactory> processorFactories = Lists.newArrayList(Optional.ofNullable(config.getProcessorFactoryList()).orElse(Collections.emptyList()));processorFactories.add(springProcessorFactory);processorFactories.add(springMethodProcessorFactory);config.setProcessorFactoryList(processorFactories);}@Overridepublic void destroy() throws Exception {powerJobWorker.destroy();}
}

PowerJobSpringWorker实现了InitializingBean接口,其afterPropertiesSet会创建powerJobWorker,然后执行其init方法

PowerJobWorker.init

tech/powerjob/worker/PowerJobWorker.java

    public void init() throws Exception {if (!initialized.compareAndSet(false, true)) {log.warn("[PowerJobWorker] please do not repeat the initialization");return;}Stopwatch stopwatch = Stopwatch.createStarted();log.info("[PowerJobWorker] start to initialize PowerJobWorker...");PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");ServerDiscoveryService serverDiscoveryService = new PowerJobServerDiscoveryService(config);workerRuntime.setServerDiscoveryService(serverDiscoveryService);try {PowerBannerPrinter.print();// 校验 appNameWorkerAppInfo appInfo = serverDiscoveryService.assertApp();workerRuntime.setAppInfo(appInfo);// 初始化网络数据,区别对待上报地址和本机绑定地址(对外统一使用上报地址)String localBindIp = NetUtils.getLocalHost();int localBindPort = config.getPort();String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp);String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort)));// 初始化 线程池final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());workerRuntime.setExecutorManager(executorManager);// 初始化 ProcessorLoaderProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);workerRuntime.setProcessorLoader(processorLoader);// 初始化 actorTaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);// 初始化通讯引擎EngineConfig engineConfig = new EngineConfig().setType(config.getProtocol().name()).setServerType(ServerType.WORKER).setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort)).setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));EngineOutput engineOutput = remoteEngine.start(engineConfig);workerRuntime.setTransporter(engineOutput.getTransporter());// 连接 serverserverDiscoveryService.timingCheck(workerRuntime.getExecutorManager().getCoreExecutor());log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");// 初始化日志系统OmsLogHandler omsLogHandler = new OmsLogHandler(workerRuntime.getWorkerAddress(), workerRuntime.getTransporter(), serverDiscoveryService);workerRuntime.setOmsLogHandler(omsLogHandler);// 初始化存储TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());taskPersistenceService.init();workerRuntime.setTaskPersistenceService(taskPersistenceService);log.info("[PowerJobWorker] local storage initialized successfully.");// 初始化定时任务workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, config.getHealthReportInterval(), TimeUnit.SECONDS);workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);}catch (Exception e) {log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);throw e;}}

PowerJobWorker的init方法会执行serverDiscoveryService.timingCheck(workerRuntime.getExecutorManager().getCoreExecutor())调度timingCheck

timingCheck

tech/powerjob/worker/background/discovery/PowerJobServerDiscoveryService.java

    public void timingCheck(ScheduledExecutorService timingPool) {this.currentServerAddress = discovery();if (StringUtils.isEmpty(this.currentServerAddress) && !config.isAllowLazyConnectServer()) {throw new PowerJobException("can't find any available server, this worker has been quarantined.");}// 这里必须保证成功timingPool.scheduleAtFixedRate(() -> {try {this.currentServerAddress = discovery();} catch (Exception e) {log.error("[PowerDiscovery] fail to discovery server!", e);}}, 10, 10, TimeUnit.SECONDS);}

PowerJobServerDiscoveryService的timingCheck会使用timingPool定时每隔10s调度执行discovery()来更新当前worker的server地址

discovery

    private String discovery() {// 只有允许延迟加载模式下,appId 才可能为空。每次服务发现前,都重新尝试获取 appInfo。由于是懒加载链路,此处完全忽略异常if (appInfo.getAppId() == null || appInfo.getAppId() < 0) {try {assertApp0();} catch (Exception e) {log.warn("[PowerDiscovery] assertAppName in discovery stage failed, msg: {}", e.getMessage());return null;}}if (ip2Address.isEmpty()) {config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));}String result = null;// 先对当前机器发起请求String currentServer = currentServerAddress;if (!StringUtils.isEmpty(currentServer)) {String ip = currentServer.split(":")[0];// 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担String firstServerAddress = ip2Address.get(ip);if (firstServerAddress != null) {result = acquire(firstServerAddress);}}for (String httpServerAddress : config.getServerAddress()) {if (StringUtils.isEmpty(result)) {result = acquire(httpServerAddress);}else {break;}}if (StringUtils.isEmpty(result)) {log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");// 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务if (FAILED_COUNT++ > MAX_FAILED_COUNT) {log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();if (!CollectionUtils.isEmpty(frequentInstanceIds)) {frequentInstanceIds.forEach(instanceId -> {HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);taskTracker.destroy();log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);});}FAILED_COUNT = 0;}return null;} else {// 重置失败次数FAILED_COUNT = 0;log.debug("[PowerDiscovery] current server is {}.", result);return result;}}

discovery方法就是定时遍历配置的serverAddress地址列表,调用server端的acquire方法来获取可用的server

acquireServer

tech/powerjob/server/web/controller/ServerController.java

    @GetMapping("/acquire")public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {return ResultDTO.success(serverElectionService.elect(request));}

ServerController提供了acquire接口,它执行的是serverElectionService.elect(request)

elect

tech/powerjob/server/remote/server/election/ServerElectionService.java

    public String elect(ServerDiscoveryRequest request) {if (!accurate()) {final String currentServer = request.getCurrentServer();// 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));if (localProtocolInfoOpt.isPresent()) {if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) {log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId());return currentServer;}}}return getServer0(request);}

ServerElectionService的elect方法主要是执行getServer0

getServer0

    private String getServer0(ServerDiscoveryRequest discoveryRequest) {final Long appId = discoveryRequest.getAppId();final String protocol = discoveryRequest.getProtocol();Set<String> downServerCache = Sets.newHashSet();for (int i = 0; i < RETRY_TIMES; i++) {// 无锁获取当前数据库中的ServerOptional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);if (!appInfoOpt.isPresent()) {throw new PowerJobException(appId + " is not registered!");}String appName = appInfoOpt.get().getAppName();String originServer = appInfoOpt.get().getCurrentServer();String activeAddress = activeAddress(originServer, downServerCache, protocol);if (StringUtils.isNotEmpty(activeAddress)) {return activeAddress;}// 无可用Server,重新进行Server选举,需要加锁String lockName = String.format(SERVER_ELECT_LOCK, appId);boolean lockStatus = lockService.tryLock(lockName, 30000);if (!lockStatus) {try {Thread.sleep(500);}catch (Exception ignore) {}continue;}try {// 可能上一台机器已经完成了Server选举,需要再次判断AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);if (StringUtils.isNotEmpty(address)) {return address;}// 篡位,如果本机存在协议,则作为Server调度该 workerfinal ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);if (targetProtocolInfo != null) {// 注意,写入 AppInfoDO#currentServer 的永远是 default 的绑定地址,仅在返回的时候特殊处理为协议地址appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());appInfo.setGmtModified(new Date());appInfoRepository.saveAndFlush(appInfo);log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);return targetProtocolInfo.getExternalAddress();}}catch (Exception e) {log.error("[ServerElection] write new server to db failed for app {}.", appName, e);} finally {lockService.unlock(lockName);}}throw new PowerJobException("server elect failed for app " + appId);}

getServer0方法会重试10次,它先针对discoveryRequest指定的currentServer进行activeAddress,成功则返回,没有可用server则加锁进行重新分配,这里优先本机判断

activeAddress

    private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {if (downServerCache.contains(serverAddress)) {return null;}if (StringUtils.isEmpty(serverAddress)) {return null;}Ping ping = new Ping();ping.setCurrentTime(System.currentTimeMillis());URL targetUrl = ServerURLFactory.ping2Friend(serverAddress);try {AskResponse response = transportService.ask(Protocol.HTTP.name(), targetUrl, ping, AskResponse.class).toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);if (response.isSuccess()) {// 检测通过的是远程 server 的暴露地址,需要返回 worker 需要的协议地址final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);if (protocolInfo != null) {downServerCache.remove(serverAddress);ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class);log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol);// 4.3.3 升级 4.3.4 过程中,未升级的 server 还不存在 externalAddress,需要使用 address 兼容return Optional.ofNullable(remoteProtocol.getExternalAddress()).orElse(remoteProtocol.getAddress());} else {log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);}}} catch (TimeoutException te) {log.warn("[ServerElection] server[{}] was down due to ping timeout!", serverAddress);} catch (Exception e) {log.warn("[ServerElection] server[{}] was down with unknown case!", serverAddress, e);}downServerCache.add(serverAddress);return null;}

activeAddress方法主要是对目标server发起ping请求,超时时间为1s,若目标server挂了,则抛出TimeoutException,将目标server加入到downServerCache中;若目标server响应成功,则从downServerCache中移除

小结

PowerJob的worker在初始化的时候会启动一个定时任务,每隔10s调度执行discovery()来更新当前worker的server地址;discovery方法就是定时遍历配置的serverAddress地址列表,调用server端的acquire方法来获取可用的server;ServerController提供了acquire接口,它执行的是serverElectionService.elect(request),ServerElectionService的elect方法主要是执行getServer0,getServer0方法会重试10次,它先针对discoveryRequest指定的currentServer进行activeAddress,成功则返回,没有可用server则加锁进行重新分配,这里优先本机判断。activeAddress方法主要是对目标server发起ping请求,超时时间为1s,若目标server挂了,则抛出TimeoutException,将目标server加入到downServerCache中;若目标server响应成功,则从downServerCache中移除。

worker定时任务 --> 轮询serverAddress请求acquire --> server端判断目标server的ping是否成功,不成功则加锁优先使用本机作为替代server。

这篇关于聊聊PowerJob Server的高可用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/691383

相关文章

Sentinel 高可用流量管理框架

Sentinel 是面向分布式服务架构的高可用流量防护组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。 Sentinel 具有以下特性: 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应

JAVA用最简单的方法来构建一个高可用的服务端,提升系统可用性

一、什么是提升系统的高可用性 JAVA服务端,顾名思义就是23体验网为用户提供服务的。停工时间,就是不能向用户提供服务的时间。高可用,就是系统具有高度可用性,尽量减少停工时间。如何用最简单的方法来搭建一个高效率可用的服务端JAVA呢? 停工的原因一般有: 服务器故障。例如服务器宕机,服务器网络出现问题,机房或者机架出现问题等;访问量急剧上升,导致服务器压力过大导致访问量急剧上升的原因;时间和

red5-server源码

red5-server源码:https://github.com/Red5/red5-server

VMware8实现高可用(HA)集群

陈科肇 =========== 操作系统:中标麒麟高级操作系统V6 x86-64 实现软件:中标麒麟高可用集群软件 ======================== 1.环境的规划与配置 硬件要求 服务器服务器至少需要 2 台,每台服务器至少需要 2 块网卡以做心跳与连接公网使用存储环境 建议使用一台 SAN/NAS/ISCSI 存储作为数据共享存储空间 软

用Cri-O,Sealos CLI,Kubeadm方式部署K8s高可用集群

3.6 Cri-O方式部署K8s集群 注意:基于Kubernetes基础环境 3.6.1 所有节点安装配置cri-o [root@k8s-all ~]# VERSION=1.28[root@k8s-all ~]# curl -L -o /etc/yum.repos.d/devel:kubic:libcontainers:stable.repo https://download.opensu

安装SQL2005后SQL Server Management Studio 没有出来的解决方案

一种情况,在安装 sqlServer2005 时 居然出现两个警告: 1 Com+ 目录要求 2 Edition change check 郁闷!网上说出现两个警告,是肯定装不成功的!我抱着侥幸的态度试了下,成功了。 安装成功后,正准备 “ 仅工具、联机丛书和示例(T)” 但是安装不了,他提示我“工作站组件”安装过了对现有组件无法更新或升级。 解决办法: 1 打开“控

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

Kubernetes高可用集群搭建(kubeadm)

1 Kubernetes高可用集群介绍 前面已经介绍了Kubernetes的集群部署,但是都只是介绍的单master节点的情况,由于单master节点的可靠性不高,不适合实际的生产环境,此处介绍如何实现多master节点的高可用集群的搭建。 2 安装要求 一台或多台机器,操作系统CentOS7.x-x86_64硬件配置:2G或更多ARM,2个CPU或更多CPU,硬盘20G及以上集群中所有机器

ERROR 2003 (HY000): Can't connect to MySQL server on (10061)

在linux系统上装了一个mysql-5.5,启动后本机都是可以访问的,操作都正常,同时建了一个%的用户(支持远程访问), root@debian:/# mysql -u loongson -pEnter password: Welcome to the MySQL monitor. Commands end with ; or \g.Your MySQL connection id

Oracle和Sql_Server 部分sql语句的区别

比如:A表中, 字段:gxmlflag  number;  比如数据:20210115 字段:gxmldate date ;    比如数据:2021-01-15 09:50:50 一、在Oracle数据库中: 1、insert 和 update 语句: t.gxmlflag = to_char(sysdate,'yyyymmdd'),t.gxmldate=sysdate 比如:update f