Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency)

本文主要是介绍Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.数据弹性(Data Resiliency)

2.持久队列(Persistent Queues)

2.1.持久队列的限制

2.2.持久队列如何工作

2.3.配置持久队列

2.4.处理背压(Handing Back Pressure)

2.5.控制耐久性(Controlling Durability)

2.6.磁盘垃圾收集

3.死信队列(Dead Letter Queues)

3.1.配置Logstash以使用死信队列

3.1.1.文件轮换(File Rotation)

3.2.处理死信队列

3.3.从时间戳读取

3.4.示例:处理具有映射错误的数据


1.数据弹性(Data Resiliency)

当数据流经事件处理管道时,Logstash可能会遇到阻止其向配置的输出传递事件的情况。例如,数据可能包含意外的数据类型,或者Logstash可能异常终止。

为防止数据丢失并确保事件不间断地流经管道,Logstash提供以下数据弹性功能。

  • 持久性队列通过将事件存储在磁盘上的内部队列中来防止数据丢失。
  • 死信队列为Logstash无法处理的事件提供磁盘存储。您可以使用dead_letter_queue输入插件轻松地重新处理死信队列中的事件。

默认情况下禁用这些弹性功能。要打开这些功能,您必须在Logstash 设置文件中明确启用它们。

 

2.持久队列(Persistent Queues)

默认情况下,在管道阶段(输入→管道工作者)之间Logstash使用内存中有界队列来缓冲事件。这些内存中队列的大小是固定的,不可配置。如果Logstash遇到临时计算机故障,则内存中队列的内容将丢失。临时机器故障是Logstash或其主机异常终止但能够重新启动的情况。

为了防止异常终止期间的数据丢失,Logstash具有持久队列功能,该功能将消息队列存储在磁盘上。持久队列提供Logstash中数据的持久性。

持久队列对于需要大缓冲区的Logstash部署也很有用。您可以启用持久队列来缓冲磁盘上的事件并删除消息代理,而不是部署和管理消息代理(如Redis,RabbitMQ或Apache Kafka)以促进缓冲的发布 - 订阅者模型。

总之,启用持久队列的好处如下:

  • 在不需要外部缓冲机制(如Redis或Apache Kafka)的情况下吸收突发事件。
  • 在正常关闭期间以及Logstash异常终止时提供至少一次传递保证以防止消息丢失。如果在事件发生时重新启动Logstash,Logstash将尝试传递存储在持久队列中的消息,直到传递成功至少一次。

您必须 queue.checkpoint.writes: 1 明确设置以保证所有输入事件的最大持久性。请参阅控制耐久性。

 

2.1.持久队列的限制

以下是持久队列功能无法解决的问题:

  • 不能保护不使用请求 - 响应协议的输入插件无法防止数据丢失。例如:tcp,udp,zeromq push + pull,以及许多其他输入没有确认收件人的机制。beats和http等具有确认功能的插件都受到这个队列的良好保护。
  • 它不处理永久性机器故障,例如磁盘损坏,磁盘故障和机器丢失。持久存储到磁盘的数据不会被复制。

 

2.2.持久队列如何工作

队列位于同一进程中的输入和过滤阶段之间:

input → queue → filter + output

输入→ 队列 → 过滤器 + 输出

  1. 当输入已准备好处理事件时,它会将它们写入队列。
  2. 当对队列的写入成功时,输入可以向其数据源发送确认。

处理队列中的事件时,只有在过滤器和输出完成后,Logstash才会在队列中确认(acknowledges)已完成的事件。队列记录管道已处理的事件。当且仅当事件已由Logstash管道完全处理时,事件被记录为已处理(在本文档中,称为“已确认(acknowledged)”或“已确认(ACKed)”)。

确认(acknowledges)什么意思?这意味着事件已由所有已配置的过滤器和输出处理。例如,如果您只有一个输出,Elasticsearch,则在Elasticsearch输出成功将此事件发送到Elasticsearch时会确认(ACKed )事件。

