本文主要是介绍聊聊PowerJob的ServerController,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
序
本文主要研究一下PowerJob的ServerController
ServerController
tech/powerjob/server/web/controller/ServerController.java
@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {private ServerInfo serverInfo;private final TransportService transportService;private final ServerElectionService serverElectionService;private final AppInfoRepository appInfoRepository;private final WorkerClusterQueryService workerClusterQueryService;@GetMapping("/assert")public ResultDTO<Long> assertAppName(String appName) {Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);return appInfoOpt.map(appInfoDO -> ResultDTO.success(appInfoDO.getId())).orElseGet(() -> ResultDTO.failed(String.format("app(%s) is not registered! Please register the app in oms-console first.", appName)));}@GetMapping("/assertV2")public ResultDTO<WorkerAppInfo> assertAppNameV2(String appName) {Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);return appInfoOpt.map(appInfoDO -> {WorkerAppInfo workerAppInfo = new WorkerAppInfo().setAppId(appInfoDO.getId());return ResultDTO.success(workerAppInfo);}).orElseGet(() -> ResultDTO.failed(String.format("app(%s) is not registered! Please register the app in oms-console first.", appName)));}@GetMapping("/acquire")public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {return ResultDTO.success(serverElectionService.elect(request));}@GetMapping("/hello")public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {JSONObject res = new JSONObject();res.put("localHost", NetUtils.getLocalHost());res.put("serverInfo", serverInfo);res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));res.put("serverTimeTs", System.currentTimeMillis());res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());res.put("appIds", workerClusterQueryService.getAppId2ClusterStatus().keySet());if (debug) {res.put("appId2ClusterInfo", JSON.parseObject(JSON.toJSONString(workerClusterQueryService.getAppId2ClusterStatus())));}try {res.put("defaultAddress", JSONObject.toJSON(transportService.defaultProtocol()));} catch (Exception ignore) {}return ResultDTO.success(res);}@Overridepublic void setServerInfo(ServerInfo serverInfo) {this.serverInfo = serverInfo;}
}
ServerController实现了ServerInfoAware接口,它提供了assert、assertV2、acquire、hello接口;其中assert接口用于判断指定的appName是否存在,assertV2返回的是WorkerAppInfo;acquire委托给了serverElectionService.elect(request);hello接口返回server端的localhost、serverInfo、serverTime等信息
elect
tech/powerjob/server/remote/server/election/ServerElectionService.java
public String elect(ServerDiscoveryRequest request) {if (!accurate()) {final String currentServer = request.getCurrentServer();// 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));if (localProtocolInfoOpt.isPresent()) {if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) {log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId());return currentServer;}}}return getServer0(request);}private boolean accurate() {return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage;}
ServerElectionService的elect方法接收ServerDiscoveryRequest,它先判断是否accurate(
判断100以内的随机数是否小于accurateSelectServerPercentage,默认50
),是则执行getServer0,否则则先判断ProtocolInfo的address是否是currentServer,是则直接返回,否则还是走getServer0
getServer0
private String getServer0(ServerDiscoveryRequest discoveryRequest) {final Long appId = discoveryRequest.getAppId();final String protocol = discoveryRequest.getProtocol();Set<String> downServerCache = Sets.newHashSet();for (int i = 0; i < RETRY_TIMES; i++) {// 无锁获取当前数据库中的ServerOptional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);if (!appInfoOpt.isPresent()) {throw new PowerJobException(appId + " is not registered!");}String appName = appInfoOpt.get().getAppName();String originServer = appInfoOpt.get().getCurrentServer();String activeAddress = activeAddress(originServer, downServerCache, protocol);if (StringUtils.isNotEmpty(activeAddress)) {return activeAddress;}// 无可用Server,重新进行Server选举,需要加锁String lockName = String.format(SERVER_ELECT_LOCK, appId);boolean lockStatus = lockService.tryLock(lockName, 30000);if (!lockStatus) {try {Thread.sleep(500);}catch (Exception ignore) {}continue;}try {// 可能上一台机器已经完成了Server选举,需要再次判断AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);if (StringUtils.isNotEmpty(address)) {return address;}// 篡位,如果本机存在协议,则作为Server调度该 workerfinal ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);if (targetProtocolInfo != null) {// 注意,写入 AppInfoDO#currentServer 的永远是 default 的绑定地址,仅在返回的时候特殊处理为协议地址appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());appInfo.setGmtModified(new Date());appInfoRepository.saveAndFlush(appInfo);log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);return targetProtocolInfo.getExternalAddress();}}catch (Exception e) {log.error("[ServerElection] write new server to db failed for app {}.", appName, e);} finally {lockService.unlock(lockName);}}throw new PowerJobException("server elect failed for app " + appId);}
getServer0执行一个循环(最大10次),它先根据appId获取AppInfoDO,然后通过activeAddress判断server是否存活,是则返回,否则重新进行server选举
activeAddress
private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {if (downServerCache.contains(serverAddress)) {return null;}if (StringUtils.isEmpty(serverAddress)) {return null;}Ping ping = new Ping();ping.setCurrentTime(System.currentTimeMillis());URL targetUrl = ServerURLFactory.ping2Friend(serverAddress);try {AskResponse response = transportService.ask(Protocol.HTTP.name(), targetUrl, ping, AskResponse.class).toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);if (response.isSuccess()) {// 检测通过的是远程 server 的暴露地址,需要返回 worker 需要的协议地址final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);if (protocolInfo != null) {downServerCache.remove(serverAddress);ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class);log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol);// 4.3.3 升级 4.3.4 过程中,未升级的 server 还不存在 externalAddress,需要使用 address 兼容return Optional.ofNullable(remoteProtocol.getExternalAddress()).orElse(remoteProtocol.getAddress());} else {log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);}}} catch (TimeoutException te) {log.warn("[ServerElection] server[{}] was down due to ping timeout!", serverAddress);} catch (Exception e) {log.warn("[ServerElection] server[{}] was down with unknown case!", serverAddress, e);}downServerCache.add(serverAddress);return null;}
activeAddress通过transportService.ask请求ping接口,1s超时,若成功则从downServerCache移除,返回remoteProtocol.getAddress(),若失败则将该serverAddress加入downServerCache
小结
ServerController实现了ServerInfoAware接口,它提供了assert、assertV2、acquire、hello接口;其中assert接口用于判断指定的appName是否存在,assertV2返回的是WorkerAppInfo;acquire委托给了serverElectionService.elect(request);hello接口返回server端的localhost、serverInfo、serverTime等信息。
这篇关于聊聊PowerJob的ServerController的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!