你所不知道的日志异步落库

2023-12-15 07:58
文章标签 日志 异步 知道 落库

本文主要是介绍你所不知道的日志异步落库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  点击上方 好好学java ,选择 星标 公众号

重磅资讯、干货,第一时间送达
今日推荐:牛人 20000 字的 Spring Cloud 总结,太硬核了~
作者:程序诗人
出处:https://www.cnblogs.com/scy251147/p/9193075.html

在互联网设计架构过程中,日志异步落库,俨然已经是高并发环节中不可缺少的一环。为什么说是高并发环节中不可缺少的呢?原因在于,如果直接用mq进行日志落库的时候,低并发下,生产端生产数据,然后由消费端异步落库,是没有什么问题的,而且性能也都是异常的好,估计tp99应该都在1ms以内。但是一旦并发增长起来,慢慢的你就发现生产端的tp99一直在增长,从1ms,变为2ms,4ms,直至send timeout。尤其在大促的时候,我司的系统就经历过这个情况,当时mq的发送耗时超过200ms,甚至一度有不少timeout产生。

考虑到这种情况在高并发的情况下才出现,所以今天我们就来探索更加可靠的方法来进行异步日志落库,保证所使用的方式不会因为过高的并发而出现接口ops持续下降甚至到不可用的情况。

方案一:基于log4j的异步appender实现

此种方案,依赖于log4j。在log4j的异步appender中,通过mq进行生产消费入库。相当于在接口和mq之间建立了一个缓冲区,使得接口和mq的依赖分离,从而不让mq的操作影响接口的ops。

此种方案由于使用了异步方式,且由于异步的discard policy策略,当大量数据过来,缓冲区满了之后,会抛弃部分数据。此种方案适用于能够容忍数据丢失的业务场景,不适用于对数据完整有严格要求的业务场景。

来看看具体的实现方式:

首先,我们需要自定义一个Appender,继承自log4j的AppenderSkeleton类,实现方式如下:

public class AsyncJmqAppender extends AppenderSkeleton {@Resource(name = "messageProducer")private MessageProducer messageProducer;@Overrideprotected void append(LoggingEvent loggingEvent) {asyncPushMessage(loggingEvent.getMessage());}/*** 异步调用jmq输出日志* @param message*/private void asyncPushMessage(Object message) {CompletableFuture.runAsync(() -> {Message messageConverted = (Message) message;try {messageProducer.send(messageConverted);} catch (JMQException e) {e.printStackTrace();}});}@Overridepublic boolean requiresLayout() {return false;}@Overridepublic void close() {}
}

然后在log4j.xml中,为此类进行配置:

<!--异步JMQ appender-->
<appender name="async_mq_appender" class="com.jd.limitbuy.common.util.AsyncJmqAppender"><!-- 设置File参数:日志输出文件名 --><param name="File" value="D:/export/Instances/order/server1/logs/order.async.jmq" /><!-- 设置是否在重新启动服务时,在原有日志的基础添加新日志 --><param name="Append" value="true" /><!-- 设置文件大小 --><param name="MaxFileSize" value="10KB" /><!-- 设置文件备份 --><param name="MaxBackupIndex" value="10000" /><!-- 设置输出文件项目和格式 --><layout class="org.apache.log4j.PatternLayout"><param name="ConversionPattern" value="%m%n" /></layout>
</appender>
<logger name="async_mq_appender_logger"><appender-ref ref="async_mq_appender"/>
</logger>

最后就可以按照如下的方式进行正常使用了:

private static Logger logger = LoggerFactory.getLogger("filelog_appender_logger");

注意:此处需要注意log4j的一个性能问题。在log4j的conversionPattern中,匹配符最好不要出现 C% L%通配符,压测实践表明,这两个通配符会导致log4j打日志的效率降低10倍。

方案一很简便,且剥离了接口直接依赖mq导致的性能问题。但是无法解决数据丢失的问题(但是我们其实可以在本地搞个策略落盘来不及处理的数据,可以大大的减少数据丢失的几率)。但是很多的业务场景,是需要数据不丢失的,所以这就衍生出我们的另一套方案来。

方案二:增量消费log4j日志

