本文主要是介绍聊聊PowerJob的RemoteEngine,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
序
本文主要研究一下PowerJob的RemoteEngine
RemoteEngine
tech/powerjob/remote/framework/engine/RemoteEngine.java
public interface RemoteEngine {EngineOutput start(EngineConfig engineConfig);void close() throws IOException;
}
RemoteEngine定义了start及close方法,其中start接收EngineConfig参数返回EngineOutput
EngineConfig
tech/powerjob/remote/framework/engine/EngineConfig.java
@Data
@Accessors(chain = true)
public class EngineConfig implements Serializable {/*** 服务类型*/private ServerType serverType;/*** 需要启动的引擎类型*/private String type;/*** 绑定的本地地址*/private Address bindAddress;/*** actor实例,交由使用侧自己实例化以便自行注入各种 bean*/private List<Object> actorList;
}
EngineConfig定义了serverType(
SERVER、WORKER
),type、bindAddress、actorList属性
EngineOutput
tech/powerjob/remote/framework/engine/EngineOutput.java
@Getter
@Setter
public class EngineOutput {private Transporter transporter;
}
EngineOutput定义了transporter
Transporter
tech/powerjob/remote/framework/transporter/Transporter.java
public interface Transporter {/*** Protocol* @return return protocol*/Protocol getProtocol();/***send message* @param url url* @param request request*/void tell(URL url, PowerSerializable request);/*** ask by request* @param url url* @param request request* @param clz response type* @return CompletionStage* @throws RemotingException remote exception*/<T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}
Transporter接口定义了getProtocol(
AkkaProtocol、HttpProtocol
)、tell、ask三个方法
PowerJobRemoteEngine
tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java
@Slf4j
public class PowerJobRemoteEngine implements RemoteEngine {private CSInitializer csInitializer;@Overridepublic EngineOutput start(EngineConfig engineConfig) {final String engineType = engineConfig.getType();EngineOutput engineOutput = new EngineOutput();log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());csInitializer = CSInitializerFactory.build(engineType);String type = csInitializer.type();Stopwatch sw = Stopwatch.createStarted();log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);csInitializer.init(new CSInitializerConfig().setBindAddress(engineConfig.getBindAddress()).setServerType(engineConfig.getServerType()));// 构建通讯器Transporter transporter = csInitializer.buildTransporter();engineOutput.setTransporter(transporter);log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));// 绑定 handlercsInitializer.bindHandlers(actorInfos);log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);return engineOutput;}@Overridepublic void close() throws IOException {csInitializer.close();}
}
PowerJobRemoteEngine定义了csInitializer,其start方法先通过ActorFactory.load(engineConfig.getActorList())加载actorInfos,再通过CSInitializerFactory.build(engineType)构建csInitializer,然后执行其init方法;接着通过csInitializer.buildTransporter()构建transporter;最后将actorInfos绑定到csInitializer;其close方法关闭csInitializer
ActorFactory.load
tech/powerjob/remote/framework/engine/impl/ActorFactory.java
@Slf4j
class ActorFactory {static List<ActorInfo> load(List<Object> actorList) {List<ActorInfo> actorInfos = Lists.newArrayList();actorList.forEach(actor -> {final Class<?> clz = actor.getClass();try {final Actor anno = clz.getAnnotation(Actor.class);ActorInfo actorInfo = new ActorInfo().setActor(actor).setAnno(anno);actorInfo.setHandlerInfos(loadHandlerInfos4Actor(actorInfo));actorInfos.add(actorInfo);} catch (Throwable t) {log.error("[ActorFactory] process Actor[{}] failed!", clz);ExceptionUtils.rethrow(t);}});return actorInfos;}//......
}
ActorFactory.load方法遍历actorList,获取其类上的
@Actor
注解,再收集其方法上的@Handler
注解信息设置到actorInfo
CSInitializerFactory
tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java
@Slf4j
class CSInitializerFactory {static CSInitializer build(String targetType) {Reflections reflections = new Reflections(OmsConstant.PACKAGE);Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {try {CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();String type = csInitializer.type();log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);if (targetType.equalsIgnoreCase(type)) {return csInitializer;}} catch (Exception e) {log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);ExceptionUtils.rethrow(e);}}throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob' and import the dependencies!", targetType));}
}
CSInitializerFactory的build方法通过org.reflections.Reflections去扫描
tech.powerjob
包,获取CSInitializer的子类,之后通过反射进行实例化
CSInitializer
tech/powerjob/remote/framework/cs/CSInitializer.java
public interface CSInitializer {/*** 类型名称,比如 akka, netty4,httpJson* @return 名称*/String type();/*** initialize the framework* @param config config*/void init(CSInitializerConfig config);/*** build a Transporter by based network framework* @return Transporter*/Transporter buildTransporter();/*** bind Actor, publish handler's service* @param actorInfos actor infos*/void bindHandlers(List<ActorInfo> actorInfos);void close() throws IOException;
}
CSInitializer接口定义了type、init、buildTransporter、close方法,它有两个实现类,分别是AkkaCSInitializer、HttpVertxCSInitializer
CSInitializerConfig
tech/powerjob/remote/framework/cs/CSInitializerConfig.java
@Getter
@Setter
@Accessors(chain = true)
public class CSInitializerConfig implements Serializable {private Address bindAddress;private ServerType serverType;
}
CSInitializerConfig定义了bindAddress、serverType两个属性
AkkaCSInitializer
tech/powerjob/remote/akka/AkkaCSInitializer.java
@Slf4j
public class AkkaCSInitializer implements CSInitializer {private ActorSystem actorSystem;private CSInitializerConfig config;@Overridepublic String type() {return tech.powerjob.common.enums.Protocol.AKKA.name();}@Overridepublic void init(CSInitializerConfig config) {this.config = config;Address bindAddress = config.getBindAddress();log.info("[PowerJob-AKKA] bindAddress: {}", bindAddress);// 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了)Map<String, Object> overrideConfig = Maps.newHashMap();overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);log.info("[PowerJob-AKKA] try to start AKKA System.");// 启动时绑定当前的 actorSystemNameString actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);// 处理系统中产生的异常情况ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", actorSystem.name());}@Overridepublic Transporter buildTransporter() {return new AkkaTransporter(actorSystem);}@Overridepublic void bindHandlers(List<ActorInfo> actorInfos) {int cores = Runtime.getRuntime().availableProcessors();actorInfos.forEach(actorInfo -> {String rootPath = actorInfo.getAnno().path();AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig));actorSystem.actorOf(AkkaProxyActor.props(actorInfo).withDispatcher("akka.".concat(actorConfig.getDispatcherName())).withRouter(new RoundRobinPool(cores)), actorConfig.getActorName());});}@Overridepublic void close() throws IOException {actorSystem.terminate();}
}
AkkaCSInitializer其type方法返回的是AKKA类型,init方法先通过ConfigFactory.load(AkkaConstant.AKKA_CONFIG)加载akka基本配置,再覆盖hostname和port信息,最后通过ActorSystem.create(actorSystemName, akkaFinalConfig)创建actorSystem,并创建AkkaTroubleshootingActor,订阅DeadLetter消息;buildTransporter返回的是AkkaTransporter;其bindHandlers方法主要是根据ActorInfo信息来创建actor;其close方法执行actorSystem.terminate()
AkkaTransporter
tech/powerjob/remote/akka/AkkaTransporter.java
public class AkkaTransporter implements Transporter {private final ActorSystem actorSystem;/*** akka://<actor system>@<hostname>:<port>/<actor path>*/private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";public AkkaTransporter(ActorSystem actorSystem) {this.actorSystem = actorSystem;}@Overridepublic Protocol getProtocol() {return new AkkaProtocol();}@Overridepublic void tell(URL url, PowerSerializable request) {ActorSelection actorSelection = fetchActorSelection(url);actorSelection.tell(request, null);}@Override@SuppressWarnings("unchecked")public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {ActorSelection actorSelection = fetchActorSelection(url);return (CompletionStage<T>) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));}private ActorSelection fetchActorSelection(URL url) {HandlerLocation location = url.getLocation();String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType());String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();CommonUtils.requireNonNull(targetActorName, "can't find actor by URL: " + location);String address = url.getAddress().toFullAddress();return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, targetActorName));}
}
AkkaTransporter其protocol为AkkaProtocol;其tell方法根据url找到actorSelection,通过actorSelection的tell发送请求;ask方法使用的是Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS))
HttpVertxCSInitializer
tech/powerjob/remote/http/HttpVertxCSInitializer.java
@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {private Vertx vertx;private HttpServer httpServer;private HttpClient httpClient;private CSInitializerConfig config;@Overridepublic String type() {return tech.powerjob.common.enums.Protocol.HTTP.name();}@Overridepublic void init(CSInitializerConfig config) {this.config = config;vertx = VertxInitializer.buildVertx();httpServer = VertxInitializer.buildHttpServer(vertx);httpClient = VertxInitializer.buildHttpClient(vertx);}@Overridepublic Transporter buildTransporter() {return new VertxTransporter(httpClient);}@Override@SneakyThrowspublic void bindHandlers(List<ActorInfo> actorInfos) {Router router = Router.router(vertx);// 处理请求响应router.route().handler(BodyHandler.create());actorInfos.forEach(actorInfo -> {Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {String handlerHttpPath = handlerInfo.getLocation().toPath();ProcessType processType = handlerInfo.getAnno().processType();Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);Route route = router.post(handlerHttpPath);if (processType == ProcessType.BLOCKING) {route.blockingHandler(routingContextHandler, false);} else {route.handler(routingContextHandler);}});});// 启动 vertx http serverfinal int port = config.getBindAddress().getPort();final String host = config.getBindAddress().getHost();httpServer.requestHandler(router).exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e)).listen(port, host).toCompletionStage().toCompletableFuture().get(1, TimeUnit.MINUTES);log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");}private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {Method method = handlerInfo.getMethod();Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());// 内部框架,严格模式,绑定失败直接报错if (!powerSerializeClz.isPresent()) {throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());}return ctx -> {final RequestBody body = ctx.body();final Object convertResult = body.asPojo(powerSerializeClz.get());try {Object response = method.invoke(actorInfo.getActor(), convertResult);if (response != null) {if (response instanceof String) {ctx.end((String) response);} else {ctx.json(JsonObject.mapFrom(response));}return;}ctx.end();} catch (Throwable t) {// 注意这里是框架实际运行时,日志输出用标准 PowerJob 格式log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);}};}@Overridepublic void close() throws IOException {httpClient.close();httpServer.close();vertx.close();}
}
HttpVertxCSInitializer的type类型为HTTP,其init方法主要是通过VertxInitializer.buildVertx()构建vertx,并通过VertxInitializer.buildHttpServer(vertx)构建httpServer,通过VertxInitializer.buildHttpClient(vertx)构建httpClient;其buildTransporter返回的是VertxTransporter;其bindHandlers主要是通过actorInfo去注册vertx的路由及handler;其close方法依次关闭httpClient、httpServer、vertx
VertxTransporter
tech/powerjob/remote/http/vertx/VertxTransporter.java
public class VertxTransporter implements Transporter {private final HttpClient httpClient;private static final Protocol PROTOCOL = new HttpProtocol();public VertxTransporter(HttpClient httpClient) {this.httpClient = httpClient;}@Overridepublic Protocol getProtocol() {return PROTOCOL;}@Overridepublic void tell(URL url, PowerSerializable request) {post(url, request, null);}@Overridepublic <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {return post(url, request, clz);}@SuppressWarnings("unchecked")private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {final String host = url.getAddress().getHost();final int port = url.getAddress().getPort();final String path = url.getLocation().toPath();RequestOptions requestOptions = new RequestOptions().setMethod(HttpMethod.POST).setHost(host).setPort(port).setURI(path);// 获取远程服务器的HTTP连接Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);// 转换 -> 发送请求获取响应Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->httpClientRequest.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON).send(JsonObject.mapFrom(request).toBuffer()));return responseFuture.compose(httpClientResponse -> {// throw exceptionfinal int statusCode = httpClientResponse.statusCode();if (statusCode != HttpResponseStatus.OK.code()) {// CompletableFuture.get() 时会传递抛出该异常throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",host, port, path, statusCode, httpClientResponse.statusMessage()));}return httpClientResponse.body().compose(x -> {if (clz == null) {return Future.succeededFuture(null);}if (clz.equals(String.class)) {return Future.succeededFuture((T) x.toString());}return Future.succeededFuture(x.toJsonObject().mapTo(clz));});}).toCompletionStage();}
}
VertxTransporter的protocol为HttpProtocol,其tell方法使用的是不需要返回值的post,其ask方法也是调用post方法,只不过其设定了返回值类型
小结
PowerJob的RemoteEngine定义了start及close方法,其中start接收EngineConfig参数返回EngineOutput;PowerJobRemoteEngine定义了csInitializer,其start方法先通过ActorFactory.load(engineConfig.getActorList())加载actorInfos,再通过CSInitializerFactory.build(engineType)构建csInitializer,然后执行其init方法;接着通过csInitializer.buildTransporter()构建transporter;最后将actorInfos绑定到csInitializer;其close方法关闭csInitializer。
这篇关于聊聊PowerJob的RemoteEngine的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!