Flume-NG源码阅读之FileChannel

2024-05-03 23:32

本文主要是介绍Flume-NG源码阅读之FileChannel,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

FileChannel是flume一个非常重要的channel组件,非常常用。这个channel非常复杂,涉及的文件更多涉及三个包:org.apache.flume.channel.file、org.apache.flume.channel.file.encryption(加密)、org.apache.flume.channel.file.proto共计40个源码文件。


一、configure(Context context)方法:

1、首先获取配置文件中的checkpointDir和dataDirs属性,这是存放检查点和数据的目录,默认使用$user.home/.flume/file-channel/checkpoint和$user.home/.flume/file-channel/data来;checkpointDir是一个目录,而dataDirs可以是多个以“,”分割;且这两个目录最好不要来回修改,因为里面存储着数据呢;

2、获取容量capacity,并做一些检查比如是否<0,是否是动态加载(有无变化?),默认1000000;这指的是checkpoint文件存放event信息的最大容量。

3、keepAlive超时时间,就是如果channel中没有数据最长等待时间,默认3s;

4、transactionCapacity事务的最大容量,默认1000;

5、checkpointInterval检查点写入间隔,默认30000ms;

6、maxFileSize,data文件大小的上限,用户设置的和1623195647 之间较小的那个;

7、最少需要多少空间minimumRequiredSpace,max((用户配置的,500M),1M);

8、useLogReplayV1,默认false;

9、useFastReplay,默认false;

10、encryptionActiveKey,加密密钥别名,默认为null;

11、encryptionContext加密配置信息;

12、encryptionCipherProvider加密密码提供者,缺省值为null;

13、encryptionKeyProviderName,加密密钥提供者,缺省值为null;

14、queueRemaining,队列是否有剩余空间信号量,初始化容量为capacity;

15、设置Log log的检查间隔checkpointInterval和maxFileSize最大文件大小。

16、是否新建一个计数器channelCounter。


二、start()方法

1、通过Log.Builder()构建了一个builder对象,并设置了相应的参数,然后log = builder.build(),Log的构造方法会对checkpointDir及logDirs尝试获取锁操作,所以如果存在多个file channel则checkpointDir及logDirs最好配置在多个磁盘下或者多个目录下,否则只能一个获得初始化;Log用来将封装好的FlumeEvent写入磁盘并将指向这些event的指针存入一个内存队列queue。会创建一个线程工作内容就是每隔checkpointInterval毫秒,默认30s写一次检查点log.writeCheckpoint(),会将checpoint、inflightTakes、inflightPuts都刷新至磁盘,会先后将inflightPuts、inflightTakes、checkpoint.meta重建,更新checkpoint文件并刷新至磁盘,这些文件都在checkpointDir目录下;更新log-ID.meta文件;同时肩负起删除log文件及其对应的meta文件的责任。

2、log.replay(),一旦一个Log对象被创建,则需要调用replay()方法使用queue最新的检查点来调整磁盘上的write ahead log。会获取最大的fileID;然后读取log文件根据record的类型进行相应的操作,进行恢复;遍历所有的data目录,然后roll(index)创建LogFile.Writer(空的);然后将queue刷新到相关文件。

3、 open = true,表示channel打开;

4、depth = getDepth(),FlumeEventQueue的大小,然后需要判断一下queueRemaining是否有足够的剩余量queueRemaining.tryAcquire(depth);

5、如果open==true,计数器开始工作。


三、createTransaction()方法主要是构造一个FileBackedTransaction对象用来直接操作channel,并返回。


四、stop()停止channel,清理数据。


五、静态类FileBackedTransaction extends BasicTransactionSemantics。
类似可参考memory channel,必须要实现doTake()、doCommit()、doRollback()、doPut()四个方法。put和take不能同时操作。