此种方式,是开启worker在后台增量消费log4j的日志信息,和接口完全脱离。此种方式相比方案一,可以保证数据的不丢失,且可以做到完全不影响接口的ops。但是此种方式,由于是后台worker在后台启动进行扫描,会导致落库的数据慢一些,比如一分钟之后才落库完毕。所以适用于对落库数据实时性不高的场景。

具体的实现步骤如下:

首先,将需要进行增量消费的日志统一打到一个文件夹,以天为单位,每天生成一个带时间戳日志文件。由于log4j不支持直接带时间戳的日志文件生成,所以这里需要引入log4j.extras组件,然后配置log4j.xml如下:

image

之后在代码中的申明方式如下:

private static Logger businessLogger = LoggerFactory.getLogger("file_rolling_logger");

最后在需要记录日志的地方使用方式如下:

businessLogger.error(JsonUtils.toJSONString(myMessage))

这样就可以将日志打印到一个单独的文件中,且按照日期,每天生成一个。

然后,当日志文件生成完毕后,我们就可以开启我们的worker进行增量消费了,这里的增量消费方式,我们选择RandomAccessFile这个类来进行,由于其独特的位点读取方式,可以使得我们非常方便的根据位点的位置来消费增量文件,从而避免了逐行读取这种低效率的实现方式。

注意,为每个日志文件都单独创建了一个位点文件,里面存储了对应的文件的位点读取信息。当worker扫描开始的时候,会首先读取位点文件里面的位点信息,然后找到相应的日志文件,从位点信息位置开始进行消费。这就是整个增量消费worker的核心。具体代码实现如下(代码太长,做了折叠):

