打通实时流处理log4j-flume-kafka-structured-streaming

2024-09-06 20:58

本文主要是介绍打通实时流处理log4j-flume-kafka-structured-streaming,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

模拟产生log4j日志

jar包依赖 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.8.0</version>
</dependency>

java代码 LoggerGenerator.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true) {Thread.sleep(1000);logger.info("value : " + index++);}// $ kafka-topics.sh --list --zookeeper 127.0.0.1:2181}
}

log4j.properties配置

1
2
3
4
5
6
7
8
9
10
11
12
log4j.rootLogger=INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
kafka broker启动

提前创建好topic【不是必须的】
flume-ng启动后,启动一个kafka console consulmer观察数据

1
2
3
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
flume-ng配置和启动

前面文章用过的avro-memory-kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# avro-memory-kafka.conf# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = momory-channel# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSin
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic# Use a channel which buffers events in memory
avro-memory-kafka.channels.momory-channel.type = memory
avro-memory-kafka.channels.momory-channel.capacity = 1000
avro-memory-kafka.channels.momory-channel.transactionCapacity = 100# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = momory-channel
avro-memory-kafka.sinks.kafka-sink.channel = momory-channel

启动flume-ng

1
2
3
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning --new-consumer
spark structured streaming实时流处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
topic = 'kafka_streaming_topic'
brokers = "127.0.0.1:9092"spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", topic).option("startingOffsets", """{"%s":{"0": 7}}""" % topic).load().selectExpr("CAST(value AS STRING)")# 自定义处理传输的数据-比如JSON串
words = lines.select(explode(split(lines.value, ' ')).alias('word')
)
word_counts = words.groupBy('word').count()query = word_counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于打通实时流处理log4j-flume-kafka-structured-streaming的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143102

相关文章

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

Go语言使用Buffer实现高性能处理字节和字符

《Go语言使用Buffer实现高性能处理字节和字符》在Go中,bytes.Buffer是一个非常高效的类型,用于处理字节数据的读写操作,本文将详细介绍一下如何使用Buffer实现高性能处理字节和... 目录1. bytes.Buffer 的基本用法1.1. 创建和初始化 Buffer1.2. 使用 Writ

Python视频处理库VidGear使用小结

《Python视频处理库VidGear使用小结》VidGear是一个高性能的Python视频处理库,本文主要介绍了Python视频处理库VidGear使用小结,文中通过示例代码介绍的非常详细,对大家的... 目录一、VidGear的安装二、VidGear的主要功能三、VidGear的使用示例四、VidGea

Python结合requests和Cheerio处理网页内容的操作步骤

《Python结合requests和Cheerio处理网页内容的操作步骤》Python因其简洁明了的语法和强大的库支持,成为了编写爬虫程序的首选语言之一,requests库是Python中用于发送HT... 目录一、前言二、环境搭建三、requests库的基本使用四、Cheerio库的基本使用五、结合req

使用Python处理CSV和Excel文件的操作方法

《使用Python处理CSV和Excel文件的操作方法》在数据分析、自动化和日常开发中,CSV和Excel文件是非常常见的数据存储格式,ython提供了强大的工具来读取、编辑和保存这两种文件,满足从基... 目录1. CSV 文件概述和处理方法1.1 CSV 文件格式的基本介绍1.2 使用 python 内

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

MyBatis延迟加载的处理方案

《MyBatis延迟加载的处理方案》MyBatis支持延迟加载(LazyLoading),允许在需要数据时才从数据库加载,而不是在查询结果第一次返回时就立即加载所有数据,延迟加载的核心思想是,将关联对... 目录MyBATis如何处理延迟加载?延迟加载的原理1. 开启延迟加载2. 延迟加载的配置2.1 使用