本文主要是介绍ELK+Kafka+Beats实现海量日志收集平台(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
ELK+Kafka+Beats实现海量日志收集平台(二)
目录
三、环境搭建
四、部署demo工程项目
五、测试
三、环境搭建
通过上一小节应用场景和实现原理的介绍,接下来实现所需环境搭建及说明架构图如下所示:
环境说明:
192.168.232.6 : 部署了demo项目(用于产生数据日志)
filebeat-6.6.0
192.168.232.3 : Kafka (单体)(Zookeeper:192.168.232.3~5)
192.168.232.4 : Kibana
192.168.232.7 : Logstash (单体)
ES集群:192.168.232.8~10
1、filebeat安装配置参考https://www.cnblogs.com/jhtian/p/13731230.html
2、Kafka安装配置参考https://www.cnblogs.com/jhtian/p/13708679.html
3、Logstash安装配置参考https://www.cnblogs.com/jhtian/p/13744753.html
4、ES集群搭建可参考https://www.cnblogs.com/jhtian/p/12703651.html
5、Kibana安装可参考https://www.cnblogs.com/jhtian/p/13785029.html
四、部署demo工程项目
项目结构图如下,分别调用项目的 /index、/error两个方法分别打印正常、错误日志(warn及以上级别日志)到logs文件夹中,作为filebeat读取数据的来源。
web访问类文件
indexAction.java
1 package com.tianjh.demo.web;2 3 import com.tianjh.demo.util.SetMDC;4 import lombok.extern.slf4j.Slf4j;5 import org.springframework.web.bind.annotation.RequestMapping;6 import org.springframework.web.bind.annotation.RestController;7 8 @Slf4j9 @RestController
10 public class indexAction {
11
12 @RequestMapping(value = "/index")
13 public String index() {
14 SetMDC.putMDC();
15 log.info("这是一条模拟error日志打印");
16 log.info("这是一条模拟warn日志打印");
17 log.info("这是一条模拟info日志打印");
18 return "hello word";
19 }
20
21 @RequestMapping(value = "/err")
22 public String error() {
23 SetMDC.putMDC();
24 try {
25 int a = 5/0;
26 } catch (Exception e) {
27 log.error("算术异常", e);
28 }
29 return "error";
30 }
31 }
工具类Utils
FastJsonConvertUtil.java
1 package com.tianjh.demo.util;2 3 import java.util.ArrayList;4 import java.util.List;5 6 import com.alibaba.fastjson.JSON;7 import com.alibaba.fastjson.JSONObject;8 import com.alibaba.fastjson.serializer.SerializerFeature;9 10 import lombok.extern.slf4j.Slf4j;11 12 /**13 * $FastJsonConvertUtil14 * @author hezhuo.bai15 * @since 2019年1月15日 下午4:53:2816 */17 @Slf4j18 public class FastJsonConvertUtil {19 20 private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,21 SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };22 23 /**24 * <B>方法名称:</B>将JSON字符串转换为实体对象<BR>25 * <B>概要说明:</B>将JSON字符串转换为实体对象<BR>26 * @author hezhuo.bai27 * @since 2019年1月15日 下午4:53:49 28 * @param data JSON字符串29 * @param clzss 转换对象30 * @return T31 */32 public static <T> T convertJSONToObject(String data, Class<T> clzss) {33 try {34 T t = JSON.parseObject(data, clzss);35 return t;36 } catch (Exception e) {37 log.error("convertJSONToObject Exception", e);38 return null;39 }40 }41 42 /**43 * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>44 * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>45 * @author hezhuo.bai46 * @since 2019年1月15日 下午4:54:3247 * @param data JSONObject对象48 * @param clzss 转换对象49 * @return T50 */51 public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {52 try {53 T t = JSONObject.toJavaObject(data, clzss);54 return t;55 } catch (Exception e) {56 log.error("convertJSONToObject Exception", e);57 return null;58 }59 }60 61 /**62 * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>63 * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>64 * @author hezhuo.bai65 * @since 2019年1月15日 下午4:54:5066 * @param data JSON字符串数组67 * @param clzss 转换对象68 * @return List<T>集合对象69 */70 public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {71 try {72 List<T> t = JSON.parseArray(data, clzss);73 return t;74 } catch (Exception e) {75 log.error("convertJSONToArray Exception", e);76 return null;77 }78 }79 80 /**81 * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>82 * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>83 * @author hezhuo.bai84 * @since 2019年1月15日 下午4:55:1185 * @param data List<JSONObject>86 * @param clzss 转换对象87 * @return List<T>集合对象88 */89 public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {90 try {91 List<T> t = new ArrayList<T>();92 for (JSONObject jsonObject : data) {93 t.add(convertJSONToObject(jsonObject, clzss));94 }95 return t;96 } catch (Exception e) {97 log.error("convertJSONToArray Exception", e);98 return null;99 }
100 }
101
102 /**
103 * <B>方法名称:</B>将对象转为JSON字符串<BR>
104 * <B>概要说明:</B>将对象转为JSON字符串<BR>
105 * @author hezhuo.bai
106 * @since 2019年1月15日 下午4:55:41
107 * @param obj 任意对象
108 * @return JSON字符串
109 */
110 public static String convertObjectToJSON(Object obj) {
111 try {
112 String text = JSON.toJSONString(obj);
113 return text;
114 } catch (Exception e) {
115 log.error("convertObjectToJSON Exception", e);
116 return null;
117 }
118 }
119
120 /**
121 * <B>方法名称:</B>将对象转为JSONObject对象<BR>
122 * <B>概要说明:</B>将对象转为JSONObject对象<BR>
123 * @author hezhuo.bai
124 * @since 2019年1月15日 下午4:55:55
125 * @param obj 任意对象
126 * @return JSONObject对象
127 */
128 public static JSONObject convertObjectToJSONObject(Object obj){
129 try {
130 JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
131 return jsonObject;
132 } catch (Exception e) {
133 log.error("convertObjectToJSONObject Exception", e);
134 return null;
135 }
136 }
137
138 public static String convertObjectToJSONWithNullValue(Object obj) {
139 try {
140 String text = JSON.toJSONString(obj, featuresWithNullValue);
141 return text;
142 } catch (Exception e) {
143 log.error("convertObjectToJSONWithNullValue Exception", e);
144 return null;
145 }
146 }
147 }
NetUtil.java
1 package com.tianjh.demo.util;2 3 import java.lang.management.ManagementFactory;4 import java.lang.management.RuntimeMXBean;5 import java.net.InetAddress;6 import java.net.NetworkInterface;7 import java.net.SocketAddress;8 import java.net.UnknownHostException;9 import java.nio.channels.SocketChannel;10 import java.util.Enumeration;11 import java.util.regex.Matcher;12 import java.util.regex.Pattern;13 14 /**15 * $NetUtil16 * 获取本机地址 端口号的工具类17 * * @author hezhuo.bai18 * * @since 2019年1月15日 下午4:53:2819 */20 public class NetUtil { 21 22 public static String normalizeAddress(String address){23 String[] blocks = address.split("[:]");24 if(blocks.length > 2){25 throw new IllegalArgumentException(address + " is invalid");26 }27 String host = blocks[0];28 int port = 80;29 if(blocks.length > 1){30 port = Integer.valueOf(blocks[1]);31 } else {32 address += ":"+port; //use default 8033 } 34 String serverAddr = String.format("%s:%d", host, port);35 return serverAddr;36 }37 38 public static String getLocalAddress(String address){39 String[] blocks = address.split("[:]");40 if(blocks.length != 2){41 throw new IllegalArgumentException(address + " is invalid address");42 } 43 String host = blocks[0];44 int port = Integer.valueOf(blocks[1]);45 46 if("0.0.0.0".equals(host)){47 return String.format("%s:%d",NetUtil.getLocalIp(), port);48 }49 return address;50 }51 52 private static int matchedIndex(String ip, String[] prefix){53 for(int i=0; i<prefix.length; i++){54 String p = prefix[i];55 if("*".equals(p)){ //*, assumed to be IP56 if(ip.startsWith("127.") ||57 ip.startsWith("10.") || 58 ip.startsWith("172.") ||59 ip.startsWith("192.")){60 continue;61 }62 return i;63 } else {64 if(ip.startsWith(p)){65 return i;66 }67 } 68 }69 70 return -1;71 }72 73 public static String getLocalIp(String ipPreference) {74 if(ipPreference == null){75 ipPreference = "*>10>172>192>127";76 }77 String[] prefix = ipPreference.split("[> ]+");78 try {79 Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");80 Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();81 String matchedIp = null;82 int matchedIdx = -1;83 while (interfaces.hasMoreElements()) {84 NetworkInterface ni = interfaces.nextElement();85 Enumeration<InetAddress> en = ni.getInetAddresses(); 86 while (en.hasMoreElements()) {87 InetAddress addr = en.nextElement();88 String ip = addr.getHostAddress(); 89 Matcher matcher = pattern.matcher(ip);90 if (matcher.matches()) { 91 int idx = matchedIndex(ip, prefix);92 if(idx == -1) continue;93 if(matchedIdx == -1){94 matchedIdx = idx;95 matchedIp = ip;96 } else {97 if(matchedIdx>idx){98 matchedIdx = idx;99 matchedIp = ip;
100 }
101 }
102 }
103 }
104 }
105 if(matchedIp != null) return matchedIp;
106 return "127.0.0.1";
107 } catch (Exception e) {
108 return "127.0.0.1";
109 }
110 }
111
112 public static String getLocalIp() {
113 return getLocalIp("*>10>172>192>127");
114 }
115
116 public static String remoteAddress(SocketChannel channel){
117 SocketAddress addr = channel.socket().getRemoteSocketAddress();
118 String res = String.format("%s", addr);
119 return res;
120 }
121
122 public static String localAddress(SocketChannel channel){
123 SocketAddress addr = channel.socket().getLocalSocketAddress();
124 String res = String.format("%s", addr);
125 return addr==null? res: res.substring(1);
126 }
127
128 public static String getPid(){
129 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
130 String name = runtime.getName();
131 int index = name.indexOf("@");
132 if (index != -1) {
133 return name.substring(0, index);
134 }
135 return null;
136 }
137
138 public static String getLocalHostName() {
139 try {
140 return (InetAddress.getLocalHost()).getHostName();
141 } catch (UnknownHostException uhe) {
142 String host = uhe.getMessage();
143 if (host != null) {
144 int colon = host.indexOf(':');
145 if (colon > 0) {
146 return host.substring(0, colon);
147 }
148 }
149 return "UnknownHost";
150 }
151 }
152 }
SetMDC.java
1 package com.tianjh.demo.util;2 3 import org.jboss.logging.MDC;4 import org.springframework.context.EnvironmentAware;5 import org.springframework.core.env.Environment;6 import org.springframework.stereotype.Component;7 8 @Component9 public class SetMDC implements EnvironmentAware {
10
11 private static Environment environment;
12
13 @Override
14 public void setEnvironment(Environment environment) {
15 SetMDC.environment = environment;
16 }
17
18 public static void putMDC() {
19 MDC.put("hostName", NetUtil.getLocalHostName());
20 MDC.put("ip", NetUtil.getLocalIp());
21 MDC.put("applicationName", environment.getProperty("spring.application.name"));
22 }
23
24 }
及关键配置文件
pom.xml
1 <?xml version="1.0" encoding="UTF-8"?>2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">4 <modelVersion>4.0.0</modelVersion>5 <parent>6 <groupId>org.springframework.boot</groupId>7 <artifactId>spring-boot-starter-parent</artifactId>8 <version>2.1.5.RELEASE</version>9 <relativePath/> <!-- lookup parent from repository -->
10 </parent>
11 <groupId>com.tianjh</groupId>
12 <artifactId>demo</artifactId>
13 <version>0.0.1-SNAPSHOT</version>
14 <name>demo</name>
15 <description>Demo project for Spring Boot</description>
16
17 <properties>
18 <java.version>1.8</java.version>
19 </properties>
20
21
22 <dependencies>
23 <dependency>
24 <groupId>org.springframework.boot</groupId>
25 <artifactId>spring-boot-starter-web</artifactId>
26 <!-- 排除spring-boot-starter-logging -->
27 <exclusions>
28 <exclusion>
29 <groupId>org.springframework.boot</groupId>
30 <artifactId>spring-boot-starter-logging</artifactId>
31 </exclusion>
32 </exclusions>
33 </dependency>
34
35 <dependency>
36 <groupId>org.springframework.boot</groupId>
37 <artifactId>spring-boot-starter-test</artifactId>
38 <scope>test</scope>
39 </dependency>
40 <dependency>
41 <groupId>org.projectlombok</groupId>
42 <artifactId>lombok</artifactId>
43 </dependency>
44 <!-- log4j2 -->
45 <dependency>
46 <groupId>org.springframework.boot</groupId>
47 <artifactId>spring-boot-starter-log4j2</artifactId>
48 </dependency>
49 <dependency>
50 <groupId>com.lmax</groupId>
51 <artifactId>disruptor</artifactId>
52 <version>3.3.4</version>
53 </dependency>
54
55 <dependency>
56 <groupId>com.alibaba</groupId>
57 <artifactId>fastjson</artifactId>
58 <version>1.2.58</version>
59 </dependency>
60
61 </dependencies>
62
63 <build>
64 <finalName>demo</finalName>
65 <!-- 打包时包含properties、xml -->
66 <resources>
67 <resource>
68 <directory>src/main/java</directory>
69 <includes>
70 <include>**/*.properties</include>
71 <include>**/*.xml</include>
72 </includes>
73 <!-- 是否替换资源中的属性-->
74 <filtering>true</filtering>
75 </resource>
76 <resource>
77 <directory>src/main/resources</directory>
78 </resource>
79 </resources>
80
81 <plugins>
82 <plugin>
83 <groupId>org.springframework.boot</groupId>
84 <artifactId>spring-boot-maven-plugin</artifactId>
85 <configuration>
86 <mainClass>com.tianjh.demo.Application</mainClass>
87 </configuration>
88 </plugin>
89 </plugins>
90 </build>
91
92 </project>
Spring配置文件
1 server.servlet.context-path=/
2 server.port=8001
3
4 spring.application.name=demo
5 spring.http.encoding.charset=UTF-8
6 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
7 spring.jackson.time-zone=GMT+8
8 spring.jackson.default-property-inclusion=NON_NULL
日志配置文件
1 <?xml version="1.0" encoding="UTF-8"?>2 <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">3 <Properties>4 <!-- 日志要输出的文件-->5 <Property name="LOG_HOME">logs</Property>6 <!-- 项目名称-->7 <property name="FILE_NAME">demo</property>8 <!-- 日志输出格式-->9 <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger]
10 [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
11 </property>
12 </Properties>
13 <Appenders>
14 <Console name="CONSOLE" target="SYSTEM_OUT">
15 <PatternLayout pattern="${patternLayout}"/>
16 </Console>
17 <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/all-${FILE_NAME}.log"
18 filePattern="${LOG_HOME}/all-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
19 <PatternLayout pattern="${patternLayout}"/>
20 <Policies>
21 <TimeBasedTriggeringPolicy interval="1"/>
22 <SizeBasedTriggeringPolicy size="500MB"/>
23 </Policies>
24 <DefaultRolloverStrategy max="20"/>
25 </RollingRandomAccessFile>
26 <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/err-${FILE_NAME}.log"
27 filePattern="${LOG_HOME}/err-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
28 <PatternLayout pattern="${patternLayout}"/>
29 <Filters>
30 <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
31 </Filters>
32 <Policies>
33 <TimeBasedTriggeringPolicy interval="1"/>
34 <SizeBasedTriggeringPolicy size="500MB"/>
35 </Policies>
36 <DefaultRolloverStrategy max="20"/>
37 </RollingRandomAccessFile>
38 </Appenders>
39 <Loggers>
40 <!-- 业务相关 异步logger -->
41 <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true">
42 <AppenderRef ref="appAppender"/>
43 </AsyncLogger>
44 <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true">
45 <AppenderRef ref="errorAppender"/>
46 </AsyncLogger>
47 <Root level="info">
48 <Appender-Ref ref="CONSOLE"/>
49 <Appender-Ref ref="appAppender"/>
50 <AppenderRef ref="errorAppender"/>
51 </Root>
52 </Loggers>
53 </Configuration>
打包上传到192.168.232.6这台服务器进行运行
运行之后调用该项目的index方法
在项目指定的文件夹里生成了咋们所要的日志文件
参考前面的链接安装好所有环境后,filebeat、kafka、Logstash、es都应该配置好了接下来就结合filebeat(生产者-Producer)、kafka(broker)、Logstash(消费者-Consumer)实现fielbeat从demo.jar项目输出的日志文件logs下读取all-demo.log 、err-demo.log两个日志文件,然后把相应日志数据发送到kafka中,再由Logstash到Kafka中获取数据进行消费。
在这个过程中需要在kafka中新增两个topic
./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic all-log-demo --partitions 1 --replication-factor 1 ./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic err-log-demo --partitions 1 --replication-factor 1
也就是采集到的all-demo.log日志数据放入topic:all-log-demo 中,而采集到的err-demo.log日志数据放入topic:err-log-demo 中。
五、测试
启动demo.jar、filebeat、kafka、logstash
启动demo.jar之前先删除掉之前测试的日志文件all-demo.log 、err-demo.log
启动demo.jar
启动kafka
启动Logstash
随后通过浏览器访问:
http://192.168.232.6:8001/err
http://192.168.232.6:8001/index 两个地址来调用index、err方法打印日志文件。
demo.jar 项目控制台输出了日志---调用index方法
demo.jar 项目控制台输出了日志---调用err方法
在调用上述两个方法之后,filebeat会将日志数据发送到kafka,通过使用如下的命令进行查看消费情况:
./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group all-log-group ./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group err-log-group
在kafka上也有相应的日志消费记录情况,( all-log-group、err-log-group )是在Logstash中进行配置的。
通过前面的几个流程之后,现在日志数据到达了kafka-broker之上,现在就需要用logstash来进行消费数据,logstash上也实时进行数据消费,如下图所示是全量日志过滤:
all-log-demo.log 日志
err-log-demo.log 日志
通过上述的一系列操作,简单的实现了日志数据的生成、采集、过滤。这其中最为重要也最核心的地方就是kafka,利用kafka的高性能来缓存filebeat生成的海量数据,从而让logstash慢慢的进行消费,当然上面的例子并不能体现出kafka处理海量数据的能力。
下一章: ELK+Kafka+Beats实现海量日志收集平台(三)
这篇关于ELK+Kafka+Beats实现海量日志收集平台(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!