聊聊PowerJob的OmsLogHandler

2023-12-25 02:12

本文主要是介绍聊聊PowerJob的OmsLogHandler,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的OmsLogHandler

OmsLogHandler

tech/powerjob/worker/background/OmsLogHandler.java

@Slf4j
public class OmsLogHandler {private final String workerAddress;private final Transporter transporter;private final ServerDiscoveryService serverDiscoveryService;// 处理线程,需要通过线程池启动public final Runnable logSubmitter = new LogSubmitter();// 上报锁,只需要一个线程上报即可private final Lock reportLock = new ReentrantLock();// 生产者消费者模式,异步上传日志private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);// 每次上报携带的数据条数private static final int BATCH_SIZE = 20;// 本地囤积阈值private static final int REPORT_SIZE = 1024;public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {this.workerAddress = workerAddress;this.transporter = transporter;this.serverDiscoveryService = serverDiscoveryService;}/*** 提交日志* @param instanceId 任务实例ID* @param logContent 日志内容*/public void submitLog(long instanceId, LogLevel logLevel, String logContent) {if (logQueue.size() > REPORT_SIZE) {// 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁new Thread(logSubmitter).start();}InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);boolean offerRet = logQueue.offer(tuple);if (!offerRet) {log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);}}//......
}    

OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue

LogSubmitter

    private class LogSubmitter implements Runnable {@Overridepublic void run() {boolean lockResult = reportLock.tryLock();if (!lockResult) {return;}try {final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();// 当前无可用 Serverif (StringUtils.isEmpty(currentServerAddress)) {if (!logQueue.isEmpty()) {logQueue.clear();log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");}return;}List<InstanceLogContent> logs = Lists.newLinkedList();while (!logQueue.isEmpty()) {try {InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);logs.add(logContent);if (logs.size() >= BATCH_SIZE) {WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));// 不可靠请求,WEB日志不追求极致TransportUtils.reportLogs(req, currentServerAddress, transporter);logs.clear();}}catch (Exception ignore) {break;}}if (!logs.isEmpty()) {WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);TransportUtils.reportLogs(req, currentServerAddress, transporter);}}finally {reportLock.unlock();}}}

LogSubmitter实现了Runnable接口,其run方法先通过reportLock加锁,成功才继续,它通过serverDiscoveryService.getCurrentServerAddress()获取当前server的地址,若获取不到则清空logQueue;否则while循环,每次从logQueue拉取InstanceLogContent,放到linkedList,超过BATCH_SIZE(20)则创建WorkerLogReportReq,通过TransportUtils.reportLogs(req, currentServerAddress, transporter)上报,然后清空linkedList,跳出循环之后再上报剩下的日志,最后释放锁

reportLogs

tech/powerjob/worker/common/utils/TransportUtils.java

    public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);transporter.tell(url, req);}public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {HandlerLocation handlerLocation = new HandlerLocation().setRootPath(rootPath).setMethodPath(handlerPath);return new URL().setServerType(serverType).setAddress(Address.fromIpv4(address)).setLocation(handlerLocation);}    

reportLogs先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog

tell

AkkaTransporter

tech/powerjob/remote/akka/AkkaTransporter.java

    public void tell(URL url, PowerSerializable request) {ActorSelection actorSelection = fetchActorSelection(url);actorSelection.tell(request, null);}

AkkaTransporter直接使用actorSelection发送请求

VertxTransporter

tech/powerjob/remote/http/vertx/VertxTransporter.java

    public void tell(URL url, PowerSerializable request) {post(url, request, null);}private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {final String host = url.getAddress().getHost();final int port = url.getAddress().getPort();final String path = url.getLocation().toPath();RequestOptions requestOptions = new RequestOptions().setMethod(HttpMethod.POST).setHost(host).setPort(port).setURI(path);// 获取远程服务器的HTTP连接Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);// 转换 -> 发送请求获取响应Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->httpClientRequest.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON).send(JsonObject.mapFrom(request).toBuffer()));return responseFuture.compose(httpClientResponse -> {// throw exceptionfinal int statusCode = httpClientResponse.statusCode();if (statusCode != HttpResponseStatus.OK.code()) {// CompletableFuture.get() 时会传递抛出该异常throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",host, port, path, statusCode, httpClientResponse.statusMessage()));}return httpClientResponse.body().compose(x -> {if (clz == null) {return Future.succeededFuture(null);}if (clz.equals(String.class)) {return Future.succeededFuture((T) x.toString());}return Future.succeededFuture(x.toJsonObject().mapTo(clz));});}).toCompletionStage();}    

VertxTransporter则使用post方法通过vertx的httpClient发送请求

processWorkerLogReport

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

    @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)public void processWorkerLogReport(WorkerLogReportReq req) {WorkerLogReportEvent event = new WorkerLogReportEvent().setWorkerAddress(req.getWorkerAddress()).setLogNum(req.getInstanceLogContents().size());try {processWorkerLogReport0(req, event);event.setStatus(WorkerLogReportEvent.Status.SUCCESS);} catch (RejectedExecutionException re) {event.setStatus(WorkerLogReportEvent.Status.REJECTED);} catch (Throwable t) {event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);log.warn("[WorkerRequestHandler] process worker report failed!", t);} finally {monitorService.monitor(event);}}

processWorkerLogReport通过processWorkerLogReport0进行处理,最后通过monitorService.monitor(event)上报监控

processWorkerLogReport0

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

    @Overrideprotected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧...instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());}

processWorkerLogReport0通过instanceLogService.submitLogs进行上报

submitLogs

tech/powerjob/server/core/instance/InstanceLogService.java

    /*** 提交日志记录,持久化到本地数据库中* @param workerAddress 上报机器地址* @param logs 任务实例运行时日志*/@Async(value = PJThreadPool.LOCAL_DB_POOL)public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {List<LocalInstanceLogDO> logList = logs.stream().map(x -> {instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());LocalInstanceLogDO y = new LocalInstanceLogDO();BeanUtils.copyProperties(x, y);y.setWorkerAddress(workerAddress);return y;}).collect(Collectors.toList());try {CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));}catch (Exception e) {log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);}}

InstanceLogService通过PJThreadPool.LOCAL_DB_POOL线程池进行异步,它通过localInstanceLogRepository.saveAll(logList)保存到本地数据库

monitor

tech/powerjob/server/monitor/PowerJobMonitorService.java

    public void monitor(Event event) {monitors.forEach(m -> m.record(event));}

monitor方法遍历monitors,挨个执行record

LogMonitor

tech/powerjob/server/monitor/monitors/LogMonitor.java

    public void record(Event event) {MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));LoggerFactory.getLogger(event.type()).info(event.message());}

LogMonitor的record方法通过日志打印event信息

小结

PowerJob的OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue;logSubmitter主要是执行reportLogs,它先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog;服务端的processWorkerLogReport通过processWorkerLogReport0进行处理(通过localInstanceLogRepository.saveAll(logList)保存到本地数据库),最后通过monitorService.monitor(event)上报监控。

这篇关于聊聊PowerJob的OmsLogHandler的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/533939

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH