LogStash日志数据采集

2024-04-28 07:48
文章标签 数据 日志 采集 logstash

本文主要是介绍LogStash日志数据采集,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一、LogStash介绍及安装
      • 1.1、介绍
      • 1.2 、node01机器安装LogStash
      • 1.3、Input插件
        • 1.3.1 stdin标准输入和stdout标准输出
        • 1.3.2 监控日志文件变化
        • 1.3.3 jdbc插件
        • 1.3.4 systlog插件
      • 1.4、filter插件
        • 1.4.1、grok正则表达式
        • 1.4.2、使用grok收集nginx日志数据
      • 1.5、Output插件
        • 1.5.1 标准输出到控制台
        • 1.5.2 将采集数据保存到file文件中
        • 1.5.3 将采集数据保存到elasticsearch
    • 二、kibana报表展示
      • 第一步:下载数据集
      • 第二步:上传我们的数据集并解压
      • 第三步:创建对应的索引库
      • 第四步:加载示例数据到我们的索引库当中来

一、LogStash介绍及安装

官网:https://www.elastic.co/guide/en/logstash/current/index.html

1.1、介绍

logstash就是一个具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时,这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。是一个input | filter | output 的数据流

1.2 、node01机器安装LogStash

下载logstache并上传到第一台服务器的/home/es路径下,然后进行解压

# 下载安装包---可以直接将已经下载好的安装包上传到/home/es路径下即可
cd /kkb/soft 
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.7.0.tar.gz
# 解压
tar -zxf logstash-6.7.0.tar.gz -C /kkb/install/

1.3、Input插件

1.3.1 stdin标准输入和stdout标准输出

使用标准的输入与输出组件,实现将我们的数据从控制台输入,从控制台输出:

cd /kkb/install/logstash-6.7.0/
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'{"@version" => "1","host" => "node01","@timestamp" => 2018-10-13T08:33:13.126Z,"message" => "hello"
}

在这里插入图片描述

1.3.2 监控日志文件变化

Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 logstash 会漏过你的数据。

编写脚本

cd /kkb/install/logstash-6.7.0/config
vim monitor_file.conf
#输入一下信息 
input{file{path => "/kkb/install/datas/tomcat.log"type => "log"start_position => "beginning"}
}
output{stdout{codec=>rubydebug}
}

检查配置文件是否可用

cd /kkb/install/logstash-6.7.0/
bin/logstash -f /kkb/install/logstash-6.7.0/config/monitor_file.conf -t
#成功会出现一下信息:Config Validation Result: OK. Exiting Logstash

启动服务

cd /kkb/install/logstash-6.7.0
bin/logstash -f /kkb/install/logstash-6.7.0/config/monitor_file.conf

发送数据
新开xshell窗口通过以下命令发送数据

mkdir -p /kkb/install/datas
echo "hello logstash" >> /kkb/install/datas/tomcat.log

其它参数说明:

Path=>表示监控的文件路径
Type=>给类型打标记,用来区分不同的文件类型。
Start_postion=>从哪里开始记录文件,默认是从结尾开始标记,要是你从头导入一个文件就把改成”beginning”.
discover_interval=>多久去监听path下是否有文件,默认是15s
exclude=>排除什么文件
close_older=>一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600秒,即一个小时。
sincedb_path=>监控库存放位置(默认的读取文件信息记录在哪个文件中)。默认在:/data/plugins/inputs/file。
sincedb_write_interval=> logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
stat_interval=>logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
1.3.3 jdbc插件

jdbc插件允许我们采集某张数据库表当中的数据到我们的logstashe当中来。

第一步:编写脚本
开发脚本配置文件