在正常关闭期间(CTRL + C或SIGTERM),Logstash将停止从队列中读取并将完成处理由过滤器和输出处理的正在进行的事件。重启后,Logstash将继续处理持久队列中的事件以及接受来自输入的新事件。

如果Logstash异常终止,则任何正在进行的事件都不会被确认,并且在重新启动Logstash时将由过滤器和输出进行重新处理。Logstash批量处理事件,因此对于任何给定的批次,可能已成功完成某些批次,但在发生异常终止时未记录为已确认。

有关队列写入和确认的特定行为的更多详细信息,请参阅 控制持久性。

 

2.3.配置持久队列

要配置持久队列,可以在 Logstash 设置文件中指定以下选项:

  • queue.type:指定persisted启用持久队列。默认情况下,禁用持久队列(默认值:queue.type: memory) 。
  • path.queue:将存储数据文件的目录路径。默认情况下,文件存储在 path.data/queue
  • queue.page_capacity:队列页面的最大大小(以字节为单位)。队列数据由称为“pages”的仅附加文件组成。默认大小为64mb。更改此值不太可能带来性能优势。
  • queue.drain:指定true,如果希望Logstash在关闭之前等待持久队列耗尽。耗尽队列所需的时间取决于队列中累积的事件数。因此,您应该避免使用此设置,除非队列(即使已满)相对较小并且可以快速耗尽。
  • queue.max_events:队列中允许的最大事件数。默认值为0(无限制)。
  • queue.max_bytes:队列的总容量,以字节数表示。默认值为1024mb(1gb)。确保磁盘驱动器的容量大于此处指定的值。

如果使用持久队列来防止数据丢失,但不需要太多缓冲,则可以设置queue.max_bytes较小的值(例如10mb)来生成较小的队列并提高队列性能。

如果同时指定了queue.max_events和 queue.max_bytes,则Logstash将使用最先达到的标准。请参阅处理背压(Handling Back Pressure)以了解达到这些队列限制时的行为。

您还可以通过设置queue.checkpoint.writes控制检查点文件何时更新。请参阅控制耐久性(Controlling Durability)。

配置示例:

queue.type: persisted
queue.max_bytes: 4gb

 

2.4.处理背压(Handing Back Pressure)

当队列已满时,Logstash会对输入施加压力,以阻止流入Logstash的数据。这种机制有助于Logstash控制输入阶段的数据流速率,而不会像Elasticsearch那样压倒性的输出。

使用queue.max_bytes设置配置磁盘上队列的总容量。以下示例将队列的总容量设置为8gb:

queue.type: persisted
queue.max_bytes: 8gb

通过指定这些设置,Logstash将缓冲磁盘上的事件,直到队列大小达到8gb。当队列中充满未记录事件且已达到大小限制时,Logstash将不再接受新事件。

每个输入独立处理背压。例如,当 beats输入遇到背压时,它不再接受新连接并等待持久队列有空间接受更多事件。在过滤器和输出阶段完成处理队列中的现有事件并确认它们之后,Logstash会自动开始接受新事件。

 

2.5.控制耐久性(Controlling Durability)

持久性是存储写入的属性,可确保数据在写入后可用。

启用持久队列功能后,Logstash会将事件存储在磁盘上。Logstash在名为checkpointing的机制中提交到磁盘。

为了讨论持久性,我们需要介绍一些有关如何实现持久队列的细节。

