elk+filebeat+kafka日志收集方案

2024-05-10 13:48

本文主要是介绍elk+filebeat+kafka日志收集方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

        • 架构图
        • 搭建
          • 安装zookeeper集群
          • 安装kafka集群
          • 安装kafka-manager 管理平台
          • 安装elasticsearch集群
          • 安装filebeat
          • 安装logstash
          • 安装kibana

架构图

在这里插入图片描述

搭建
安装zookeeper集群
  • tar -zxf /opt/files/zookeeper-3.4.8.tar.gz -C /opt/env

  • vim /opt/env/zookeeper-3.4.8/conf/zoo.cfg

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/env/zookeeper-3.4.8/data
    clientPort=2181
    server.1=192.168.12.41:2888:3888
    server.2=192.168.12.42:2888:3888
    server.3=192.168.12.43:2888:3888
    
  • vim /opt/env/zookeeper-3.4.8/conf/java.env

    export JVMFLAGS="-Xms512m -Xmx512m $JVMFLAGS"
    
  • vim /opt/env/zookeeper-3.4.8/data/myid #在数据目录下配置集群ID

    1
    
  • 禁用防火墙

    systemctl stop firewalld.service
    systemctl disable firewalld.service
    
  • 服务启动

    /opt/env/zookeeper-3.4.8/bin/zkServer.sh start # 启动
    /opt/env/zookeeper-3.4.8/bin/zkServer.sh status # 查看状态
    

同样步骤配置其他集群节点

安装kafka集群
  • tar -zxf kafka_2.11-0.9.0.0.tgz -C /opt/env/

  • vi /opt/env/kafka_2.11-0.9.0.0/config/server.properties

    broker.id=1
    listeners=PLAINTEXT://pek1-vm-05:9092
    log.dirs=/opt/env/kafka_2.11-0.9.0.0/log
    zookeeper.connect=192.168.1.222:2181
    advertised.host.name=0.0.0.0 #如果不修改。默认为localhost 导致使用IP远程连接不上
    
  • mkdir -p /opt/env/kafka_2.11-0.9.0.0/log --创建日志目录

  • 启动 kafka (请先确保zk是启动的)

    nohup /opt/env/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh /opt/env/kafka_2.12-1.0.0/config/server.properties  #启动 kafka
    /opt/env/kafka_2.12-1.0.0/bin/kafka-server-start.sh -daemon /opt/env/kafka_2.12-1.0.0/config/server.properties # 后台运行
    
  • 创建topic

    /opt/env/kafka_2.12-1.0.0/bin/kafka-topics.sh --create --zookeeper 192.168.1.222:2181 --replication-factor 1 --partitions 1 --topic test001
    nohup ./bin/kafka-topics.sh --list --zookeeper 192.168.1.222:2181
    
  • 单机连通性测试

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 启动生产者
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  # 启动消费者,在producer端输入字符串并回车,查看consumer端是否显示
    

同样步骤配置其他集群节点

安装kafka-manager 管理平台
  • tar -zxf kafka-manager-1.3.3.17.tar.gz -C /opt/env/ #解压

  • vim ~/.sbt/repositories #更改sbt源,sbt运行时经常需要下载大量的jar包,默认连接到maven官网,速度通常比较慢

    [repositories]
    local
    aliyun: http://maven.aliyun.com/nexus/content/groups/public
    typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
    
  • ./sbt clean dist

  • unzip /opt/env/kafka-manager-1.3.3.17/target/universal/kafka-manager-1.3.3.17.zip

  • mv /opt/env/kafka-manager-1.3.3.17/target/universal/kafka-manager-1.3.3.17 /opt/env/kafka-manager-1.3.3.17/kafka-manager

  • vim /opt/env/kafka-manager-1.3.3.17/kafka-manager/conf/application.conf kafka-manager.zkhosts=“10.1.1.41:2181,10.1.1.42:2181,10.1.1.43:2181”

  • 启动

      nohup /opt/env/kafka-manager-1.3.3.17/kafka-manager/bin/kafka-manager -Dconfig.file=/opt/env/kafka-manager-1.3.3.17/kafka-manager/conf/application.conf -Dhttp.port=19092 &
    