cd /kkb/install/logstash-6.7.0/config
vim jdbc.confinput {jdbc {jdbc_driver_library => "/kkb/install/mysql-connector-java-5.1.38.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.31.82:3306/mydb"jdbc_user => "root"jdbc_password => "123456"use_column_value => truetracking_column => "tno"#  parameters => { "favorite_artist" => "Beethoven" }schedule => "* * * * *"statement => "SELECT * from courses where tno > :sql_last_value ;"}
}output{stdout{codec=>rubydebug}
}

第二步:上传mysql连接驱动包到指定路劲

将我们mysql的连接驱动包上传到我们指定的/kkb/install/路径下

第三步:检查配置文件是否可用

cd /kkb/install/logstash-6.7.0/
bin/logstash -f /kkb/install/logstash-6.7.0/config/jdbc.conf  -t

通过之后
Config Validation Result: OK. Exiting Logstash

第四步:启动服务

通过以下命令启动logstash

cd /kkb/install/logstash-6.7.0
bin/logstash -f /kkb/install/logstash-6.7.0/config/jdbc.conf

第五步:数据库当中添加数据
在我们的数据库当中手动随便插入数据,发现我们的logstash可以进行收集

1.3.4 systlog插件

syslog机制负责记录内核和应用程序产生的日志信息,管理员可以通过查看日志记录,来掌握系统状况。
默认系统已经安装了rsyslog.直接启动即可
编写脚本

cd /kkb/install/logstash-6.7.0/config
vim syslog.conf
input{tcp{port=> 6789type=> syslog}udp{port=> 6789type=> syslog}
}filter{if [type] == "syslog" {grok {match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }add_field => [ "received_at", "%{@timestamp}" ]add_field => [ "received_from", "%{host}" ]}date {match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]}}
}output{stdout{codec=> rubydebug}
}

检查配置文件是否可用

cd /kkb/install/logstash-6.7.0
bin/logstash -f /kkb/install/logstash-6.7.0/config/syslog.conf -t

启动服务
执行以下命令启动logstash服务

cd /kkb/install/logstash-6.7.0
bin/logstash -f /kkb/install/logstash-6.7.0/config/syslog.conf

发送数据
修改系统日志配置文件

sudo vim /etc/rsyslog.conf

添加一行以下配置

*.* @@node01:6789

在这里插入图片描述

重启系统日志服务

sudo systemctl restart rsyslog

其它参数说明
在logstash中的grok是正则表达式,用来解析当前数据
原始数据其实是:

Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Jun 05 08:00:00 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Jun 05 08:10:00 louis CRON[620]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Jun 05 08:05:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

1.4、filter插件

Logstash之所以强悍的主要原因是filter插件;通过过滤器的各种组合可以得到我们想要的结构化数据。

1.4.1、grok正则表达式

grok正则表达式是logstash非常重要的一个环节;可以通过grok非常方便的将数据拆分和索引
语法格式:

(?<name>pattern)   

?表示要取出里面的值,pattern就是正则表达式

1、收集控制台输入数据,采集日期时间出来

第一步:开发配置文件

cd /kkb/install/logstash-6.7.0/config/
vim filter.conf
input {stdin{}} filter {grok {match => {
"message" => "(?<date>\d+\.\d+)\s+"}       }       
}       
output {stdout{codec => rubydebug}}

第二步:启动logstash服务

cd /kkb/install/logstash-6.7.0/
bin/logstash -f /kkb/install/logstash-6.7.0/config/filter.conf

第三步:控制台输入文字

5.20 今天天气还不错

在这里插入图片描述

1.4.2、使用grok收集nginx日志数据

nginx一般打印出来的日志格式如下

36.157.150.1 - - [05/Nov/2018:12:59:28 +0800] “GET /phpmyadmin_8c1019c9c0de7a0f/js/get_scripts.js.php?scripts%5B%5D=jquery/jquery-1.11.1.min.js&scripts%5B%5D=sprintf.js&scripts%5B%5D=ajax.js&scripts%5B%5D=keyhandler.js&scripts%5B%5D=jquery/jquery-ui-1.11.2.min.js&scripts%5B%5D=jquery/jquery.cookie.js&scripts%5B%5D=jquery/jquery.mousewheel.js&scripts%5B%5D=jquery/jquery.event.drag-2.2.js&scripts%5B%5D=jquery/jquery-ui-timepicker-addon.js&scripts%5B%5D=jquery/jquery.ba-hashchange-1.3.js
HTTP/1.1” 200 139613 “-” “Mozilla/5.0 (Windows NT 6.1; WOW64)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101
Safari/537.36”

