Spark Streaming整合log4j、Flume与Kafka的案例

2024-09-06 20:38

本文主要是介绍Spark Streaming整合log4j、Flume与Kafka的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

来源:作者TAI_SPARK,http://suo.im/5w7LF8

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

1.框架

2.log4j完成模拟日志输出

设置模拟日志格式,log4j.properties:

log4j.rootLogger = INFO,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

模拟日志输出,LoggerGenerator.java:

import org.apache.log4j.Logger;/*** 模拟日志产生*/
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true){Thread.sleep(1000);logger.info("value:" + index++);}}
}

运行结果:

2020-03-07 18:21:37,637 [main] [LoggerGenerator] [INFO] - current value is:0
2020-03-07 18:21:38,639 [main] [LoggerGenerator] [INFO] - current value is:1
2020-03-07 18:21:39,639 [main] [LoggerGenerator] [INFO] - current value is:2
2020-03-07 18:21:40,640 [main] [LoggerGenerator] [INFO] - current value is:3
2020-03-07 18:21:41,640 [main] [LoggerGenerator] [INFO] - current value is:4
2020-03-07 18:21:42,641 [main] [LoggerGenerator] [INFO] - current value is:5
2020-03-07 18:21:43,641 [main] [LoggerGenerator] [INFO] - current value is:6
2020-03-07 18:21:44,642 [main] [LoggerGenerator] [INFO] - current value is:7
2020-03-07 18:21:45,642 [main] [LoggerGenerator] [INFO] - current value is:8
2020-03-07 18:21:46,642 [main] [LoggerGenerator] [INFO] - current value is:9
2020-03-07 18:21:47,643 [main] [LoggerGenerator] [INFO] - current value is:10

3.Flume收集log4j日志

$FLUME_HOME/conf/streaming.conf:

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.log-sink.type=loggeragent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

启动Flume(注意输出到控制台上为INFO,console,不是点【.】):

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

pom.xml加上一个jar包:

    <dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.6.0</version></dependency>

修改log4j.properties,使其与Flume链接:

log4j.rootLogger = INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop000
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

启动log4j:

Flume采集成功

4.KafkaSink链接Kafka与Flume

使用Kafka第一件事是把Zookeeper启动起来~

./zkServer.sh start

启动Kafka

./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties 

看下Kafka列表(用./kafka-topics.sh会报错,用“./”加文件名.sh执行时,必须给.sh文件加x执行权限):

kafka-topics.sh --list --zookeeper hadoop000:2181

创建一个topic:

kafka-topics.sh --create \
--zookeeper hadoop000:2181 \
--replication-factor 1 \
--partitions 1 \
--topic tp_streamingtopic

对接Flume与Kafka,设置Flume的conf,取名为streaming2.conf:

Kafka sink需要的参数有(每个版本不一样,具体可以查阅官网):

  • sink类型填KafkaSink

  • 需要链接的Kafka topic

  • Kafka中间件broker的地址与端口号

  • 是否使用握手机制

  • 每次发送的数据大小

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = tp_streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

启动Flume:

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

Kafka需要启动一个消费者消费Flume中Kafka sink来的数据:

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_streamingtopic

启动log4j:

成功传输~

5.Spark Streaming消费Kafka数据

package com.taipark.sparkimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Spark Streaming 对接 Kafka*/
object KafkaStreamingApp {def main(args: Array[String]): Unit = {if(args.length != 2){System.err.println("Userage:KafkaStreamingApp<brokers><topics>");System.exit(1);}val Array(brokers,topics) = argsval sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(5))val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)val topicSet = topics.split(",").toSetval messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)//第二位是字符串的值messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()}}

入参是Kafka的broker地址与topic名称:

本地Run一下:

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Spark Streaming整合log4j、Flume与Kafka的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143065

相关文章

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

SpringBoot 整合 Grizzly的过程

《SpringBoot整合Grizzly的过程》Grizzly是一个高性能的、异步的、非阻塞的HTTP服务器框架,它可以与SpringBoot一起提供比传统的Tomcat或Jet... 目录为什么选择 Grizzly?Spring Boot + Grizzly 整合的优势添加依赖自定义 Grizzly 作为

MySQL不使用子查询的原因及优化案例

《MySQL不使用子查询的原因及优化案例》对于mysql,不推荐使用子查询,效率太差,执行子查询时,MYSQL需要创建临时表,查询完毕后再删除这些临时表,所以,子查询的速度会受到一定的影响,本文给大家... 目录不推荐使用子查询和JOIN的原因解决方案优化案例案例1:查询所有有库存的商品信息案例2:使用EX

springboot整合gateway的详细过程

《springboot整合gateway的详细过程》本文介绍了如何配置和使用SpringCloudGateway构建一个API网关,通过实例代码介绍了springboot整合gateway的过程,需要... 目录1. 添加依赖2. 配置网关路由3. 启用Eureka客户端(可选)4. 创建主应用类5. 自定

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

SpringBoot整合kaptcha验证码过程(复制粘贴即可用)

《SpringBoot整合kaptcha验证码过程(复制粘贴即可用)》本文介绍了如何在SpringBoot项目中整合Kaptcha验证码实现,通过配置和编写相应的Controller、工具类以及前端页... 目录SpringBoot整合kaptcha验证码程序目录参考有两种方式在springboot中使用k

Spring Boot 中整合 MyBatis-Plus详细步骤(最新推荐)

《SpringBoot中整合MyBatis-Plus详细步骤(最新推荐)》本文详细介绍了如何在SpringBoot项目中整合MyBatis-Plus,包括整合步骤、基本CRUD操作、分页查询、批... 目录一、整合步骤1. 创建 Spring Boot 项目2. 配置项目依赖3. 配置数据源4. 创建实体类