/*** @Description: 增量日志扫描worker* @Detail: 此worker主要用来扫描增量日志,日志本身会在不停的插入中,此worker会不停的扫描此日志来将数据上传到kafka集群* @date 2018-04-08 10:30*/
public class LimitBuyScanWorker {/*** 日志和位点文件保存的目录*/private static final String FILE_DIRECTORY = "D:\\export\\Instances\\order\\server1\\logs\\";/*** 每次步进的长度,此处为1000行*/private static final int SCAN_STEP = 1000;/*** 日志文件名前缀*/private static final String LOG_FILE_PREFIX = "limitbuy.soa.order.";/*** 位点文件名后缀*/private static final String OFT_FILE_APPENDIX = ".offset";public void logScanner() {//当前时间Date currentDate = new Date();//今日String currentDay = DateUtil.formatDate("yyyy-MM-dd", currentDate);//今日日志文件路径String currentLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay;logger.error("今日的日志文件路径:" + currentLogFilePath);//今日位点文件路径String currentOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay + OFT_FILE_APPENDIX;//昨日String yesterDay = DateUtil.formatDate("yyyy-MM-dd", DateUtil.queryPlusDay(currentDate, -1));//昨日日志文件路径String yesterdayLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay;logger.error("昨日的日志文件路径:" + yesterdayLogFilePath);//昨日位点文件路径String yesterdayOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay + OFT_FILE_APPENDIX;//先检测昨日位点和文件体积是否一致,不一致则代表未消费完毕boolean yesterdayConsumedOK = checkIfConsumeOK(yesterdayLogFilePath, yesterdayOffsetFilePath);logger.error("昨日的日志文件已被消费完毕:" + yesterdayConsumedOK);//昨日的文件已扫描完毕if (yesterdayConsumedOK) {//扫描并消费今日增量日志scanAndConsumeLog(currentLogFilePath, currentOffsetFilePath);}//昨日的文件未扫描完毕else {//扫描并消费昨日增量日志scanAndConsumeLog(yesterdayLogFilePath, yesterdayOffsetFilePath);}}/*** 检测日志是否被扫描消费完毕,true:消费完毕;false:未消费完毕* @Description 此举主要防止log4j在零点大促开始的时候,突然的滚动文件造成的部分增量日志不会被消费的问题* @param logFilePath* @param offsetFilePath*/private boolean checkIfConsumeOK(String logFilePath, String offsetFilePath) {try {//打开文件RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");//得到当前位点long currentOffset = checkOffset(offsetFilePath);//得到文件总长long currentFileLength = randomAccessFile.length();//比对if (currentOffset >= currentFileLength) {return true;}return false;} catch (FileNotFoundException e) {logger.error("com.jd.limitbuy.service.worker.logScanner 出错(FileNotFoundException):", e);AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());return false;} catch (IOException e) {logger.error("com.jd.limitbuy.service.worker.logScanner 出错(IOException):", e);AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());return false;}}/*** 扫描并消费增量日志* @param logFilePath* @param offsetFilePath*/private void scanAndConsumeLog(String logFilePath, String offsetFilePath) {try {RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");//得到当前位点long currentOffset = checkOffset(offsetFilePath);logger.error("开始位点==>" + currentOffset);//重置位点到当前位点if (currentOffset <= randomAccessFile.length()) {randomAccessFile.seek(currentOffset);}//读取@SCAN_STEP行for (long i = currentOffset; i < currentOffset + SCAN_STEP; i++) {//得到行String result = randomAccessFile.readLine();//如果内容不为空if (StringUtil.isNotBlank(result)) {//TODO 逻辑实现}}//读取@SCAN_STEP行之后的位点logger.error("读取" + SCAN_STEP + "行之后位点==>" + randomAccessFile.getFilePointer());//如果update不成功,可以不处理,后面扫描进来重新过一遍即可updateOffset(randomAccessFile.getFilePointer(), offsetFilePath);logger.error("文件总长==>" + randomAccessFile.length());} catch (FileNotFoundException e) {logger.error("com.jd.limitbuy.service.worker.logScanner 出错(FileNotFoundException):", e);AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());} catch (IOException e) {logger.error("com.jd.limitbuy.service.worker.logScanner 出错(IOException):", e);AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());}}/*** 校验位点*     不存在则创建并赋值为0*     已存在则更新位点* @param offsetFilePath* @return* @throws IOException*/private long checkOffset(String offsetFilePath) throws IOException {File offsetFile = new File(offsetFilePath);//如果位点文件不存在,则创建位点文件并返回0if (!offsetFile.exists()) {updateOffset(0, offsetFilePath);return 0;}//如果位点文件存在,则返回位点文件内容else {FileReader fileReader = new FileReader(offsetFilePath);StringBuilder stringBuilder = new StringBuilder();char[] bytesChar = new char[50];fileReader.read(bytesChar);fileReader.close();for (char c : bytesChar) {stringBuilder.append(c);}String filteredOffset = stringBuilder.toString().trim();if (StringUtil.isNotBlank(filteredOffset)) {return Long.parseLong(filteredOffset);} else {return 0;}}}/*** 更新位点信息* @param offset* @param offsetFilePath*/private void updateOffset(long offset, String offsetFilePath) throws IOException {FileWriter fileWriter = new FileWriter(offsetFilePath);fileWriter.write(offset + "");fileWriter.flush();fileWriter.close();}
}

此种方式由于worker扫描是每隔一段时间启动一次进行消费,所以导致数据从产生到入库,可能经历时间超过一分钟以上,但是在一些对数据延迟要求比较高的业务场景,比如库存扣减,是不能容忍的,所以这里我们就引申出第三种做法,基于内存文件队列的异步日志消费。

方案三:基于内存文件队列的异步日志消费

由于方案一和方案二都严重依赖log4j,且方案本身都存在着要么丢数据,要么入库时间长的缺点,所以都并不是那么尽如人意。但是本方案的做法,既解决了数据丢失的问题,又解决了数据入库时间被拉长的尴尬,所以是终极解决之道。而且在大促销过程中,此种方式经历了实战检验,可以大面积的推广使用。

此方案中提到的内存文件队列,是我司自研的一款基于RandomAccessFile和MappedByteBuffer实现的内存文件队列。队列核心使用了ArrayBlockingQueue,并提供了produce方法,进行数据入管道操作,提供了consume方法,进行数据出管道操作。而且后台有一个worker一直启动着,每隔5ms或者遍历了100条数据之后,就将数据落盘一次,以防数据丢失。具体的设计,就这么多,感兴趣的可以根据我提供的信息,自己实践一下。

由于有此中间件的加持,数据生产的时候,只需要入压入管道,然后消费端进行消费即可。未被消费的数据,会进行落盘操作,谨防数据丢失。当大促的时候,大量数据涌来的时候,管道满了的情况下会阻塞接口,数据不会被抛弃。虽然可能会导致接口在那一瞬间无响应,但是由于有落盘操作和消费操作(此操作操控的是JVM堆外内存数据,不受GC的影响,所以不会出现操作暂停的情况,为什么呢?因为用了MappedByteBuffer),此种阻塞并未影响到接口整体的ops。

在实际使用的时候,ArrayBlockingQueue作为核心队列,显然是全局加锁的,后续我们考虑升级为无锁队列,所以将会参考Netty中的有界无锁队列:MpscArrayQueue。预计性能将会再好一些。

受限于公司政策,我仅提供大致思路,但是不会提供具体代码,有问题评论区交流吧。

上面就是在进行异步日志消费的时候,我所经历的三个阶段,并且一步一步的优化到目前的方式。虽然过程曲折,但是结果令人欢欣鼓舞。如果喜欢就给个推荐,后续我将会持续更新你所不知道的系列,以期达到抛砖引玉的效果。

这篇关于你所不知道的日志异步落库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/495702

相关文章

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

SpringBoot如何使用TraceId日志链路追踪

《SpringBoot如何使用TraceId日志链路追踪》文章介绍了如何使用TraceId进行日志链路追踪,通过在日志中添加TraceId关键字,可以将同一次业务调用链上的日志串起来,本文通过实例代码... 目录项目场景:实现步骤1、pom.XML 依赖2、整合logback,打印日志,logback-sp

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

Python使用asyncio实现异步操作的示例

《Python使用asyncio实现异步操作的示例》本文主要介绍了Python使用asyncio实现异步操作的示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录1. 基础概念2. 实现异步 I/O 的步骤2.1 定义异步函数2.2 使用 await 等待异

Python中的异步:async 和 await以及操作中的事件循环、回调和异常

《Python中的异步:async和await以及操作中的事件循环、回调和异常》在现代编程中,异步操作在处理I/O密集型任务时,可以显著提高程序的性能和响应速度,Python提供了asyn... 目录引言什么是异步操作?python 中的异步编程基础async 和 await 关键字asyncio 模块理论

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

我在移动打工的日志

客户:给我搞一下录音 我:不会。不在服务范围。 客户:是不想吧 我:笑嘻嘻(气笑) 客户:小姑娘明明会,却欺负老人 我:笑嘻嘻 客户:那我交话费 我:手机号 客户:给我搞录音 我:不会。不懂。没搞过。 客户:那我交话费 我:手机号。这是电信的啊!!我这是中国移动!! 客户:我不管,我要充话费,充话费是你们的 我:可是这是移动!!中国移动!! 客户:我这是手机号 我:那又如何,这是移动!你是电信!!

js异步提交form表单的解决方案

1.定义异步提交表单的方法 (通用方法) /*** 异步提交form表单* @param options {form:form表单元素,success:执行成功后处理函数}* <span style="color:#ff0000;"><strong>@注意 后台接收参数要解码否则中文会导致乱码 如:URLDecoder.decode(param,"UTF-8")</strong></span>

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录 在深度学习项目中,目标检测是一项重要的任务。本文将详细介绍如何使用Detectron2进行目标检测模型的复现训练,涵盖训练数据准备、训练命令、训练日志分析、训练指标以及训练输出目录的各个文件及其作用。特别地,我们将演示在训练过程中出现中断后,如何使用 resume 功能继续训练,并将我们复现的模型与Model Zoo中的

SSM项目使用AOP技术进行日志记录

本步骤只记录完成切面所需的必要代码 本人开发中遇到的问题: 切面一直切不进去,最后发现需要在springMVC的核心配置文件中中开启注解驱动才可以,只在spring的核心配置文件中开启是不会在web项目中生效的。 之后按照下面的代码进行配置,然后前端在访问controller层中的路径时即可观察到日志已经被正常记录到数据库,代码中有部分注释,看不懂的可以参照注释。接下来进入正题 1、导入m