这种日志是非格式化的,通常,我们获取到日志后,还要使用mapreduce 或者spark 做一下清洗操作,就是将非格式化日志编程格式化日志;

在清洗的时候,如果日志的数据量比较大,那么也是需要花费一定的时间的;所以可以使用logstash 的grok 功能,将nginx 的非格式化数据采集成格式化数据:

第一步:安装grok插件
第一种方式安装:在线安装(墙裂不推荐)
在线安装grok插件

cd /kkb/install/logstash-6.7.0/
vim Gemfile
#source 'https://rubygems.org'    # 将这个镜像源注释掉
source https://gems.ruby-china.com/  # 配置成中国的这个镜像源
#准备在线安装
cd /kkb/install/logstash-6.7.0/
bin/logstash-plugin  install logstash-filter-grok

第二种安装方式,直接使用我已经下载好的安装包进行本地安装(墙裂推荐使用)
上传我们的压缩包logstash-filter-grok-4.0.4.zip上传到/kkb/install/logstash-6.7.0 这个路径下面

#然后准备执行本地安装
cd /kkb/install/logstash-6.7.0/
bin/logstash-plugin install file:///kkb/install/logstash-6.7.0/logstash-filter-grok-4.0.4.zip
#安装成功之后查看logstash的插件列表
cd /kkb/install/logstash-6.7.0/
bin/logstash-plugin list

第二步:开发logstash的配置文件
定义logstash的配置文件如下,我们从控制台输入nginx的日志数据,然后经过filter的过滤,将我们的日志文件转换成为标准的数据格式

cd /kkb/install/logstash-6.7.0/config
vim monitor_nginx.confinput {stdin{}}
filter {
grok {
match => {
"message" => "%{IPORHOST:clientip} \- \- \[%{HTTPDATE:time_local}\] \"(?:%{WORD:method} %{NOTSPACE:request}(?:HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS:http_referer} %{QS:agent}"}}
}
output {stdout{codec => rubydebug}}

第三步:启动logstash
执行以下命令启动logstash

cd /kkb/install/logstash-6.7.0
bin/logstash -f /kkb/install/logstash-6.7.0/config/monitor_nginx.conf

第四步:从控制台输入nginx日志文件数据
输入第一条数据

36.157.150.1 - - [05/Nov/2018:12:59:27 +0800] “GET /phpmyadmin_8c1019c9c0de7a0f/js/messages.php?lang=zh_CN&db=&collation_connection=utf8_unicode_ci&token=6a44d72481633c90bffcfd42f11e25a1
HTTP/1.1” 200 8131 “-” “Mozilla/5.0 (Windows NT 6.1; WOW64)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101
Safari/537.36”

输入第二条数据

36.157.150.1 - - [05/Nov/2018:12:59:28 +0800] “GET /phpmyadmin_8c1019c9c0de7a0f/js/get_scripts.js.php?scripts%5B%5D=jquery/jquery-1.11.1.min.js&scripts%5B%5D=sprintf.js&scripts%5B%5D=ajax.js&scripts%5B%5D=keyhandler.js&scripts%5B%5D=jquery/jquery-ui-1.11.2.min.js&scripts%5B%5D=jquery/jquery.cookie.js&scripts%5B%5D=jquery/jquery.mousewheel.js&scripts%5B%5D=jquery/jquery.event.drag-2.2.js&scripts%5B%5D=jquery/jquery-ui-timepicker-addon.js&scripts%5B%5D=jquery/jquery.ba-hashchange-1.3.js
HTTP/1.1” 200 139613 “-” “Mozilla/5.0 (Windows NT 6.1; WOW64)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101
Safari/537.36”

1.5、Output插件

1.5.1 标准输出到控制台

将我们收集的数据直接打印到控制台

output {stdout {codec => rubydebug}
}
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
[es@node01 logstash]$ bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
hello
1.5.2 将采集数据保存到file文件中

logstash也可以将收集到的数据写入到文件当中去永久保存,接下来我们来看看logstash如何配置以实现将数据写入到文件当中。
第一步:开发logstash的配置文件

cd /kkb/install/logstash-6.7.0/config
vim output_file.confinput {stdin{}}
output {file {path => "/kkb/install/datas/%{+YYYY-MM-dd}-%{host}.txt"codec => line {format => "%{message}"}flush_interval => 0}
}

第二步:检测配置文件并启动logstash服务

cd /kkb/install/logstash-6.7.0
#检测配置文件是否正确
bin/logstash -f config/output_file.conf -t
#启动服务,然后从控制台输入一些数据
bin/logstash -f config/output_file.conf
#查看文件写入的内容
cd /kkb/install/datas
more 2018-11-08-node01.hadoop.com.txt

在这里插入图片描述

1.5.3 将采集数据保存到elasticsearch

第一步:开发logstash的配置文件

cd /kkb/install/logstash-6.7.0/config
vim output_es.conf
input {stdin{}}
output {elasticsearch {hosts => ["node01:9200"]index => "logstash-%{+YYYY.MM.dd}"}
}

这个index是保存到elasticsearch上的索引名称,如何命名特别重要,因为我们很可能后续根据某些需求做查询,所以最好带时间,因为我们在中间加上type,就代表不同的业务,这样我们在查询当天数据的时候,就可以根据类型+时间做范围查询。

第二步:检测配置文件并启动logstash
检测配置文件是否正确

cd /kkb/install/logstash-6.7.0
bin/logstash -f config/output_es.conf -t 
启动logstash
bin/logstash -f config/output_es.conf

在这里插入图片描述

第三步:es当中查看数据
访问:http://node01:9100/
查看es当中的数据
在这里插入图片描述

注意:
更多的input,filter,output组件参见以下链接
https://www.elastic.co/guide/en/logstash/current/index.html

二、kibana报表展示

官网对于kibana的基本简介
https://www.elastic.co/guide/cn/kibana/current/index.html

kibana是一个强大的报表展示工具,可以通过kibana自定义我们的数据报表展示,实现我们的数据的各种图表查看。

我们可以通过官网提供的数据集来实现我们的数据的报表展示

第一步:下载数据集

下载账户数据集
https://download.elastic.co/demos/kibana/gettingstarted/accounts.zip
下载日志数据集
https://download.elastic.co/demos/kibana/gettingstarted/logs.jsonl.gz

第二步:上传我们的数据集并解压

将我们以上下载的数据集全部上传到node01服务器的/kkb路径下

第三步:创建对应的索引库

创建我们的索引库,然后加载数据
第一个索引库:日志数据集数据索引库
创建第一个索引库,将我们第一个索引库日志数据也创建好
我们这里实际上按照日期,创建了三个索引库,都是用于加载我们的日志数据。

PUT /logstash-2015.05.18
{"mappings": {"log": {"properties": {"geo": {"properties": {"coordinates": {"type": "geo_point"}}}}}}
}
PUT /logstash-2015.05.19
{"mappings": {"log": {"properties": {"geo": {"properties": {"coordinates": {"type": "geo_point"}}}}}}
}
PUT /logstash-2015.05.20
{"mappings": {"log": {"properties": {"geo": {"properties": {"coordinates": {"type": "geo_point"}}}}}}
}

第四步:加载示例数据到我们的索引库当中来

直接在node01的/kkb路径下执行以下命令来加载我们的示例数据到索引库当中来
node01机器上面执行以下命令

cd /kkb
curl -H 'Content-Type: application/x-ndjson' -XPOST 'node01:9200/bank/account/_bulk?pretty' --data-binary @accounts.json
curl -H 'Content-Type: application/x-ndjson' -XPOST 'node01:9200/_bulk?pretty' --data-binary @logs.json

这篇关于LogStash日志数据采集的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/942683

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X

pandas数据过滤

Pandas 数据过滤方法 Pandas 提供了多种方法来过滤数据,可以根据不同的条件进行筛选。以下是一些常见的 Pandas 数据过滤方法,结合实例进行讲解,希望能帮你快速理解。 1. 基于条件筛选行 可以使用布尔索引来根据条件过滤行。 import pandas as pd# 创建示例数据data = {'Name': ['Alice', 'Bob', 'Charlie', 'Dav

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者