首先,队列本身是一组页面。有两种页面:头页和尾页。头页是写入新事件的地方。只有一个头页。当头页具有特定大小(请参考queue.page_capacity)时,它将成为尾页,并创建新的头页。尾页是不可变的,头页是仅附加的。其次,队列在称为检查点文件的单独文件中记录有关其自身的详细信息((pages, acknowledgements, etc)。

当记录检查点时,Logstash将会做以下事情:

  • 在头页上调用fsync。
  • 原子的(原子性、Atomically)写入磁盘队列的当前状态。

检查点(checkpointing)的过程是原子的(Atomically),这意味着如果成功,将保存对文件的任何更新。

如果Logstash终止,或者存在硬件级别故障,那么在持久队列中缓冲但尚未检查点的任何数据都将丢失。

通过设置queue.checkpoint.writes,您可以强制Logstash检查点更加频繁 。此设置指定在强制检查点之前可能写入磁盘的最大事件数。默认值为1024.要确保最大持久性并避免丢失持久队列中的数据,可以设置queue.checkpoint.writes: 1 在每个事件写入后强制检查点。请记住,磁盘写入会产生资源成本。将此值设置为1会严重影响性能。

 

2.6.磁盘垃圾收集

在磁盘上,队列存储为一组页(page),其中每个页是一个文件。每个页的最多有queue.page_capacity大小。在确认该页面中的所有事件之后删除页(垃圾回收)。如果旧页至少有一个尚未确认的事件,则整个页面将保留在磁盘上,直到该页面中的所有事件都成功处理完毕。包含未处理事件的每个页面将根据queue.max_bytes字节大小进行计数。

 

3.死信队列(Dead Letter Queues)


仅当elasticsearch输出支持死信队列功能 。此外,死信队列仅用于响应代码为400或404的情况,两者都表示无法重试的事件。在将来的Logstash插件版本中将提供对其他输出的支持。在配置Logstash以使用此功能之前,请参阅输出插件文档以验证插件是否支持死信队列功能。

 

默认情况下,当Logstash遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash管道会挂起或丢弃不成功的事件。为了防止在这种情况下丢失数据,您可以配置Logstash将不成功的事件写入死信队列而不是丢弃它们。

写入死信队列的每个事件都包括原始事件,描述无法处理事件的原因的元数据,有关编写事件的插件的信息以及事件进入死信队列的时间戳。

要处理死信队列中的事件,只需创建一个Logstash管道配置,该配置使用 dead_letter_queue输入插件从队列中读取。

有关详细信息,请参阅处理死信队列中的事件。

 

3.1.配置Logstash以使用死信队列

默认情况下禁用死信队列。要启用死信队列,请dead_letter_queue_enablelogstash.yml 设置文件中设置该选项:

dead_letter_queue.enable: true

死信队列作为文件存储在Logstash实例的本地目录中。默认情况下,死信队列文件存储在path.data/dead_letter_queue。每个管道都有一个单独的队列。例如,默认情况下,main管道的 死信队列存储在LOGSTASH_HOME/data/dead_letter_queue/main。队列文件按顺序编号:1.log2.log等等。

您可以path.dead_letter_queuelogstash.yml文件中设置为文件指定不同的路径:

path.dead_letter_queue: "path/to/data/dead_letter_queue"

对于两个不同的logstash实例,不能使用相同的死信队列路径。

 

3.1.1.文件轮换(File Rotation)

死信队列具有内置的文件轮换策略,用于管理队列的文件大小。当文件大小达到预配置阈值时,将自动创建新文件。

默认情况下,每个死信队列的最大大小设置为1024mb。要更改此设置,请使用该dead_letter_queue.max_bytes选项。如果条目超过此设置会增加死信队列的大小,则会删除条目。

 

3.2.处理死信队列

当您准备好处理死信队列中的事件时,您将创建一个使用dead_letter_queue输入插件从死信队列中读取的管道 。您使用的管道配置当然取决于您需要做什么。例如,如果死信队列包含由Elasticsearch中的映射错误导致的事件,则可以创建读取“死(dead)”事件的管道,删除导致映射问题的字段,并将干净事件重新索引到Elasticsearch中。

以下示例显示了一个简单的管道,它从死信队列中读取事件,并将事件(包括元数据)写入标准输出:

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue" commit_offsets => true pipeline_id => "main" }
}output {stdout {codec => rubydebug { metadata => true }}
}
  1. path:  包含死信队列的顶级目录的路径。此目录包含写入死信队列的每个管道的单独文件夹。要查找此目录的路径,请查看logstash.yml 设置文件。默认情况下,Logstash会在用于持久存储(path.data)的位置下创建dead_letter_queue 目录,例如LOGSTASH_HOME/data/dead_letter_queue。但是,如果path.dead_letter_queue设置,则使用该位置。
  2. commit_offsets: 设置true时,保存偏移量。当管道重新启动时,它将继续从它停止的位置读取,而不是重新处理队列中的所有项目。您可以设置commit_offsets为false在死信队列中探索事件并希望多次迭代事件。
  3. pipeline_id: 写入死信队列的管道的ID。默认是"main"

