聊聊PowerJob的RemoteEngine

2023-12-26 11:12

本文主要是介绍聊聊PowerJob的RemoteEngine,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的RemoteEngine

RemoteEngine

tech/powerjob/remote/framework/engine/RemoteEngine.java

public interface RemoteEngine {EngineOutput start(EngineConfig engineConfig);void close() throws IOException;
}

RemoteEngine定义了start及close方法,其中start接收EngineConfig参数返回EngineOutput

EngineConfig

tech/powerjob/remote/framework/engine/EngineConfig.java

@Data
@Accessors(chain = true)
public class EngineConfig implements Serializable {/*** 服务类型*/private ServerType serverType;/*** 需要启动的引擎类型*/private String type;/*** 绑定的本地地址*/private Address bindAddress;/*** actor实例,交由使用侧自己实例化以便自行注入各种 bean*/private List<Object> actorList;
}

EngineConfig定义了serverType(SERVER、WORKER),type、bindAddress、actorList属性

EngineOutput

tech/powerjob/remote/framework/engine/EngineOutput.java

@Getter
@Setter
public class EngineOutput {private Transporter transporter;
}

EngineOutput定义了transporter

Transporter

tech/powerjob/remote/framework/transporter/Transporter.java

public interface Transporter {/*** Protocol* @return return protocol*/Protocol getProtocol();/***send message* @param url url* @param request request*/void tell(URL url, PowerSerializable request);/*** ask by request* @param url url* @param request request* @param clz response type* @return CompletionStage* @throws RemotingException remote exception*/<T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}

Transporter接口定义了getProtocol(AkkaProtocol、HttpProtocol)、tell、ask三个方法

PowerJobRemoteEngine

tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java

@Slf4j
public class PowerJobRemoteEngine implements RemoteEngine {private CSInitializer csInitializer;@Overridepublic EngineOutput start(EngineConfig engineConfig) {final String engineType = engineConfig.getType();EngineOutput engineOutput = new EngineOutput();log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());csInitializer = CSInitializerFactory.build(engineType);String type = csInitializer.type();Stopwatch sw = Stopwatch.createStarted();log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);csInitializer.init(new CSInitializerConfig().setBindAddress(engineConfig.getBindAddress()).setServerType(engineConfig.getServerType()));// 构建通讯器Transporter transporter = csInitializer.buildTransporter();engineOutput.setTransporter(transporter);log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));// 绑定 handlercsInitializer.bindHandlers(actorInfos);log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);return engineOutput;}@Overridepublic void close() throws IOException {csInitializer.close();}
}

PowerJobRemoteEngine定义了csInitializer,其start方法先通过ActorFactory.load(engineConfig.getActorList())加载actorInfos,再通过CSInitializerFactory.build(engineType)构建csInitializer,然后执行其init方法;接着通过csInitializer.buildTransporter()构建transporter;最后将actorInfos绑定到csInitializer;其close方法关闭csInitializer

ActorFactory.load

tech/powerjob/remote/framework/engine/impl/ActorFactory.java

@Slf4j
class ActorFactory {static List<ActorInfo> load(List<Object> actorList) {List<ActorInfo> actorInfos = Lists.newArrayList();actorList.forEach(actor -> {final Class<?> clz = actor.getClass();try {final Actor anno = clz.getAnnotation(Actor.class);ActorInfo actorInfo = new ActorInfo().setActor(actor).setAnno(anno);actorInfo.setHandlerInfos(loadHandlerInfos4Actor(actorInfo));actorInfos.add(actorInfo);} catch (Throwable t) {log.error("[ActorFactory] process Actor[{}] failed!", clz);ExceptionUtils.rethrow(t);}});return actorInfos;}//......
}    

ActorFactory.load方法遍历actorList,获取其类上的@Actor注解,再收集其方法上的@Handler注解信息设置到actorInfo

CSInitializerFactory

tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java

@Slf4j
class CSInitializerFactory {static CSInitializer build(String targetType) {Reflections reflections = new Reflections(OmsConstant.PACKAGE);Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {try {CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();String type = csInitializer.type();log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);if (targetType.equalsIgnoreCase(type)) {return csInitializer;}} catch (Exception e) {log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);ExceptionUtils.rethrow(e);}}throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob' and import the dependencies!", targetType));}
}

CSInitializerFactory的build方法通过org.reflections.Reflections去扫描tech.powerjob包,获取CSInitializer的子类,之后通过反射进行实例化

CSInitializer

tech/powerjob/remote/framework/cs/CSInitializer.java