1、doPut(Event event)方法,该方法source会通过transaction.put()方法调用。检查LinkedBlockingDeque<FlumeEventPointer> putList是否有剩余空间(putList.remainingCapacity() == 0);检查queue是否有剩余空间,如果没有则等待keepAlive秒(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)),获取一个许可;获取log的共享锁;FlumeEventPointer ptr = log.put(transactionID, event)将event写入数据文件(是RandomAccessFile),transactionID指的是每次事务的编号,且都是在上一次基础之上加一而来,每个event都要调用put(Event)方法,该方法会获取要写入的数据文件的目录(配置文件中可以配置多个data文件目录,这里会依据transactionID轮训式的向所有的目录写数据),由于start()方法中有log.replay()方法,该方法会遍历所有的data目录并roll(index)创建LogFile.Writer,logFiles.get(logFileIndex).getUsableSpace()不会为0,检查是否剩余空间够,然后获取transactionID对应文件的写LogFile.Writer(其实是其子类LogFileV3.Writer),如果没有则调用方法 roll(logFileIndex, buffer)创建一个LogFileV3.Writer,放入logFiles(这个维持着每个data目录对应着一个正在活动的可以用于写的文件)可能会根据buffer的大小滚动文件因为单个文件有最大限制,LogFileV3.Writer的构造方法会在这个data目录创建一个meta文件,写入一些基本数据,FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer)会把event封装成的ByteBuffer,通过LogFile.Writer.write(buffer)方法写入磁盘文件(buffer会再次被封装,最终封装成的格式顺序是:OP_RECORD、buffer.limit()、buffer。buffer的内容是(RecordType(这是1)、TransactionID、LogWriteOrderID、event.getHeaders、event.getBody())),返回Pair.of(getLogFileID(), offset);ptr由两部分组成:fileID和在这个文件中的偏移量offset。fileID就是Log.nextFileID,不管有几个data目录,始终根据这个变量设置文件编号(文件名的后面的编号);putList.offer(ptr)加入到putList之中;queue.addWithoutCommit(ptr, transactionID)将这个方法在每个put操作中必须要被调用,确保检查点之后事务的提交,它会调用inflightPuts.addEvent(transactionID, e.toLong())此时e.toLong()=fileID+offset,addEvent方法会执行inflightEvents.put(transactionID, pointer)、inflightFileIDs.put(transactionID,FlumeEventPointer.fromLong(pointer).getFileID())、syncRequired = true,inflightEvents中存放的就是<transactionID,fileID+offset>而inflightFileIDs存放的是<transactionID,fileID>;success = true表示put成功;log.unlockShared()解除共享锁。put事件并未刷新至磁盘文件,因为并没有commit,commit操作会导致刷新至磁盘的操作。queue中累计的数量不能超过capacity,超过就会等待一定时间后异常。

2、doTake()方法,该方法sink会通过transaction.take()方法调用。检查LinkedBlockingDeque<FlumeEventPointer> takeList时刻有剩余空间;然后获取共享锁;ptr = queue.removeHead(transactionID)取出头部的数据,ptr的内容<fileID, offset>,头部是逻辑上的0位置,但是物理上的头位置会一边take一边变化,是从checkpoint中取出的数据,期间会inflightTakes.addEvent(transactionID, value)将数据缓存在inflightTakes之中;然后放入takeList.offer(ptr);log.take(transactionID, ptr)会封装数据(buffer会再次被封装,最终封装成的格式顺序 是:OP_RECORD、buffer.limit()、buffer。buffer的内容是(RecordType(这是1)、 TransactionID、LogWriteOrderID、fileID、offset))然后写入缓存等待commit刷新至磁盘;event = log.get(ptr)是从data文件中取出数据event;最后释放共享锁。take操作queue.removeHead(transactionID)会从overwriteMap或者内存映射elementsBuffer中取出对应的head位置数据。

3、doCommit()方法,该方法source和sink会通过transaction.commit()方法调用。首先获取takeList和putList的大小;putList和takeList不能同时都>0,其中有一个得是==0;如果putList>0,获取共享锁,log.commitPut(transactionID)会调用Log.commit(long transactionID, short type)方法把commit操作封装成一个ByteBuffer buffer(最终封装成的格式顺序是:OP_RECORD、 buffer.limit()、buffer。buffer的内容是(RecordType(这是4)、TransactionID、 LogWriteOrderID、type))写入数据文件,并刷新至磁盘文件,此刷新也会将这次的put或者take中的所有事件写入磁盘文件;然后是将所有putList中的数据放入queue中queue.addTail(putList.removeFirst())当添加第一个时会从checkpointfile的最后一个位置开始先写入overwriteMap(逻辑位置转为物理位置)中,后续会再从0开始循环写入overwriteMap,take操作也会从最后一个位置取会先检查overwriteMap中有无对应的数据,没有就再检查checkpoint的内存映射elementsBuffer中有无(控制take位置的是head位置,控制put位置的是size),每次更新检查点时都会把overwriteMap写入内存映射elementsBuffer中并刷新至磁盘文件checkpoint;queue.completeTransaction(transactionID)会执行清除操作即如果inflightPuts和inflightTakes执行其一,如果inflightPuts包含transactionID则清空inflightPuts,否则清空inflightTakes;然后解锁。如果takes>0,则获取共享锁,log.commitTake(transactionID)会进行封装写入data文件,commit中的类型标记除了自己的表示还有要提交类型的标记这里是TAKE,上面是PUT;queue.completeTransaction(transactionID)和上面的一样;解除共享锁;queueRemaining.release(takes)释放。最后将putList和takeList清空。