有关另一个示例,请参阅示例:处理具有映射错误的数据。

当管道完成处理死信队列中的所有事件时,它将继续运行并在流入队列时处理新事件。这意味着您无需停止生产系统来处理死信队列中的事件。

如果无法正确处理dead_letter_queue输入插件插件中发出的事件, 则不会将其重新提交到死信队列。

 

3.3.从时间戳读取

当您从死信队列中读取时,您可能不希望处理队列中的所有事件,尤其是在队列中存在大量旧事件的情况下。您可以使用start_timestamp选项开始在队列中的特定点处理事件 。此选项将管道配置为根据进入队列的时间戳开始处理事件:

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue"start_timestamp => "2017-06-06T23:40:37"pipeline_id => "main"}
}

对于此示例,管道开始读取在2017年6月6日23:40:37或之后传递到死信队列的所有事件。

 

3.4.示例:处理具有映射错误的数据

在此示例中,用户尝试索引包含geo_ip数据的文档,但无法处理数据,因为它包含映射错误:

{"geoip":{"location":"home"}}

索引失败,因为Logstash输出插件在location字段中需要一个geo_point对象,但该值是字符串。失败的事件将写入死信队列,以及有关导致失败的错误的元数据:

{"@metadata" => {"dead_letter_queue" => {"entry_time" => #<Java::OrgLogstash::Timestamp:0x5b5dacd5>,"plugin_id" => "fb80f1925088497215b8d037e622dec5819b503e-4","plugin_type" => "elasticsearch","reason" => "Could not index event to Elasticsearch. status: 400, action: [\"index\", {:_id=>nil, :_index=>\"logstash-2017.06.22\", :_type=>\"doc\", :_routing=>nil}, 2017-06-22T01:29:29.804Z My-MacBook-Pro-2.local {\"geoip\":{\"location\":\"home\"}}], response: {\"index\"=>{\"_index\"=>\"logstash-2017.06.22\", \"_type\"=>\"doc\", \"_id\"=>\"AVzNayPze1iR9yDdI2MD\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse\", \"caused_by\"=>{\"type\"=>\"illegal_argument_exception\", \"reason\"=>\"illegal latitude value [266.30859375] for geoip.location\"}}}}"}},"@timestamp" => 2017-06-22T01:29:29.804Z,"@version" => "1","geoip" => {"location" => "home"},"host" => "My-MacBook-Pro-2.local","message" => "{\"geoip\":{\"location\":\"home\"}}"
}

要处理失败的事件,请创建以下管道,从死信队列中读取并删除映射问题:

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue/" }
}
filter {mutate {remove_field => "[geoip][location]" }
}
output {elasticsearch{hosts => [ "localhost:9200" ] }
}
  1. path:  dead_letter_queue输入从死信队列中读取。
  2. remove_field:  mutate过滤器删除了名为location的问题字段。
  3. hosts:  clean事件被发送到Elasticsearch,可以将其被索引,因为映射问题已得到解决。

 

这篇关于Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/288031

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X

pandas数据过滤

Pandas 数据过滤方法 Pandas 提供了多种方法来过滤数据,可以根据不同的条件进行筛选。以下是一些常见的 Pandas 数据过滤方法,结合实例进行讲解,希望能帮你快速理解。 1. 基于条件筛选行 可以使用布尔索引来根据条件过滤行。 import pandas as pd# 创建示例数据data = {'Name': ['Alice', 'Bob', 'Charlie', 'Dav

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者