如何收集SparkSteaming运行日志实时进入kafka中

2024-05-15 03:18

本文主要是介绍如何收集SparkSteaming运行日志实时进入kafka中,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。

这里的log分:

(1)spark本身运行的log

(2)代码里面业务产生的log

spark on yarn模式,如果你的hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?

能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。

答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。

现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率大大提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。

下面会介绍下如何使用:

streaming项目中的log4j使用的是apache log4j

<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>

sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。 看下我们log4j文件的内容:

log4j.rootLogger=WARN,console,kafka#log4j.logger.com.demo.kafka=DEBUG,kafka
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=kp_diag_log
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
#log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n
#log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

最后看下提交脚本:

jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'`echo $jars#nohup /opt/bigdata/spark/bin/spark-submit  --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn    --deploy-mode cluster --executor-cores 3  --driver-memory 4g   --executor-memory 4g  --num-executors 10  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml"   --jars  $jars    kpdiag-stream-1.0.0-SNAPSHOT.jar  &> streaming.log  &nohup /opt/bigdata/spark/bin/spark-submit    --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn  --deploy-mode cluster \--files "/home/spark/x_spark_job/log4j.properties" \
 --executor-cores 3   --driver-memory 3g   --executor-memory 3g  --num-executors 12    --jars  $jars  \
 --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"   \
 --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \
 --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \
 --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar   \
 --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \
 kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &

注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。

提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出: 执行命令:

kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log

收集到的log内容如下:

[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0[2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0[2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 题目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在

至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。

这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实 我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可 这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

输入图片说明


这篇关于如何收集SparkSteaming运行日志实时进入kafka中的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990650

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

四种简单方法 轻松进入电脑主板 BIOS 或 UEFI 固件设置

《四种简单方法轻松进入电脑主板BIOS或UEFI固件设置》设置BIOS/UEFI是计算机维护和管理中的一项重要任务,它允许用户配置计算机的启动选项、硬件设置和其他关键参数,该怎么进入呢?下面... 随着计算机技术的发展,大多数主流 PC 和笔记本已经从传统 BIOS 转向了 UEFI 固件。很多时候,我们也

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。