public interface CSInitializer {/*** 类型名称,比如 akka, netty4,httpJson* @return 名称*/String type();/*** initialize the framework* @param config config*/void init(CSInitializerConfig config);/*** build a Transporter by based network framework* @return Transporter*/Transporter buildTransporter();/*** bind Actor, publish handler's service* @param actorInfos actor infos*/void bindHandlers(List<ActorInfo> actorInfos);void close() throws IOException;
}

CSInitializer接口定义了type、init、buildTransporter、close方法,它有两个实现类,分别是AkkaCSInitializer、HttpVertxCSInitializer

CSInitializerConfig

tech/powerjob/remote/framework/cs/CSInitializerConfig.java

@Getter
@Setter
@Accessors(chain = true)
public class CSInitializerConfig implements Serializable {private Address bindAddress;private ServerType serverType;
}

CSInitializerConfig定义了bindAddress、serverType两个属性

AkkaCSInitializer

tech/powerjob/remote/akka/AkkaCSInitializer.java

@Slf4j
public class AkkaCSInitializer implements CSInitializer {private ActorSystem actorSystem;private CSInitializerConfig config;@Overridepublic String type() {return tech.powerjob.common.enums.Protocol.AKKA.name();}@Overridepublic void init(CSInitializerConfig config) {this.config = config;Address bindAddress = config.getBindAddress();log.info("[PowerJob-AKKA] bindAddress: {}", bindAddress);// 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了)Map<String, Object> overrideConfig = Maps.newHashMap();overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);log.info("[PowerJob-AKKA] try to start AKKA System.");// 启动时绑定当前的 actorSystemNameString actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);// 处理系统中产生的异常情况ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", actorSystem.name());}@Overridepublic Transporter buildTransporter() {return new AkkaTransporter(actorSystem);}@Overridepublic void bindHandlers(List<ActorInfo> actorInfos) {int cores = Runtime.getRuntime().availableProcessors();actorInfos.forEach(actorInfo -> {String rootPath = actorInfo.getAnno().path();AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig));actorSystem.actorOf(AkkaProxyActor.props(actorInfo).withDispatcher("akka.".concat(actorConfig.getDispatcherName())).withRouter(new RoundRobinPool(cores)), actorConfig.getActorName());});}@Overridepublic void close() throws IOException {actorSystem.terminate();}
}

AkkaCSInitializer其type方法返回的是AKKA类型,init方法先通过ConfigFactory.load(AkkaConstant.AKKA_CONFIG)加载akka基本配置,再覆盖hostname和port信息,最后通过ActorSystem.create(actorSystemName, akkaFinalConfig)创建actorSystem,并创建AkkaTroubleshootingActor,订阅DeadLetter消息;buildTransporter返回的是AkkaTransporter;其bindHandlers方法主要是根据ActorInfo信息来创建actor;其close方法执行actorSystem.terminate()

AkkaTransporter

tech/powerjob/remote/akka/AkkaTransporter.java

public class AkkaTransporter implements Transporter {private final ActorSystem actorSystem;/*** akka://<actor system>@<hostname>:<port>/<actor path>*/private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";public AkkaTransporter(ActorSystem actorSystem) {this.actorSystem = actorSystem;}@Overridepublic Protocol getProtocol() {return new AkkaProtocol();}@Overridepublic void tell(URL url, PowerSerializable request) {ActorSelection actorSelection = fetchActorSelection(url);actorSelection.tell(request, null);}@Override@SuppressWarnings("unchecked")public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {ActorSelection actorSelection = fetchActorSelection(url);return (CompletionStage<T>) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));}private ActorSelection fetchActorSelection(URL url) {HandlerLocation location = url.getLocation();String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType());String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();CommonUtils.requireNonNull(targetActorName, "can't find actor by URL: " + location);String address = url.getAddress().toFullAddress();return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, targetActorName));}
}

AkkaTransporter其protocol为AkkaProtocol;其tell方法根据url找到actorSelection,通过actorSelection的tell发送请求;ask方法使用的是Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS))

HttpVertxCSInitializer

tech/powerjob/remote/http/HttpVertxCSInitializer.java

