打通实时流处理log4j-flume-kafka-structured-streaming

2024-09-06 20:58

本文主要是介绍打通实时流处理log4j-flume-kafka-structured-streaming,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

模拟产生log4j日志

jar包依赖 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.8.0</version>
</dependency>

java代码 LoggerGenerator.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true) {Thread.sleep(1000);logger.info("value : " + index++);}// $ kafka-topics.sh --list --zookeeper 127.0.0.1:2181}
}

log4j.properties配置

1
2
3
4
5
6
7
8
9
10
11
12
log4j.rootLogger=INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
kafka broker启动

提前创建好topic【不是必须的】
flume-ng启动后,启动一个kafka console consulmer观察数据

1
2
3
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
flume-ng配置和启动

前面文章用过的avro-memory-kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# avro-memory-kafka.conf# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = momory-channel# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSin
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic# Use a channel which buffers events in memory
avro-memory-kafka.channels.momory-channel.type = memory
avro-memory-kafka.channels.momory-channel.capacity = 1000
avro-memory-kafka.channels.momory-channel.transactionCapacity = 100# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = momory-channel
avro-memory-kafka.sinks.kafka-sink.channel = momory-channel

启动flume-ng

1
2
3
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning --new-consumer
spark structured streaming实时流处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
topic = 'kafka_streaming_topic'
brokers = "127.0.0.1:9092"spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", topic).option("startingOffsets", """{"%s":{"0": 7}}""" % topic).load().selectExpr("CAST(value AS STRING)")# 自定义处理传输的数据-比如JSON串
words = lines.select(explode(split(lines.value, ' ')).alias('word')
)
word_counts = words.groupBy('word').count()query = word_counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于打通实时流处理log4j-flume-kafka-structured-streaming的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143102

相关文章

Python处理函数调用超时的四种方法

《Python处理函数调用超时的四种方法》在实际开发过程中,我们可能会遇到一些场景,需要对函数的执行时间进行限制,例如,当一个函数执行时间过长时,可能会导致程序卡顿、资源占用过高,因此,在某些情况下,... 目录前言func-timeout1. 安装 func-timeout2. 基本用法自定义进程subp

Java字符串处理全解析(String、StringBuilder与StringBuffer)

《Java字符串处理全解析(String、StringBuilder与StringBuffer)》:本文主要介绍Java字符串处理全解析(String、StringBuilder与StringBu... 目录Java字符串处理全解析:String、StringBuilder与StringBuffer一、St

浅析Java中如何优雅地处理null值

《浅析Java中如何优雅地处理null值》这篇文章主要为大家详细介绍了如何结合Lambda表达式和Optional,让Java更优雅地处理null值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录场景 1:不为 null 则执行场景 2:不为 null 则返回,为 null 则返回特定值或抛出异常场景

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

resultMap如何处理复杂映射问题

《resultMap如何处理复杂映射问题》:本文主要介绍resultMap如何处理复杂映射问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录resultMap复杂映射问题Ⅰ 多对一查询:学生——老师Ⅱ 一对多查询:老师——学生总结resultMap复杂映射问题

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

Springboot处理跨域的实现方式(附Demo)

《Springboot处理跨域的实现方式(附Demo)》:本文主要介绍Springboot处理跨域的实现方式(附Demo),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录Springboot处理跨域的方式1. 基本知识2. @CrossOrigin3. 全局跨域设置4.

python+opencv处理颜色之将目标颜色转换实例代码

《python+opencv处理颜色之将目标颜色转换实例代码》OpenCV是一个的跨平台计算机视觉库,可以运行在Linux、Windows和MacOS操作系统上,:本文主要介绍python+ope... 目录下面是代码+ 效果 + 解释转HSV: 关于颜色总是要转HSV的掩膜再标注总结 目标:将红色的部分滤

Python实现自动化接收与处理手机验证码

《Python实现自动化接收与处理手机验证码》在移动互联网时代,短信验证码已成为身份验证、账号注册等环节的重要安全手段,本文将介绍如何利用Python实现验证码的自动接收,识别与转发,需要的可以参考下... 目录引言一、准备工作1.1 硬件与软件需求1.2 环境配置二、核心功能实现2.1 短信监听与获取2.