安装elasticsearch集群
  • tar zxf /opt/file/elasticsearch-6.2.3.tar.gz -C /opt/env/

  • vim elasticsearch-5.5.0/config/elasticsearch.yml #es配置项

    cluster.name: es.dev #集群名称,集群内各个节点必须一致
    node.name: node1 #节点名
    network.host: 0.0.0.0 #绑定IP ,所有网卡
    http.port: 9200 # http端口
    discovery.zen.ping.unicast.hosts: ["host1","host2","host3"] #集群内其他节点host
    discovery.zen.minimum_master_nodes: 2 #主节点数,官方建议nodes / 2 + 1
    
  • vim elasticsearch-5.5.0/config/jvm.options #jvm 参数配置

    -Xms512m
    -Xmx512m
    
  • useradd elasticsearch -g wheel -p elasticsearch #添加es 账户,因为默认不允许root启动

  • chown -R elasticsearch:wheel /opt/env/elasticsearch-5.5.0 #配置可执行权限

  • vim /etc/security/limits.conf ->gg +G 到最后一行,在最后一行之上添加如下内容

    soft nproc 65536
    hard nproc 65536
    soft nofile 65536
    hard nofile 65536
    
  • vim /etc/sysctl.conf

    vm.max_map_count=655360
    
  • sysctl -p #执行生效

  • 启动

    su elasticsearch # 切换到elasticsearch 用户
    /opt/env/elasticsearch-5.5.0/bin/elasticsearch  -d #后台启动es
    netstat -nltp #查看端口,可以看到 9200 和9300 已经开启
    
  • 测试

    curl -X GET http://localhost:9200/

  • 其他节点使用scp 拷贝相关配置文件

    scp -r -P 22 root@192.168.10.1:/opt/data/backup/mysql/* /root #示例而已
    
  • chrome 浏览器安装 elasticsearch head 插件

    插件下载:https://www.gugeapps.com/webstore/detail/elasticsearch-head/ffmkiejjmecolpfloofpjologoblkegm
    将下载的文件拖入:chrome 浏览器->更多工具->扩展程序 即可完成安装
    

vim /etc/sudoers 可以看到wheel组内用户可以执行所有命令,这是一种偷懒的做法
多网卡配置参数network.bind_host: 101.201.103.78,network.publish_host: 10.174.8.116,transport.host: 10.174.8.116

安装filebeat
  • tar zxf /opt/file/filebeat-5.5.0-linux-x86_64.tar.gz -C /opt/env

  • cp filebeat.yml filebeat.yml.init #默认的配置备份一下

  • vim filebeat.yml

    input_type: logpaths:/opt/apps/his_med/logs/*-error.logmultiline.pattern: ^\[  #多行日志合并处理multiline.negate: true  #多行日志合并处理multiline.match: after  #多行日志合并处理
    output.kafka:hosts: ["192.168.10.1:9092","192.168.10.2:9092","192.168.10.3:9092"]topic: 'log'
    
  • nohup /opt/env/filebeat-5.5.0-linux-x86_64/filebeat -e -c /opt/env/filebeat-5.5.0-linux-x86_64/filebeat.yml -d “publish” & #启动

安装logstash
  • tar -zxf /opt/file/logstash-5.5.0.tar.gz -C /opt/env/

  • 安装email 插件

  • vim Gemfile

    source "https://ruby.taobao.org/" #使用国内的镜像服务更快
    
  • logstash-plugin install logstash-output-email 安装email 插件,比较慢

  • vim config/kafka-es.conf

    input {kafka {bootstrap_servers => "192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092"topics => ["log"]}
    }	output {elasticsearch {hosts => ["192.168.10.1:9200","192.168.10.2:9200","192.168.10.3:9200"]}
    }output {email {port => "25"address => "smtp.exmail.qq.com"username => "test@xxx.cn"password => "jFuddWjbsdfds9wwi4HS" #一定是客户端授权码,因为这个被坑了好久authentication => "plain"use_tls => falsefrom => "test@xxx.cn"subject => "服务器异常提醒"to => "own@xxx.cn"via => "smtp"body => "Content:\\n%{message}"}
    }nohup logstash -f /opt/env/logstash-5.5.0/config/kafka-es.conf &
    
  • nohup logstash -f /opt/env/logstash-5.5.0/config/kafka-es.conf & #启动

安装kibana
  • tar zxf kibana-5.5.0-linux-x86_64.tar.gz -C /opt/env/

  • vim config/kibana.yml

    server.port: 5601
    server.host: "0.0.0.0"
    elasticsearch.url: "http://192.168.10.1:9200"
    
  • nohup /opt/env/kibana-5.5.0-linux-x86_64/bin/kibana & #启动

这篇关于elk+filebeat+kafka日志收集方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/976622

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL

Redis 多规则限流和防重复提交方案实现小结

《Redis多规则限流和防重复提交方案实现小结》本文主要介绍了Redis多规则限流和防重复提交方案实现小结,包括使用String结构和Zset结构来记录用户IP的访问次数,具有一定的参考价值,感兴趣... 目录一:使用 String 结构记录固定时间段内某用户 IP 访问某接口的次数二:使用 Zset 进行

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

开启mysql的binlog日志步骤详解

《开启mysql的binlog日志步骤详解》:本文主要介绍MySQL5.7版本中二进制日志(bin_log)的配置和使用,文中通过图文及代码介绍的非常详细,需要的朋友可以参考下... 目录1.查看是否开启bin_log2.数据库会把日志放进logs目录中3.查看log日志总结 mysql版本5.71.查看

MySQL分表自动化创建的实现方案

《MySQL分表自动化创建的实现方案》在数据库应用场景中,随着数据量的不断增长,单表存储数据可能会面临性能瓶颈,例如查询、插入、更新等操作的效率会逐渐降低,分表是一种有效的优化策略,它将数据分散存储在... 目录一、项目目的二、实现过程(一)mysql 事件调度器结合存储过程方式1. 开启事件调度器2. 创

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

SpringBoot如何使用TraceId日志链路追踪

《SpringBoot如何使用TraceId日志链路追踪》文章介绍了如何使用TraceId进行日志链路追踪,通过在日志中添加TraceId关键字,可以将同一次业务调用链上的日志串起来,本文通过实例代码... 目录项目场景:实现步骤1、pom.XML 依赖2、整合logback,打印日志,logback-sp