Springboot + kafka + ELK 日志存储 外网 流量 过大 iftop 流量分析

2024-02-02 19:38

本文主要是介绍Springboot + kafka + ELK 日志存储 外网 流量 过大 iftop 流量分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Springboot + kafka + ELK 的部署和使用请另行查找。

这值记录一下 外网流量过大问题。

环境:

腾讯云 服务器

内存型M5

4核 32GB 5Mbps

应用服务器:

kafka , ELK , 游戏服务器日志生成

问题:

由于这个时一个测试服务器,所以流量并没有开很大,5M 也是够用了。

在测试服务器中,4,5 个玩家正常的数据操作 会导致 网络很卡。

使用 腾讯云 服务器 流量监控 查看 ,显示 流量被占用完。

使用 iftop (如果 安装 请另行查找) 查看 时 logstash 和 kafka 相互之间的通讯(没有留下图)

修改 所有的 连接 ,使用 内网IP,localhost,127.0.0.1 都没有改变。

找到 不同点 ,最后发现 kafka 使用的是 docker 安装。ELK 和 游戏 使用的是原生安装。

使用 原生 安装 kafka 后,重启 所有服务,使用 iftop 查看 ,没有 logstash 和 kafka 使用外网流量

使用 java 调用 iftop 记录到日志中并分析


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Objects;/*** @author: laowang* @date: 2022/2/11* @description: 执行,并记录到日志中*/
public class ExecIfTop {private static Logger LOGGER = LoggerFactory.getLogger(ExecIfTop.class);public void exec() throws IOException {// 保存进程IdString pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];try (FileWriter writer = new FileWriter("server.pid")) {writer.write(pid);writer.flush();}new Thread(() -> {try {Process process = Runtime.getRuntime().exec("iftop -i eth0 -n -N -P -p -t -L 50");new ReaderInfoThread(process.getInputStream()).start();new ReaderErrorThread(process.getErrorStream()).start();if (process.waitFor() != 0) {LOGGER.info("cmd 执行失败:");LOGGER.error("cmd 执行失败:");}} catch (Exception e) {e.printStackTrace();LOGGER.info("执行错误:", e);LOGGER.error("执行错误:", e);}}).start();}public static class ReaderInfoThread extends Thread {InputStream is;public ReaderInfoThread(InputStream is) {this.is = is;}@Overridepublic void run() {try {InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);BufferedReader br = new BufferedReader(isr);String line = null;while (true) {line = br.readLine();if (Objects.isNull(line)) {break;}LOGGER.info(line);System.out.println(line);}} catch (Exception e) {e.printStackTrace();}}}public static class ReaderErrorThread extends Thread {InputStream is;public ReaderErrorThread(InputStream is) {this.is = is;}@Overridepublic void run() {try {InputStreamReader isr = new InputStreamReader(this.is, StandardCharsets.UTF_8);BufferedReader br = new BufferedReader(isr);String line = null;while (true) {line = br.readLine();if (Objects.isNull(line)) {break;}LOGGER.error(line);System.err.println(line);}} catch (Exception e) {e.printStackTrace();}}}
}
package com.wgq.iftop;import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;/*** @author: laowang* @date: 2022/2/11* @description: 分析日志信息*/
public class AnalysisIfTop {/** 日期 */private static final DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd");/** 统计 一个 完整的数据中 当前时第几部分 中 */private Count count = new Count();private ConnectInfo connectInfo = null;private HashMap<String, ConnectInfo> connectMap = new HashMap<>();public void exec(String path) {File file = new File(path);if (!file.exists()) {return;}List<File> files = Arrays.asList(Objects.requireNonNull(file.listFiles((File pathname) -> {if (!pathname.isFile()) {return false;}if (pathname.getName().startsWith("info")) {return true;}return false;})));if (Objects.isNull(files) || files.isEmpty()) {return;}this.count = new Count();this.connectMap = new HashMap<>();this.connectInfo = null;files.sort(new FileNameComparator());count.set(0);for (var f : files) {System.err.println(f.getName());readFile(f);}saveToExcel(new ArrayList<>(this.connectMap.values()));}private void readFile(File file) {try {InputStreamReader isr = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8);BufferedReader br = new BufferedReader(isr);String line = null;while (true) {line = br.readLine();if (Objects.isNull(line)) {break;}analysis(line);}} catch (Exception e) {e.printStackTrace();}}// 分析数据private void analysis(String line) {line = line.substring(13);if (line.indexOf("=======") > -1) {doFinish();} else if (line.indexOf("-----") > -1) {doStart();} else {if (count.get() == 1) {// 开始计算if (line.indexOf("=>") > -1) {// 出网connectInfo = doOutput(line);} else if (line.indexOf("<=") > -1) {// 入网connectInfo = doInput(line, connectInfo);ConnectInfo old = connectMap.get(connectInfo.getKey());if (Objects.isNull(old)) {connectMap.put(connectInfo.getKey(), connectInfo);} else {old.merge(connectInfo);}connectInfo = null;}}}}/** 出网 */private ConnectInfo doOutput(String line) {line = delMoreEmpty(line);String[] values = line.split(" ");ConnectInfo info = new ConnectInfo();info.setOut(values[1], new FlowInfo(values[3], values[4], values[5], values[6]));return info;}/** 入网 */private ConnectInfo doInput(String line, ConnectInfo info) {line = delMoreEmpty(line);String[] values = line.split(" ");info.setInput(values[0], new FlowInfo(values[2], values[3], values[4], values[5]));return info;}/** 移除多余的 空格 */private String delMoreEmpty(String line) {char[] chars = line.toCharArray();StringBuffer sb = new StringBuffer();boolean isEmpty = true;for (char c : chars) {if (c == ' ') {if (isEmpty) {continue;}isEmpty = true;} else {isEmpty = false;}sb.append(c);}return sb.toString();}// 完整数据中,数据的拆分private void doStart() {count.add();if (count.get() == 0) {// 前面是数据的说明,现在开始统计数据} else if (count.get() == 1) {// 流量数据 逐条 完毕,开始总量统计}}// 一个数据完毕private void doFinish() {count.set(0);}private CumulativeLevel getLevel(String value) {if (value.indexOf("GB") > -1) {value = value.replaceAll("GB", "").trim();return new CumulativeLevel(Double.valueOf(value), 4);} else if (value.indexOf("MB") > -1) {value = value.replaceAll("MB", "").trim();return new CumulativeLevel(Double.valueOf(value), 3);} else if (value.indexOf("KB") > -1) {value = value.replaceAll("KB", "").trim();return new CumulativeLevel(Double.valueOf(value), 2);} else if (value.indexOf("B") > -1) {value = value.replaceAll("B", "").trim();return new CumulativeLevel(Double.valueOf(value), 1);}return new CumulativeLevel(0, 0);}public int doCompare(String o1, String o2) {CumulativeLevel l1 = getLevel(o1);CumulativeLevel l2 = getLevel(o2);if (l1.getL() != l2.getL()) {return l2.getL() - l1.getL();}if (l1.getC() > l2.getC()) {return -1;} else if (l1.getC() < l2.getC()) {return 1;}return 0;}private void saveToExcel(List<ConnectInfo> connectInfos) {Workbook wb = new HSSFWorkbook();// 输入排行榜Sheet sheet = wb.createSheet("入网");connectInfos.sort(new InputComparator());createSheet(sheet, connectInfos, ConnectInfo::getInputCumulative);// 输出排行榜sheet = wb.createSheet("出网");connectInfos.sort(new OutputComparator());createSheet(sheet, connectInfos, ConnectInfo::getOutCumulative);try {wb.write(new FileOutputStream(new File("IfTop.xls")));wb.close();} catch (Exception e) {e.printStackTrace();}}private void createSheet(Sheet sheet, List<ConnectInfo> connectInfos, Function<ConnectInfo, String> function) {Row row = sheet.createRow(0);row.createCell(0).setCellValue("IP:PORT");row.createCell(1).setCellValue("IP:PORT");row.createCell(2).setCellValue("总流量");for (int i = 0; i < connectInfos.size(); i++) {ConnectInfo info = connectInfos.get(i);row = sheet.createRow(i + 1);row.createCell(0).setCellValue(info.getValue1());row.createCell(1).setCellValue(info.getValue2());row.createCell(2).setCellValue(function.apply(info));}}/** 链接信息 */class ConnectInfo {private String value1;private String value2;private FlowInfo input;private FlowInfo out;public String getKey() {return value1 + " " + value2;}public void setInput(String value1, FlowInfo input) {this.value1 = value1;this.input = input;}public void setOut(String value2, FlowInfo out) {this.out = out;this.value2 = value2;}public void merge(ConnectInfo connectInfo) {// 数值进行合并this.input = connectInfo.input;this.out = connectInfo.out;}public String getInputCumulative() {return input.getCumulative();}public String getOutCumulative() {return out.getCumulative();}public String getValue1() {return value1;}public String getValue2() {return value2;}@Overridepublic String toString() {return "{" +"input=" + input +", out=" + out +'}';}}// 流量信息class FlowInfo {private String last2;private String last10;private String last40;private String cumulative;FlowInfo(String last2, String last10, String last40, String cumulative) {this.last2 = last2;this.last10 = last10;this.last40 = last40;this.cumulative = cumulative;}public String getLast2() {return last2;}public String getLast10() {return last10;}public String getLast40() {return last40;}public String getCumulative() {return cumulative;}@Overridepublic String toString() {return "{" +"last2='" + last2 + '\'' +", last10='" + last10 + '\'' +", last40='" + last40 + '\'' +", cumulative='" + cumulative + '\'' +'}';}}/** 根据总流量排序 */class InputComparator implements Comparator<ConnectInfo> {@Overridepublic int compare(ConnectInfo o1, ConnectInfo o2) {return doCompare(o1.getInputCumulative(), o2.getInputCumulative());}}/** 根据总流量排序 */class OutputComparator implements Comparator<ConnectInfo> {@Overridepublic int compare(ConnectInfo o1, ConnectInfo o2) {return doCompare(o1.getOutCumulative(), o2.getOutCumulative());}}/** 流量和层级 */class CumulativeLevel {private double c;private int l;CumulativeLevel(double c, int l) {this.c = c;this.l = l;}public double getC() {return c;}public int getL() {return l;}}/** 更具文件名字排序 */class FileNameComparator implements Comparator<File> {@Overridepublic int compare(File o1, File o2) {if (o1.getName().equals("info.log")) {return 1;}if (o2.getName().equals("info.log")) {return -1;}// 根据 年月日排序String name1 = o1.getName().replaceAll("info-", "").replaceAll(".log", "");String name2 = o2.getName().replaceAll("info-", "").replaceAll(".log", "");DateIndex v1 = new DateIndex(name1);DateIndex v2 = new DateIndex(name2);int dateCompare = v1.getDate().compareTo(v2.getDate());if (dateCompare != 0) {return dateCompare;}return v1.getIndex() - v2.getIndex();}}/** 时间和编号 */class DateIndex {private LocalDate date;private int index;public DateIndex(String input) {String[] values = input.split("[.]");this.date = LocalDate.parse(values[0], dateFormat);this.index = Integer.valueOf(values[1]);}public LocalDate getDate() {return date;}public int getIndex() {return index;}}/** 计数 */class Count {private int count;public int get() {return count;}public void add() {this.count++;}public void set(int v) {this.count = v;}}
}

application.properties

game.logs.path=logs

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration><contextName>logback</contextName><springProperty scope="context" name="app_name" source="spring.application.name"/><springProperty scope="context" name="log.path" source="game.logs.path"/><!-- 控制台输出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{HH:mm:ss.SSS} %msg%n </pattern></encoder></appender><!-- 不带过滤器,能记录所有级别的日志 --><appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><File>${log.path}/info.log</File><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 策略在每次往日志中添加新内容时触发,如果满足条件(每分钟对应一个日志文件),就将info.log复制到${log.path}目录并更名为info-2017-11-22_13-15.1.log,并删除原info.log,另一种生成新文件的条件是,info.log大小大于maxFileSize时,如果当前这一分钟已经有一个文件了,则i加1。通常情况下,日志按天分割,如:${log.path}/info-%d{yyyyMMdd}.%i.log --><fileNamePattern>${log.path}/info-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>1MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><maxHistory>60</maxHistory><totalSizeCap>20GB</totalSizeCap></rollingPolicy><layout class="ch.qos.logback.classic.PatternLayout"><pattern>%d{HH:mm:ss.SSS} %msg%n </pattern></layout></appender><!-- error级别的文件输出 --><appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>ERROR</level></filter><File>${log.path}/error.log</File><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${log.path}/error-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>1MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><maxHistory>60</maxHistory><totalSizeCap>20GB</totalSizeCap></rollingPolicy><layout class="ch.qos.logback.classic.PatternLayout"><pattern>%d{HH:mm:ss.SSS} %msg%n </pattern></layout></appender><root level="info"><appender-ref ref="STDOUT" /><appender-ref ref="INFO_FILE" /><appender-ref ref="ERROR_FILE" /></root><springProfile name="dev"><logger name="com" level="DEBUG" additivity="false"><appender-ref ref="STDOUT" /></logger></springProfile><springProfile name="test,prod"><logger name="com" level="INFO" additivity="false"><appender-ref ref="INFO_FILE" /><appender-ref ref="ERROR_FILE" /></logger></springProfile>
</configuration>

pom.xml 引用 poi 生成 excel

<dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.0</version></dependency>

打包 生成 iftop-1.0.0.jar 运行

nohup /data/java/bin/java -jar -Xms512m -Xmx512m -server iftop-1.0.0.jar --spring.profiles.active=prod --spring.name=iftop 1>>logs/out.log 2>>logs/err.log & tail logs/out.log -f logs/err.log -f

这篇关于Springboot + kafka + ELK 日志存储 外网 流量 过大 iftop 流量分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/671752

相关文章

Java图片压缩三种高效压缩方案详细解析

《Java图片压缩三种高效压缩方案详细解析》图片压缩通常涉及减少图片的尺寸缩放、调整图片的质量(针对JPEG、PNG等)、使用特定的算法来减少图片的数据量等,:本文主要介绍Java图片压缩三种高效... 目录一、基于OpenCV的智能尺寸压缩技术亮点:适用场景:二、JPEG质量参数压缩关键技术:压缩效果对比

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++

springboot+dubbo实现时间轮算法

《springboot+dubbo实现时间轮算法》时间轮是一种高效利用线程资源进行批量化调度的算法,本文主要介绍了springboot+dubbo实现时间轮算法,文中通过示例代码介绍的非常详细,对大家... 目录前言一、参数说明二、具体实现1、HashedwheelTimer2、createWheel3、n

Java利用docx4j+Freemarker生成word文档

《Java利用docx4j+Freemarker生成word文档》这篇文章主要为大家详细介绍了Java如何利用docx4j+Freemarker生成word文档,文中的示例代码讲解详细,感兴趣的小伙伴... 目录技术方案maven依赖创建模板文件实现代码技术方案Java 1.8 + docx4j + Fr

SpringBoot首笔交易慢问题排查与优化方案

《SpringBoot首笔交易慢问题排查与优化方案》在我们的微服务项目中,遇到这样的问题:应用启动后,第一笔交易响应耗时高达4、5秒,而后续请求均能在毫秒级完成,这不仅触发监控告警,也极大影响了用户体... 目录问题背景排查步骤1. 日志分析2. 性能工具定位优化方案:提前预热各种资源1. Flowable

kotlin中const 和val的区别及使用场景分析

《kotlin中const和val的区别及使用场景分析》在Kotlin中,const和val都是用来声明常量的,但它们的使用场景和功能有所不同,下面给大家介绍kotlin中const和val的区别,... 目录kotlin中const 和val的区别1. val:2. const:二 代码示例1 Java

基于SpringBoot+Mybatis实现Mysql分表

《基于SpringBoot+Mybatis实现Mysql分表》这篇文章主要为大家详细介绍了基于SpringBoot+Mybatis实现Mysql分表的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录基本思路定义注解创建ThreadLocal创建拦截器业务处理基本思路1.根据创建时间字段按年进

Java编译生成多个.class文件的原理和作用

《Java编译生成多个.class文件的原理和作用》作为一名经验丰富的开发者,在Java项目中执行编译后,可能会发现一个.java源文件有时会产生多个.class文件,从技术实现层面详细剖析这一现象... 目录一、内部类机制与.class文件生成成员内部类(常规内部类)局部内部类(方法内部类)匿名内部类二、

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态