@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {private Vertx vertx;private HttpServer httpServer;private HttpClient httpClient;private CSInitializerConfig config;@Overridepublic String type() {return tech.powerjob.common.enums.Protocol.HTTP.name();}@Overridepublic void init(CSInitializerConfig config) {this.config = config;vertx = VertxInitializer.buildVertx();httpServer = VertxInitializer.buildHttpServer(vertx);httpClient = VertxInitializer.buildHttpClient(vertx);}@Overridepublic Transporter buildTransporter() {return new VertxTransporter(httpClient);}@Override@SneakyThrowspublic void bindHandlers(List<ActorInfo> actorInfos) {Router router = Router.router(vertx);// 处理请求响应router.route().handler(BodyHandler.create());actorInfos.forEach(actorInfo -> {Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {String handlerHttpPath = handlerInfo.getLocation().toPath();ProcessType processType = handlerInfo.getAnno().processType();Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);Route route = router.post(handlerHttpPath);if (processType == ProcessType.BLOCKING) {route.blockingHandler(routingContextHandler, false);} else {route.handler(routingContextHandler);}});});// 启动 vertx http serverfinal int port = config.getBindAddress().getPort();final String host = config.getBindAddress().getHost();httpServer.requestHandler(router).exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e)).listen(port, host).toCompletionStage().toCompletableFuture().get(1, TimeUnit.MINUTES);log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");}private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {Method method = handlerInfo.getMethod();Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());// 内部框架,严格模式,绑定失败直接报错if (!powerSerializeClz.isPresent()) {throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());}return ctx -> {final RequestBody body = ctx.body();final Object convertResult = body.asPojo(powerSerializeClz.get());try {Object response = method.invoke(actorInfo.getActor(), convertResult);if (response != null) {if (response instanceof String) {ctx.end((String) response);} else {ctx.json(JsonObject.mapFrom(response));}return;}ctx.end();} catch (Throwable t) {// 注意这里是框架实际运行时,日志输出用标准 PowerJob 格式log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);}};}@Overridepublic void close() throws IOException {httpClient.close();httpServer.close();vertx.close();}
}

HttpVertxCSInitializer的type类型为HTTP,其init方法主要是通过VertxInitializer.buildVertx()构建vertx,并通过VertxInitializer.buildHttpServer(vertx)构建httpServer,通过VertxInitializer.buildHttpClient(vertx)构建httpClient;其buildTransporter返回的是VertxTransporter;其bindHandlers主要是通过actorInfo去注册vertx的路由及handler;其close方法依次关闭httpClient、httpServer、vertx

VertxTransporter

tech/powerjob/remote/http/vertx/VertxTransporter.java

public class VertxTransporter implements Transporter {private final HttpClient httpClient;private static final Protocol PROTOCOL = new HttpProtocol();public VertxTransporter(HttpClient httpClient) {this.httpClient = httpClient;}@Overridepublic Protocol getProtocol() {return PROTOCOL;}@Overridepublic void tell(URL url, PowerSerializable request) {post(url, request, null);}@Overridepublic <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {return post(url, request, clz);}@SuppressWarnings("unchecked")private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {final String host = url.getAddress().getHost();final int port = url.getAddress().getPort();final String path = url.getLocation().toPath();RequestOptions requestOptions = new RequestOptions().setMethod(HttpMethod.POST).setHost(host).setPort(port).setURI(path);// 获取远程服务器的HTTP连接Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);// 转换 -> 发送请求获取响应Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->httpClientRequest.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON).send(JsonObject.mapFrom(request).toBuffer()));return responseFuture.compose(httpClientResponse -> {// throw exceptionfinal int statusCode = httpClientResponse.statusCode();if (statusCode != HttpResponseStatus.OK.code()) {// CompletableFuture.get() 时会传递抛出该异常throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",host, port, path, statusCode, httpClientResponse.statusMessage()));}return httpClientResponse.body().compose(x -> {if (clz == null) {return Future.succeededFuture(null);}if (clz.equals(String.class)) {return Future.succeededFuture((T) x.toString());}return Future.succeededFuture(x.toJsonObject().mapTo(clz));});}).toCompletionStage();}
}

VertxTransporter的protocol为HttpProtocol,其tell方法使用的是不需要返回值的post,其ask方法也是调用post方法,只不过其设定了返回值类型

小结

PowerJob的RemoteEngine定义了start及close方法,其中start接收EngineConfig参数返回EngineOutput;PowerJobRemoteEngine定义了csInitializer,其start方法先通过ActorFactory.load(engineConfig.getActorList())加载actorInfos,再通过CSInitializerFactory.build(engineType)构建csInitializer,然后执行其init方法;接着通过csInitializer.buildTransporter()构建transporter;最后将actorInfos绑定到csInitializer;其close方法关闭csInitializer。

这篇关于聊聊PowerJob的RemoteEngine的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/539187

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH