elk+filebeat+kafka日志收集方案

2024-05-10 13:48

本文主要是介绍elk+filebeat+kafka日志收集方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

        • 架构图
        • 搭建
          • 安装zookeeper集群
          • 安装kafka集群
          • 安装kafka-manager 管理平台
          • 安装elasticsearch集群
          • 安装filebeat
          • 安装logstash
          • 安装kibana

架构图

在这里插入图片描述

搭建
安装zookeeper集群
  • tar -zxf /opt/files/zookeeper-3.4.8.tar.gz -C /opt/env

  • vim /opt/env/zookeeper-3.4.8/conf/zoo.cfg

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/env/zookeeper-3.4.8/data
    clientPort=2181
    server.1=192.168.12.41:2888:3888
    server.2=192.168.12.42:2888:3888
    server.3=192.168.12.43:2888:3888
    
  • vim /opt/env/zookeeper-3.4.8/conf/java.env

    export JVMFLAGS="-Xms512m -Xmx512m $JVMFLAGS"
    
  • vim /opt/env/zookeeper-3.4.8/data/myid #在数据目录下配置集群ID

    1
    
  • 禁用防火墙

    systemctl stop firewalld.service
    systemctl disable firewalld.service
    
  • 服务启动

    /opt/env/zookeeper-3.4.8/bin/zkServer.sh start # 启动
    /opt/env/zookeeper-3.4.8/bin/zkServer.sh status # 查看状态
    

同样步骤配置其他集群节点

安装kafka集群
  • tar -zxf kafka_2.11-0.9.0.0.tgz -C /opt/env/

  • vi /opt/env/kafka_2.11-0.9.0.0/config/server.properties

    broker.id=1
    listeners=PLAINTEXT://pek1-vm-05:9092
    log.dirs=/opt/env/kafka_2.11-0.9.0.0/log
    zookeeper.connect=192.168.1.222:2181
    advertised.host.name=0.0.0.0 #如果不修改。默认为localhost 导致使用IP远程连接不上
    
  • mkdir -p /opt/env/kafka_2.11-0.9.0.0/log --创建日志目录

  • 启动 kafka (请先确保zk是启动的)

    nohup /opt/env/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh /opt/env/kafka_2.12-1.0.0/config/server.properties  #启动 kafka
    /opt/env/kafka_2.12-1.0.0/bin/kafka-server-start.sh -daemon /opt/env/kafka_2.12-1.0.0/config/server.properties # 后台运行
    
  • 创建topic

    /opt/env/kafka_2.12-1.0.0/bin/kafka-topics.sh --create --zookeeper 192.168.1.222:2181 --replication-factor 1 --partitions 1 --topic test001
    nohup ./bin/kafka-topics.sh --list --zookeeper 192.168.1.222:2181
    
  • 单机连通性测试

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 启动生产者
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  # 启动消费者,在producer端输入字符串并回车,查看consumer端是否显示
    

同样步骤配置其他集群节点

安装kafka-manager 管理平台
  • tar -zxf kafka-manager-1.3.3.17.tar.gz -C /opt/env/ #解压

  • vim ~/.sbt/repositories #更改sbt源,sbt运行时经常需要下载大量的jar包,默认连接到maven官网,速度通常比较慢

    [repositories]
    local
    aliyun: http://maven.aliyun.com/nexus/content/groups/public
    typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
    
  • ./sbt clean dist

  • unzip /opt/env/kafka-manager-1.3.3.17/target/universal/kafka-manager-1.3.3.17.zip

  • mv /opt/env/kafka-manager-1.3.3.17/target/universal/kafka-manager-1.3.3.17 /opt/env/kafka-manager-1.3.3.17/kafka-manager

  • vim /opt/env/kafka-manager-1.3.3.17/kafka-manager/conf/application.conf kafka-manager.zkhosts=“10.1.1.41:2181,10.1.1.42:2181,10.1.1.43:2181”

  • 启动

      nohup /opt/env/kafka-manager-1.3.3.17/kafka-manager/bin/kafka-manager -Dconfig.file=/opt/env/kafka-manager-1.3.3.17/kafka-manager/conf/application.conf -Dhttp.port=19092 &
    
安装elasticsearch集群
  • tar zxf /opt/file/elasticsearch-6.2.3.tar.gz -C /opt/env/

  • vim elasticsearch-5.5.0/config/elasticsearch.yml #es配置项

    cluster.name: es.dev #集群名称,集群内各个节点必须一致
    node.name: node1 #节点名
    network.host: 0.0.0.0 #绑定IP ,所有网卡
    http.port: 9200 # http端口
    discovery.zen.ping.unicast.hosts: ["host1","host2","host3"] #集群内其他节点host
    discovery.zen.minimum_master_nodes: 2 #主节点数,官方建议nodes / 2 + 1
    
  • vim elasticsearch-5.5.0/config/jvm.options #jvm 参数配置

    -Xms512m
    -Xmx512m
    
  • useradd elasticsearch -g wheel -p elasticsearch #添加es 账户,因为默认不允许root启动

  • chown -R elasticsearch:wheel /opt/env/elasticsearch-5.5.0 #配置可执行权限

  • vim /etc/security/limits.conf ->gg +G 到最后一行,在最后一行之上添加如下内容

    soft nproc 65536
    hard nproc 65536
    soft nofile 65536
    hard nofile 65536
    
  • vim /etc/sysctl.conf

    vm.max_map_count=655360
    
  • sysctl -p #执行生效

  • 启动

    su elasticsearch # 切换到elasticsearch 用户
    /opt/env/elasticsearch-5.5.0/bin/elasticsearch  -d #后台启动es
    netstat -nltp #查看端口,可以看到 9200 和9300 已经开启
    
  • 测试

    curl -X GET http://localhost:9200/

  • 其他节点使用scp 拷贝相关配置文件

    scp -r -P 22 root@192.168.10.1:/opt/data/backup/mysql/* /root #示例而已
    
  • chrome 浏览器安装 elasticsearch head 插件

    插件下载:https://www.gugeapps.com/webstore/detail/elasticsearch-head/ffmkiejjmecolpfloofpjologoblkegm
    将下载的文件拖入:chrome 浏览器->更多工具->扩展程序 即可完成安装
    

vim /etc/sudoers 可以看到wheel组内用户可以执行所有命令,这是一种偷懒的做法
多网卡配置参数network.bind_host: 101.201.103.78,network.publish_host: 10.174.8.116,transport.host: 10.174.8.116

安装filebeat
  • tar zxf /opt/file/filebeat-5.5.0-linux-x86_64.tar.gz -C /opt/env

  • cp filebeat.yml filebeat.yml.init #默认的配置备份一下

  • vim filebeat.yml

    input_type: logpaths:/opt/apps/his_med/logs/*-error.logmultiline.pattern: ^\[  #多行日志合并处理multiline.negate: true  #多行日志合并处理multiline.match: after  #多行日志合并处理
    output.kafka:hosts: ["192.168.10.1:9092","192.168.10.2:9092","192.168.10.3:9092"]topic: 'log'
    
  • nohup /opt/env/filebeat-5.5.0-linux-x86_64/filebeat -e -c /opt/env/filebeat-5.5.0-linux-x86_64/filebeat.yml -d “publish” & #启动

安装logstash
  • tar -zxf /opt/file/logstash-5.5.0.tar.gz -C /opt/env/

  • 安装email 插件

  • vim Gemfile

    source "https://ruby.taobao.org/" #使用国内的镜像服务更快
    
  • logstash-plugin install logstash-output-email 安装email 插件,比较慢

  • vim config/kafka-es.conf

    input {kafka {bootstrap_servers => "192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092"topics => ["log"]}
    }	output {elasticsearch {hosts => ["192.168.10.1:9200","192.168.10.2:9200","192.168.10.3:9200"]}
    }output {email {port => "25"address => "smtp.exmail.qq.com"username => "test@xxx.cn"password => "jFuddWjbsdfds9wwi4HS" #一定是客户端授权码,因为这个被坑了好久authentication => "plain"use_tls => falsefrom => "test@xxx.cn"subject => "服务器异常提醒"to => "own@xxx.cn"via => "smtp"body => "Content:\\n%{message}"}
    }nohup logstash -f /opt/env/logstash-5.5.0/config/kafka-es.conf &
    
  • nohup logstash -f /opt/env/logstash-5.5.0/config/kafka-es.conf & #启动

安装kibana
  • tar zxf kibana-5.5.0-linux-x86_64.tar.gz -C /opt/env/

  • vim config/kibana.yml

    server.port: 5601
    server.host: "0.0.0.0"
    elasticsearch.url: "http://192.168.10.1:9200"
    
  • nohup /opt/env/kibana-5.5.0-linux-x86_64/bin/kibana & #启动

这篇关于elk+filebeat+kafka日志收集方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/976622

相关文章

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

防止Linux rm命令误操作的多场景防护方案与实践

《防止Linuxrm命令误操作的多场景防护方案与实践》在Linux系统中,rm命令是删除文件和目录的高效工具,但一旦误操作,如执行rm-rf/或rm-rf/*,极易导致系统数据灾难,本文针对不同场景... 目录引言理解 rm 命令及误操作风险rm 命令基础常见误操作案例防护方案使用 rm编程 别名及安全删除

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

Python多线程应用中的卡死问题优化方案指南

《Python多线程应用中的卡死问题优化方案指南》在利用Python语言开发某查询软件时,遇到了点击搜索按钮后软件卡死的问题,本文将简单分析一下出现的原因以及对应的优化方案,希望对大家有所帮助... 目录问题描述优化方案1. 网络请求优化2. 多线程架构优化3. 全局异常处理4. 配置管理优化优化效果1.