Preface
This article takes a look at PowerJob's OmsLogHandler.
OmsLogHandler
tech/powerjob/worker/background/OmsLogHandler.java
    @Slf4j
    public class OmsLogHandler {

        private final String workerAddress;
        private final Transporter transporter;
        private final ServerDiscoveryService serverDiscoveryService;

        // Worker runnable; must be started through a thread pool
        public final Runnable logSubmitter = new LogSubmitter();
        // Report lock; only one thread needs to report at a time
        private final Lock reportLock = new ReentrantLock();
        // Producer-consumer pattern; logs are uploaded asynchronously
        private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);

        // Number of entries carried by each report
        private static final int BATCH_SIZE = 20;
        // Local backlog threshold
        private static final int REPORT_SIZE = 1024;

        public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
            this.workerAddress = workerAddress;
            this.transporter = transporter;
            this.serverDiscoveryService = serverDiscoveryService;
        }

        /**
         * Submit a log entry
         * @param instanceId job instance ID
         * @param logContent log content
         */
        public void submitLog(long instanceId, LogLevel logLevel, String logContent) {
            if (logQueue.size() > REPORT_SIZE) {
                // A thread's lifecycle is one-way: a finished Thread object cannot be started again,
                // so threads can only be created and destroyed each time
                new Thread(logSubmitter).start();
            }
            InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
            boolean offerRet = logQueue.offer(tuple);
            if (!offerRet) {
                log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
            }
        }

        //......
    }
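OmsLogHandler exposes a submitLog method. It first checks whether the size of logQueue exceeds REPORT_SIZE (1024); if so, it starts a new thread that runs logSubmitter. It then wraps the content in an InstanceLogContent and offers it to logQueue. If the queue (capacity 10240) is full, the offer fails and only a warning is logged.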
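The producer side of this design can be illustrated with a minimal, self-contained sketch that uses only the JDK. The class SubmitSketch, its fields, and the small capacities are hypothetical stand-ins chosen for illustration and are not PowerJob code. The sketch only shows how a non-blocking offer on a bounded queue drops entries once the queue is full, and how a size threshold can trigger a flush. It is single-threaded on purpose, so the counters are plain ints.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not PowerJob code: a bounded queue with a flush threshold.
class SubmitSketch {
    private final BlockingQueue<String> queue;
    private final int reportSize;
    private final Runnable flusher;
    int dropped = 0;        // entries rejected because the queue was full
    int flushTriggers = 0;  // number of times the threshold check fired

    SubmitSketch(int capacity, int reportSize, Runnable flusher) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.reportSize = reportSize;
        this.flusher = flusher;
    }

    // Same shape as submitLog: check the backlog first, then offer without blocking.
    boolean submit(String line) {
        if (queue.size() > reportSize) {
            flushTriggers++;
            // The original starts a new Thread here; the sketch runs inline to stay deterministic.
            flusher.run();
        }
        boolean accepted = queue.offer(line);
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        // A no-op flusher never drains the queue, so the queue fills up and later offers fail.
        SubmitSketch s = new SubmitSketch(4, 2, () -> {});
        for (int i = 0; i < 6; i++) {
            s.submit("log-" + i);
        }
        System.out.println("size=" + s.size() + " dropped=" + s.dropped + " triggers=" + s.flushTriggers);
    }
}
```

With a flusher that never drains, the queue saturates at its capacity and every further submit is dropped, which is the situation the "maybe your log speed is too fast" warning describes.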
LogSubmitter
    private class LogSubmitter implements Runnable {

        @Override
        public void run() {
            boolean lockResult = reportLock.tryLock();
            if (!lockResult) {
                return;
            }
            try {
                final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
                // No server is currently available
                if (StringUtils.isEmpty(currentServerAddress)) {
                    if (!logQueue.isEmpty()) {
                        logQueue.clear();
                        log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                    }
                    return;
                }

                List<InstanceLogContent> logs = Lists.newLinkedList();
                while (!logQueue.isEmpty()) {
                    try {
                        InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        logs.add(logContent);
                        if (logs.size() >= BATCH_SIZE) {
                            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                            // Unreliable request; web logs do not need strict delivery guarantees
                            TransportUtils.reportLogs(req, currentServerAddress, transporter);
                            logs.clear();
                        }
                    } catch (Exception ignore) {
                        break;
                    }
                }

                if (!logs.isEmpty()) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                }
            } finally {
                reportLock.unlock();
            }
        }
    }
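LogSubmitter implements Runnable. Its run method first tries to acquire reportLock with tryLock and returns immediately if that fails, so only one thread reports at a time. It then obtains the current server address via serverDiscoveryService.getCurrentServerAddress(); if no address is available, it clears logQueue and returns. Otherwise it loops while the queue is not empty, polling one InstanceLogContent at a time into a linked list. Once the list reaches BATCH_SIZE (20), it creates a WorkerLogReportReq, reports it with TransportUtils.reportLogs(req, currentServerAddress, transporter), and clears the list. After the loop it reports whatever is left, and finally releases the lock.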
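The single-flusher batching described above can be sketched with the JDK alone. BatchDrainSketch and its method names are hypothetical and not PowerJob code. As a simplification, the sketch uses the non-blocking poll() and stops on null instead of the isEmpty check plus timed poll found in the original, and it hands batches to a caller-supplied Consumer in place of a real transport.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch, not PowerJob code: a single-flusher batch drain.
class BatchDrainSketch {
    private final Lock reportLock = new ReentrantLock();
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final int batchSize;

    BatchDrainSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    void add(String line) {
        queue.offer(line);
    }

    // Returns false if another thread is already flushing, true after a completed drain.
    boolean flush(Consumer<List<String>> sender) {
        if (!reportLock.tryLock()) {
            return false;
        }
        try {
            List<String> batch = new ArrayList<>();
            String line;
            // poll() returns null once the queue is empty, which ends the loop
            while ((line = queue.poll()) != null) {
                batch.add(line);
                if (batch.size() >= batchSize) {
                    sender.accept(new ArrayList<>(batch)); // send a copy, then reuse the buffer
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                sender.accept(batch); // the remainder that did not fill a whole batch
            }
            return true;
        } finally {
            reportLock.unlock();
        }
    }

    public static void main(String[] args) {
        BatchDrainSketch sketch = new BatchDrainSketch(20);
        for (int i = 0; i < 45; i++) {
            sketch.add("log-" + i);
        }
        sketch.flush(batch -> System.out.println("sent batch of " + batch.size()));
    }
}
```

Copying the buffer before sending matters when the sender keeps a reference to the list, since the buffer is cleared and reused right afterwards; the original does the same with Lists.newLinkedList(logs).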
reportLogs
tech/powerjob/worker/common/utils/TransportUtils.java
    public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
        final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
        transporter.tell(url, req);
    }

    public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
        HandlerLocation handlerLocation = new HandlerLocation()
                .setRootPath(rootPath)
                .setMethodPath(handlerPath);
        return new URL()
                .setServerType(serverType)
                .setAddress(Address.fromIpv4(address))
                .setLocation(handlerLocation);
    }
reportLogs first builds a URL with easyBuildUrl and then sends the request through transporter.tell(url, req). The rootPath is server and the handlerPath is reportLog.
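To make the URL construction concrete, here is a small sketch of the idea. Everything in it is an assumption made for illustration: UrlSketch is a hypothetical class, the address is assumed to have the form host:port, and the request path is assumed to be the two segments joined with slashes. How PowerJob's Address.fromIpv4 and HandlerLocation actually parse and format these values is not shown in this article.

```java
// Hypothetical sketch, not PowerJob code: split "host:port" and join the two path segments.
class UrlSketch {
    final String host;
    final int port;
    final String path;

    UrlSketch(String host, int port, String path) {
        this.host = host;
        this.port = port;
        this.path = path;
    }

    // Assumes the address looks like "host:port" and the path is "/" + rootPath + "/" + handlerPath.
    static UrlSketch build(String rootPath, String handlerPath, String address) {
        int idx = address.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected host:port but got " + address);
        }
        String host = address.substring(0, idx);
        int port = Integer.parseInt(address.substring(idx + 1));
        return new UrlSketch(host, port, "/" + rootPath + "/" + handlerPath);
    }

    @Override
    public String toString() {
        return host + ":" + port + path;
    }

    public static void main(String[] args) {
        // The address below is an arbitrary example value.
        System.out.println(build("server", "reportLog", "192.168.1.10:10010"));
    }
}
```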
tell
AkkaTransporter
tech/powerjob/remote/akka/AkkaTransporter.java
    public void tell(URL url, PowerSerializable request) {
        ActorSelection actorSelection = fetchActorSelection(url);
        actorSelection.tell(request, null);
    }
AkkaTransporter sends the request directly through the ActorSelection, with no sender reference.
VertxTransporter
tech/powerjob/remote/http/vertx/VertxTransporter.java
    public void tell(URL url, PowerSerializable request) {
        post(url, request, null);
    }

    private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
        final String host = url.getAddress().getHost();
        final int port = url.getAddress().getPort();
        final String path = url.getLocation().toPath();
        RequestOptions requestOptions = new RequestOptions()
                .setMethod(HttpMethod.POST)
                .setHost(host)
                .setPort(port)
                .setURI(path);
        // Obtain an HTTP connection to the remote server
        Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
        // Transform -> send the request and obtain the response
        Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
                httpClientRequest
                        .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                        .send(JsonObject.mapFrom(request).toBuffer())
        );
        return responseFuture.compose(httpClientResponse -> {
            // throw exception
            final int statusCode = httpClientResponse.statusCode();
            if (statusCode != HttpResponseStatus.OK.code()) {
                // This exception is propagated when CompletableFuture.get() is called
                throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
                        host, port, path, statusCode, httpClientResponse.statusMessage()
                ));
            }
            return httpClientResponse.body().compose(x -> {
                if (clz == null) {
                    return Future.succeededFuture(null);
                }
                if (clz.equals(String.class)) {
                    return Future.succeededFuture((T) x.toString());
                }
                return Future.succeededFuture(x.toJsonObject().mapTo(clz));
            });
        }).toCompletionStage();
    }
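VertxTransporter instead calls its post method, which sends the request through the Vert.x httpClient. tell passes null as the response class and ignores the returned CompletionStage, so the call is fire-and-forget.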
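The consequence of a fire-and-forget tell can be shown with a minimal sketch built on CompletableFuture. TellSketch and its methods are hypothetical and not PowerJob or Vert.x code; the post method here is a stand-in that fails for an empty payload. The point is only that a failure in the dropped stage never reaches the caller of tell, which matches the "unreliable request" comment in LogSubmitter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not PowerJob code: "tell" as a request whose result is discarded.
class TellSketch {
    final AtomicInteger sent = new AtomicInteger();

    // Stand-in for an asynchronous send; it fails when the payload is empty.
    CompletionStage<String> post(String payload) {
        sent.incrementAndGet();
        if (payload.isEmpty()) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("empty payload"));
            return failed;
        }
        return CompletableFuture.completedFuture("ok:" + payload);
    }

    // Request/response style: the caller receives the stage and can observe failures.
    CompletionStage<String> ask(String payload) {
        return post(payload);
    }

    // Fire-and-forget style: the stage is dropped, so a failure never reaches the caller.
    void tell(String payload) {
        post(payload);
    }

    public static void main(String[] args) {
        TellSketch t = new TellSketch();
        t.tell(""); // fails internally, nothing is thrown here
        t.ask("hello").thenAccept(System.out::println);
    }
}
```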
processWorkerLogReport
tech/powerjob/server/core/handler/AbWorkerRequestHandler.java
    @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
    public void processWorkerLogReport(WorkerLogReportReq req) {
        WorkerLogReportEvent event = new WorkerLogReportEvent()
                .setWorkerAddress(req.getWorkerAddress())
                .setLogNum(req.getInstanceLogContents().size());
        try {
            processWorkerLogReport0(req, event);
            event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
        } catch (RejectedExecutionException re) {
            event.setStatus(WorkerLogReportEvent.Status.REJECTED);
        } catch (Throwable t) {
            event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
            log.warn("[WorkerRequestHandler] process worker report failed!", t);
        } finally {
            monitorService.monitor(event);
        }
    }
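processWorkerLogReport delegates to processWorkerLogReport0 and sets the event status to SUCCESS, REJECTED (on RejectedExecutionException), or EXCEPTION (on any other Throwable). In the finally block it always reports the event through monitorService.monitor(event).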
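The classify-then-always-record structure of this handler is easy to isolate. The sketch below is hypothetical (ReportStatusSketch is not PowerJob code) and replaces the event and monitor service with a plain list, but it keeps the same try / catch / finally layout, including the order of the catch clauses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch, not PowerJob code: classify the outcome, then always record it.
class ReportStatusSketch {
    enum Status { SUCCESS, REJECTED, EXCEPTION }

    final List<Status> recorded = new ArrayList<>();

    void handle(Runnable work) {
        Status status = null;
        try {
            work.run();
            status = Status.SUCCESS;
        } catch (RejectedExecutionException re) {
            // The more specific exception must be caught before Throwable
            status = Status.REJECTED;
        } catch (Throwable t) {
            status = Status.EXCEPTION;
        } finally {
            // Runs on every path, so each request yields exactly one record
            recorded.add(status);
        }
    }

    public static void main(String[] args) {
        ReportStatusSketch h = new ReportStatusSketch();
        h.handle(() -> {});
        h.handle(() -> { throw new RejectedExecutionException("pool full"); });
        h.handle(() -> { throw new IllegalStateException("boom"); });
        System.out.println(h.recorded);
    }
}
```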
processWorkerLogReport0
tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java
    @Override
    protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
        // This should not be too slow... it is only a few checks plus a Map#get...
        instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
    }
processWorkerLogReport0 hands the logs over to instanceLogService.submitLogs.
submitLogs
tech/powerjob/server/core/instance/InstanceLogService.java
    /**
     * Submit log records and persist them to the local database
     * @param workerAddress address of the reporting machine
     * @param logs runtime logs of the job instance
     */
    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());
            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());

        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        } catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }
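InstanceLogService.submitLogs runs asynchronously on the PJThreadPool.LOCAL_DB_POOL thread pool. It converts each InstanceLogContent into a LocalInstanceLogDO and saves the list to the local database through localInstanceLogRepository.saveAll(logList), wrapped in CommonUtils.executeWithRetry0. If the save still fails, the logs are dropped and a warning is logged.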
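The retry-then-drop behavior can be sketched as follows. The implementation of CommonUtils.executeWithRetry0 is not shown in this article, so RetrySketch, its executeWithRetry signature, the attempt count, and the interval are all assumptions made for illustration rather than a description of the real helper.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch, not PowerJob code: retry a task a fixed number of times, then give up.
class RetrySketch {

    static <T> T executeWithRetry(Callable<T> task, int maxAttempts, long intervalMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(intervalMs); // wait before the next attempt
                }
            }
        }
        throw last; // every attempt failed; the caller decides what to do
    }

    public static void main(String[] args) {
        int[] calls = {0};
        try {
            // Fails twice, then succeeds on the third attempt
            String result = executeWithRetry(() -> {
                if (++calls[0] < 3) {
                    throw new IllegalStateException("db busy");
                }
                return "saved";
            }, 3, 10L);
            System.out.println(result + " after " + calls[0] + " attempts");
        } catch (Exception e) {
            // Mirrors submitLogs: log the failure and drop the data
            System.out.println("dropped: " + e.getMessage());
        }
    }
}
```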
monitor
tech/powerjob/server/monitor/PowerJobMonitorService.java
    public void monitor(Event event) {
        monitors.forEach(m -> m.record(event));
    }
The monitor method iterates over monitors and calls record on each one.
LogMonitor
tech/powerjob/server/monitor/monitors/LogMonitor.java
    public void record(Event event) {
        MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));
        LoggerFactory.getLogger(event.type()).info(event.message());
    }
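LogMonitor's record method writes the event to a log: it puts the server ID into the MDC and logs event.message() at info level with a logger named after event.type().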
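The fan-out from the monitor service to its monitors is a plain observer arrangement, sketched below with JDK types only. MonitorSketch, its nested Event and Monitor interfaces, and the event factory are hypothetical simplifications and not the PowerJob types; the example event values are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not PowerJob code: fan an event out to every registered monitor.
class MonitorSketch {
    interface Event {
        String type();
        String message();
    }

    interface Monitor {
        void record(Event event);
    }

    private final List<Monitor> monitors = new ArrayList<>();

    void register(Monitor monitor) {
        monitors.add(monitor);
    }

    // Same shape as the monitor method above: each monitor sees every event, in registration order.
    void monitor(Event event) {
        monitors.forEach(m -> m.record(event));
    }

    // Small factory so callers do not have to write an anonymous class each time.
    static Event event(String type, String message) {
        return new Event() {
            @Override
            public String type() {
                return type;
            }

            @Override
            public String message() {
                return message;
            }
        };
    }

    public static void main(String[] args) {
        MonitorSketch service = new MonitorSketch();
        // A console monitor standing in for a logger keyed by event type
        service.register(e -> System.out.println("[" + e.type() + "] " + e.message()));
        service.monitor(event("LOG_REPORT", "worker=192.168.1.10:27777 num=20"));
    }
}
```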
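Summary
PowerJob's OmsLogHandler exposes a submitLog method. It first checks whether the size of logQueue exceeds REPORT_SIZE (1024) and, if so, starts a new thread to run logSubmitter; it then wraps the content in an InstanceLogContent and offers it to logQueue. logSubmitter mainly calls reportLogs, which builds a URL with easyBuildUrl and sends the request through transporter.tell(url, req), with rootPath server and handlerPath reportLog. On the server side, processWorkerLogReport delegates to processWorkerLogReport0, which saves the logs to the local database through localInstanceLogRepository.saveAll(logList), and finally reports the event through monitorService.monitor(event).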