4、doRollback()方法,该方法source和sink会通过transaction.rollback()方法调用。首先会获取takeList和putList的大小;然后获取共享锁;如果takes>0,

并且puts==0,将putList中的所有有数据queue.addHead(takeList.removeLast()),addHead操作和声明的addTail操作相似,只不过是要在调用add(int index, long value)

方法时index是0,会插入到第一个位置;清空putList、takeList;queue.completeTransaction(transactionID)上面已经讲过;log.rollback(transactionID)会调用Log.rollback(long transactionID, short type)方法把commit操作封装成一个ByteBuffer buffer(最 终封装成的格式顺序是:OP_RECORD、 buffer.limit()、buffer。buffer的内容是(RecordType(这是3)、TransactionID、 LogWriteOrderID))写入缓存中;释放共享锁;queueRemaining.release(puts)释放许可。(这一段的格式乱了,这编辑器,我屮艸芔茻!!无语了。。。不自动换行了。。手动换的行。)


1中的put操作将写入log文件的指针添加进了缓存putList中;2中的take操作从缓存中的取出指针,然如takeList中,然后写入log文件,从log文件中获取数据还原为event;3中的commit操作无论是对put的还是对take的都会讲commit信息写入log文件,都会清理queue中的缓存(inflightPuts和inflightTakes),如果对put还要将putList中的所有数据添加进queue的队尾,实际上是overwriteMap中,如果是对take则要释放queueRemaining的takes个许可量,还要清空putList、takeList;4中的rollback操作会将takeList中所有数据放入queue的头部,再清空putList、takeList,再清空queue中的缓存(inflightPuts和inflightTakes),将rollback信息写入log,要释放queueRemaining的puts个许可量。


ps:
  1、data/log-ID,这种类型的文件存放的是put、take、commit、rollback的操作记录及数据。

  2、checkpoint/checkpoint存放的是event在那个data文件logFileID,的什么位置offset等信息。

  2、checkpoint/inflightTakes存放的是事务take的缓存数据,每隔段时间就重建文件。内容:1、16字节是校验码;2
transactionID1+eventsCount1+eventPointer11+eventPointer12+...;3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...
  
     3、checkpoint/inflightPuts存放的是事务对应的put缓存数据,每隔段时间就重建文件。内容:1、16字节是校验码;2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...

  4、checkpoint/checkpoint.meta主要存储的是logfileID及对应event的数量等信息。

  5、data/log-ID.meta,主要记录log-ID下一个写入位置以及logWriteOrderID等信息。

   6、每个data目录里data文件保持不超过2个。

  7、putList和takeList是缓存存储的是相应的FlumeEventPointer,但是inflightTakes和inflightPuts其实也是缓存存储的也是相应的信息,只不过比两者多存一些信息罢了,功能重合度很高,为什么会这样呢?我想是一个只能在内存,一个可以永久存储(当然是不断重建的),后者可以用来进行flume再启动的恢复。

  file channel太过复杂了,比配置的文件的加载复杂更多,涉及的知识非常多,还不能一下子就消耗了。。。。大体的已经了解了,剩下的都是细节!!后续会再慢慢咀嚼!!争取吃透file。

这篇关于Flume-NG源码阅读之FileChannel的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/957888

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

论文阅读笔记: Segment Anything

文章目录 Segment Anything摘要引言任务模型数据引擎数据集负责任的人工智能 Segment Anything Model图像编码器提示编码器mask解码器解决歧义损失和训练 Segment Anything 论文地址: https://arxiv.org/abs/2304.02643 代码地址:https://github.com/facebookresear

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

软件架构模式:5 分钟阅读

原文: https://orkhanscience.medium.com/software-architecture-patterns-5-mins-read-e9e3c8eb47d2 软件架构模式:5 分钟阅读 当有人潜入软件工程世界时,有一天他需要学习软件架构模式的基础知识。当我刚接触编码时,我不知道从哪里获得简要介绍现有架构模式的资源,这样它就不会太详细和混乱,而是非常抽象和易