本文主要是介绍Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
1.数据弹性(Data Resiliency)
2.持久队列(Persistent Queues)
2.1.持久队列的限制
2.2.持久队列如何工作
2.3.配置持久队列
2.4.处理背压(Handing Back Pressure)
2.5.控制耐久性(Controlling Durability)
2.6.磁盘垃圾收集
3.死信队列(Dead Letter Queues)
3.1.配置Logstash以使用死信队列
3.1.1.文件轮换(File Rotation)
3.2.处理死信队列
3.3.从时间戳读取
3.4.示例:处理具有映射错误的数据
1.数据弹性(Data Resiliency)
当数据流经事件处理管道时,Logstash可能会遇到阻止其向配置的输出传递事件的情况。例如,数据可能包含意外的数据类型,或者Logstash可能异常终止。
为防止数据丢失并确保事件不间断地流经管道,Logstash提供以下数据弹性功能。
- 持久性队列通过将事件存储在磁盘上的内部队列中来防止数据丢失。
- 死信队列为Logstash无法处理的事件提供磁盘存储。您可以使用
dead_letter_queue
输入插件轻松地重新处理死信队列中的事件。
默认情况下禁用这些弹性功能。要打开这些功能,您必须在Logstash 设置文件中明确启用它们。
2.持久队列(Persistent Queues)
默认情况下,在管道阶段(输入→管道工作者)之间Logstash使用内存中有界队列来缓冲事件。这些内存中队列的大小是固定的,不可配置。如果Logstash遇到临时计算机故障,则内存中队列的内容将丢失。临时机器故障是Logstash或其主机异常终止但能够重新启动的情况。
为了防止异常终止期间的数据丢失,Logstash具有持久队列功能,该功能将消息队列存储在磁盘上。持久队列提供Logstash中数据的持久性。
持久队列对于需要大缓冲区的Logstash部署也很有用。您可以启用持久队列来缓冲磁盘上的事件并删除消息代理,而不是部署和管理消息代理(如Redis,RabbitMQ或Apache Kafka)以促进缓冲的发布 - 订阅者模型。
总之,启用持久队列的好处如下:
- 在不需要外部缓冲机制(如Redis或Apache Kafka)的情况下吸收突发事件。
-
在正常关闭期间以及Logstash异常终止时提供至少一次传递保证以防止消息丢失。如果在事件发生时重新启动Logstash,Logstash将尝试传递存储在持久队列中的消息,直到传递成功至少一次。
您必须 queue.checkpoint.writes: 1
明确设置以保证所有输入事件的最大持久性。请参阅控制耐久性。
2.1.持久队列的限制
以下是持久队列功能无法解决的问题:
- 不能保护不使用请求 - 响应协议的输入插件无法防止数据丢失。例如:tcp,udp,zeromq push + pull,以及许多其他输入没有确认收件人的机制。beats和http等具有确认功能的插件都受到这个队列的良好保护。
- 它不处理永久性机器故障,例如磁盘损坏,磁盘故障和机器丢失。持久存储到磁盘的数据不会被复制。
2.2.持久队列如何工作
队列位于同一进程中的输入和过滤阶段之间:
input → queue → filter + output
输入→ 队列 → 过滤器 + 输出
- 当输入已准备好处理事件时,它会将它们写入队列。
- 当对队列的写入成功时,输入可以向其数据源发送确认。
处理队列中的事件时,只有在过滤器和输出完成后,Logstash才会在队列中确认(acknowledges)已完成的事件。队列记录管道已处理的事件。当且仅当事件已由Logstash管道完全处理时,事件被记录为已处理(在本文档中,称为“已确认(acknowledged)”或“已确认(ACKed)”)。
确认(acknowledges)什么意思?这意味着事件已由所有已配置的过滤器和输出处理。例如,如果您只有一个输出,Elasticsearch,则在Elasticsearch输出成功将此事件发送到Elasticsearch时会确认(ACKed )事件。
在正常关闭期间(CTRL + C或SIGTERM),Logstash将停止从队列中读取并将完成处理由过滤器和输出处理的正在进行的事件。重启后,Logstash将继续处理持久队列中的事件以及接受来自输入的新事件。
如果Logstash异常终止,则任何正在进行的事件都不会被确认,并且在重新启动Logstash时将由过滤器和输出进行重新处理。Logstash批量处理事件,因此对于任何给定的批次,可能已成功完成某些批次,但在发生异常终止时未记录为已确认。
有关队列写入和确认的特定行为的更多详细信息,请参阅 控制持久性。
2.3.配置持久队列
要配置持久队列,可以在 Logstash 设置文件中指定以下选项:
queue.type
:指定persisted
启用持久队列。默认情况下,禁用持久队列(默认值:queue.type: memory
) 。path.queue
:将存储数据文件的目录路径。默认情况下,文件存储在path.data/queue
。queue.page_capacity
:队列页面的最大大小(以字节为单位)。队列数据由称为“pages”的仅附加文件组成。默认大小为64mb。更改此值不太可能带来性能优势。queue.drain
:指定true,
如果希望Logstash在关闭之前等待持久队列耗尽。耗尽队列所需的时间取决于队列中累积的事件数。因此,您应该避免使用此设置,除非队列(即使已满)相对较小并且可以快速耗尽。queue.max_events
:队列中允许的最大事件数。默认值为0(无限制)。-
queue.max_bytes
:队列的总容量,以字节数表示。默认值为1024mb(1gb)。确保磁盘驱动器的容量大于此处指定的值。
如果使用持久队列来防止数据丢失,但不需要太多缓冲,则可以设置queue.max_bytes
较小的值(例如10mb)来生成较小的队列并提高队列性能。
如果同时指定了queue.max_events
和 queue.max_bytes,
则Logstash将使用最先达到的标准。请参阅处理背压(Handling Back Pressure)以了解达到这些队列限制时的行为。
您还可以通过设置queue.checkpoint.writes
控制检查点文件何时更新。请参阅控制耐久性(Controlling Durability)。
配置示例:
queue.type: persisted
queue.max_bytes: 4gb
2.4.处理背压(Handing Back Pressure)
当队列已满时,Logstash会对输入施加压力,以阻止流入Logstash的数据。这种机制有助于Logstash控制输入阶段的数据流速率,而不会像Elasticsearch那样压倒性的输出。
使用queue.max_bytes
设置配置磁盘上队列的总容量。以下示例将队列的总容量设置为8gb:
queue.type: persisted
queue.max_bytes: 8gb
通过指定这些设置,Logstash将缓冲磁盘上的事件,直到队列大小达到8gb。当队列中充满未记录事件且已达到大小限制时,Logstash将不再接受新事件。
每个输入独立处理背压。例如,当 beats输入遇到背压时,它不再接受新连接并等待持久队列有空间接受更多事件。在过滤器和输出阶段完成处理队列中的现有事件并确认它们之后,Logstash会自动开始接受新事件。
2.5.控制耐久性(Controlling Durability)
持久性是存储写入的属性,可确保数据在写入后可用。
启用持久队列功能后,Logstash会将事件存储在磁盘上。Logstash在名为checkpointing的机制中提交到磁盘。
为了讨论持久性,我们需要介绍一些有关如何实现持久队列的细节。
首先,队列本身是一组页面。有两种页面:头页和尾页。头页是写入新事件的地方。只有一个头页。当头页具有特定大小(请参考queue.page_capacity
)时,它将成为尾页,并创建新的头页。尾页是不可变的,头页是仅附加的。其次,队列在称为检查点文件的单独文件中记录有关其自身的详细信息((pages, acknowledgements, etc)。
当记录检查点时,Logstash将会做以下事情:
- 在头页上调用fsync。
- 原子的(原子性、Atomically)写入磁盘队列的当前状态。
检查点(checkpointing)的过程是原子的(Atomically),这意味着如果成功,将保存对文件的任何更新。
如果Logstash终止,或者存在硬件级别故障,那么在持久队列中缓冲但尚未检查点的任何数据都将丢失。
通过设置queue.checkpoint.writes,
您可以强制Logstash检查点更加频繁 。此设置指定在强制检查点之前可能写入磁盘的最大事件数。默认值为1024.要确保最大持久性并避免丢失持久队列中的数据,可以设置queue.checkpoint.writes: 1
在每个事件写入后强制检查点。请记住,磁盘写入会产生资源成本。将此值设置为1
会严重影响性能。
2.6.磁盘垃圾收集
在磁盘上,队列存储为一组页(page),其中每个页是一个文件。每个页的最多有queue.page_capacity
大小。在确认该页面中的所有事件之后删除页(垃圾回收)。如果旧页至少有一个尚未确认的事件,则整个页面将保留在磁盘上,直到该页面中的所有事件都成功处理完毕。包含未处理事件的每个页面将根据queue.max_bytes
字节大小进行计数。
3.死信队列(Dead Letter Queues)
仅当elasticsearch输出支持死信队列功能 。此外,死信队列仅用于响应代码为400或404的情况,两者都表示无法重试的事件。在将来的Logstash插件版本中将提供对其他输出的支持。在配置Logstash以使用此功能之前,请参阅输出插件文档以验证插件是否支持死信队列功能。
默认情况下,当Logstash遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash管道会挂起或丢弃不成功的事件。为了防止在这种情况下丢失数据,您可以配置Logstash将不成功的事件写入死信队列而不是丢弃它们。
写入死信队列的每个事件都包括原始事件,描述无法处理事件的原因的元数据,有关编写事件的插件的信息以及事件进入死信队列的时间戳。
要处理死信队列中的事件,只需创建一个Logstash管道配置,该配置使用 dead_letter_queue
输入插件从队列中读取。
有关详细信息,请参阅处理死信队列中的事件。
3.1.配置Logstash以使用死信队列
默认情况下禁用死信队列。要启用死信队列,请dead_letter_queue_enable
在logstash.yml
设置文件中设置该选项:
dead_letter_queue.enable: true
死信队列作为文件存储在Logstash实例的本地目录中。默认情况下,死信队列文件存储在path.data/dead_letter_queue
。每个管道都有一个单独的队列。例如,默认情况下,main
管道的 死信队列存储在LOGSTASH_HOME/data/dead_letter_queue/main
。队列文件按顺序编号:1.log
,2.log
等等。
您可以path.dead_letter_queue
在logstash.yml
文件中设置为文件指定不同的路径:
path.dead_letter_queue: "path/to/data/dead_letter_queue"
对于两个不同的logstash实例,不能使用相同的死信队列路径。
3.1.1.文件轮换(File Rotation)
死信队列具有内置的文件轮换策略,用于管理队列的文件大小。当文件大小达到预配置阈值时,将自动创建新文件。
默认情况下,每个死信队列的最大大小设置为1024mb。要更改此设置,请使用该dead_letter_queue.max_bytes
选项。如果条目超过此设置会增加死信队列的大小,则会删除条目。
3.2.处理死信队列
当您准备好处理死信队列中的事件时,您将创建一个使用dead_letter_queue
输入插件从死信队列中读取的管道 。您使用的管道配置当然取决于您需要做什么。例如,如果死信队列包含由Elasticsearch中的映射错误导致的事件,则可以创建读取“死(dead)”事件的管道,删除导致映射问题的字段,并将干净事件重新索引到Elasticsearch中。
以下示例显示了一个简单的管道,它从死信队列中读取事件,并将事件(包括元数据)写入标准输出:
input {dead_letter_queue {path => "/path/to/data/dead_letter_queue" commit_offsets => true pipeline_id => "main" }
}output {stdout {codec => rubydebug { metadata => true }}
}
- path: 包含死信队列的顶级目录的路径。此目录包含写入死信队列的每个管道的单独文件夹。要查找此目录的路径,请查看
logstash.yml
设置文件。默认情况下,Logstash会在用于持久存储(path.data
)的位置下创建dead_letter_queue
目录,例如LOGSTASH_HOME/data/dead_letter_queue
。但是,如果path.dead_letter_queue
设置,则使用该位置。 - commit_offsets: 设置true时,保存偏移量。当管道重新启动时,它将继续从它停止的位置读取,而不是重新处理队列中的所有项目。您可以设置commit_offsets为false在死信队列中探索事件并希望多次迭代事件。
- pipeline_id: 写入死信队列的管道的ID。默认是
"main"
。
有关另一个示例,请参阅示例:处理具有映射错误的数据。
当管道完成处理死信队列中的所有事件时,它将继续运行并在流入队列时处理新事件。这意味着您无需停止生产系统来处理死信队列中的事件。
如果无法正确处理dead_letter_queue
输入插件插件中发出的事件, 则不会将其重新提交到死信队列。
3.3.从时间戳读取
当您从死信队列中读取时,您可能不希望处理队列中的所有事件,尤其是在队列中存在大量旧事件的情况下。您可以使用start_timestamp
选项开始在队列中的特定点处理事件 。此选项将管道配置为根据进入队列的时间戳开始处理事件:
input {dead_letter_queue {path => "/path/to/data/dead_letter_queue"start_timestamp => "2017-06-06T23:40:37"pipeline_id => "main"}
}
对于此示例,管道开始读取在2017年6月6日23:40:37或之后传递到死信队列的所有事件。
3.4.示例:处理具有映射错误的数据
在此示例中,用户尝试索引包含geo_ip数据的文档,但无法处理数据,因为它包含映射错误:
{"geoip":{"location":"home"}}
索引失败,因为Logstash输出插件在location
字段中需要一个geo_point
对象,但该值是字符串。失败的事件将写入死信队列,以及有关导致失败的错误的元数据:
{"@metadata" => {"dead_letter_queue" => {"entry_time" => #<Java::OrgLogstash::Timestamp:0x5b5dacd5>,"plugin_id" => "fb80f1925088497215b8d037e622dec5819b503e-4","plugin_type" => "elasticsearch","reason" => "Could not index event to Elasticsearch. status: 400, action: [\"index\", {:_id=>nil, :_index=>\"logstash-2017.06.22\", :_type=>\"doc\", :_routing=>nil}, 2017-06-22T01:29:29.804Z My-MacBook-Pro-2.local {\"geoip\":{\"location\":\"home\"}}], response: {\"index\"=>{\"_index\"=>\"logstash-2017.06.22\", \"_type\"=>\"doc\", \"_id\"=>\"AVzNayPze1iR9yDdI2MD\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse\", \"caused_by\"=>{\"type\"=>\"illegal_argument_exception\", \"reason\"=>\"illegal latitude value [266.30859375] for geoip.location\"}}}}"}},"@timestamp" => 2017-06-22T01:29:29.804Z,"@version" => "1","geoip" => {"location" => "home"},"host" => "My-MacBook-Pro-2.local","message" => "{\"geoip\":{\"location\":\"home\"}}"
}
要处理失败的事件,请创建以下管道,从死信队列中读取并删除映射问题:
input {dead_letter_queue {path => "/path/to/data/dead_letter_queue/" }
}
filter {mutate {remove_field => "[geoip][location]" }
}
output {elasticsearch{hosts => [ "localhost:9200" ] }
}
- path: dead_letter_queue输入从死信队列中读取。
- remove_field: mutate过滤器删除了名为location的问题字段。
- hosts: clean事件被发送到Elasticsearch,可以将其被索引,因为映射问题已得到解决。
这篇关于Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!