This article walks through an end-to-end real-time streaming pipeline: log4j writes log events, Flume collects them, Kafka buffers them, and Spark Structured Streaming processes them.
Simulating log4j log output
JAR dependencies (pom.xml)
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.8.0</version>
</dependency>
Java code (LoggerGenerator.java)
import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        // Emit one INFO event per second; log4j.properties routes it to stdout and to Flume.
        while (true) {
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
        // To list the Kafka topics:
        // $ kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    }
}
log4j.properties configuration
log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
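To make the stdout ConversionPattern concrete, here is a minimal Python sketch that parses one console line in that layout. The sample line (its timestamp and thread name) is invented for illustration, and `parse_console_line` is a hypothetical helper, not part of log4j.

```python
import re

# Mirrors the pattern: %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m
LINE_RE = re.compile(
    r"^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"\[(?P<thread>[^\]]*)\] "
    r"\[(?P<logger>[^\]]*)\] "
    r"\[(?P<level>[^\]]*)\] - "
    r"(?P<message>.*)$"
)


def parse_console_line(line):
    """Return the fields of one console log line as a dict, or None if it does not match."""
    match = LINE_RE.match(line.rstrip("\n"))
    return match.groupdict() if match else None


# Illustrative line only; the timestamp is made up.
sample = "2018-01-01 12:00:00,123 [main] [LoggerGenerator] [INFO] - value : 0"
fields = parse_console_line(sample)
print(fields["level"], fields["message"])
```

This only describes the console appender. The flume appender above has no layout configured, so it is not expected to apply this pattern to the events it forwards.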
Starting the Kafka broker
Creating the topic in advance is optional.
Once flume-ng is running, start a Kafka console consumer to watch the data arrive.
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
flume-ng configuration and startup
This reuses the avro-memory-kafka.conf file from an earlier article.
# avro-memory-kafka.conf

# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = momory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic

# Use a channel which buffers events in memory
avro-memory-kafka.channels.momory-channel.type = memory
avro-memory-kafka.channels.momory-channel.capacity = 1000
avro-memory-kafka.channels.momory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = momory-channel
avro-memory-kafka.sinks.kafka-sink.channel = momory-channel
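The channel name has to be spelled identically where it is declared and where the source and sink bind to it (this file uses `momory-channel` throughout). As a rough sketch of how that wiring could be checked before starting the agent, the Python below parses the properties and reports any channel that is referenced but not declared. `parse_agent_conf` and `check_wiring` are hypothetical helpers, not part of Flume, and the broken sample config is made up to show a mismatch.

```python
def parse_agent_conf(text):
    """Parse 'key = value' lines into a dict, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return channel names that a source or sink references but the agent never declares."""
    declared = set(props.get(agent + ".channels", "").split())
    problems = []
    # A source may list several channels, separated by spaces.
    for name in props.get(agent + ".sources", "").split():
        key = "%s.sources.%s.channels" % (agent, name)
        for channel in props.get(key, "").split():
            if channel not in declared:
                problems.append(channel)
    # A sink reads from exactly one channel.
    for name in props.get(agent + ".sinks", "").split():
        channel = props.get("%s.sinks.%s.channel" % (agent, name), "")
        if channel not in declared:
            problems.append(channel)
    return problems


# Made-up example: the sink binds to a channel name that was never declared.
broken_conf = """
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = momory-channel
avro-memory-kafka.sources.avro-source.channels = momory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
"""
problems = check_wiring(parse_agent_conf(broken_conf), "avro-memory-kafka")
print(problems)
```

An empty list means every binding points at a declared channel. The check covers names only and does not validate any other Flume setting.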
Starting flume-ng
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning --new-consumer
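Real-time stream processing with Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Must match kafka.topic in the Flume sink configuration
# (the original listing used 'kafka_streaming_topic', which the Flume agent above never writes to)
topic = 'default_flume_topic'
brokers = "127.0.0.1:9092"

spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()

# Read from Kafka, starting at offset 7 of partition 0, and keep only the message value as a string
lines = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", """{"%s":{"0": 7}}""" % topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

# Custom processing of the transmitted data goes here (for example, parsing JSON strings)
words = lines.select(explode(split(lines.value, ' ')).alias('word'))
word_counts = words.groupBy('word').count()

# Complete mode prints the full, updated count table to the console on every trigger
query = word_counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()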
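Two parts of the streaming job are easier to follow in isolation: the JSON passed to `startingOffsets`, and what the word count produces. The plain-Python sketch below imitates both without Spark. It assumes each Kafka message value is the raw log message, such as `value : 0`; `starting_offsets` and `count_words` are hypothetical helpers written for this illustration.

```python
import json
from collections import Counter


def starting_offsets(topic, partition_offsets):
    """Build the JSON string for startingOffsets: {"<topic>": {"<partition>": <offset>}}."""
    per_partition = {str(p): int(o) for p, o in partition_offsets.items()}
    return json.dumps({topic: per_partition})


def count_words(messages):
    """Split each message on single spaces and count every token, as the streaming query does."""
    counts = Counter()
    for message in messages:
        counts.update(message.split(" "))
    return dict(counts)


# Start reading partition 0 of the Flume topic at offset 7.
offsets = starting_offsets("default_flume_topic", {0: 7})
print(offsets)

# Assumed message values, matching what LoggerGenerator logs.
counts = count_words(["value : 0", "value : 1", "value : 2"])
print(counts)
```

Because the messages are split on spaces, `value` and `:` are counted once per message, while each number appears once. In complete output mode the console sink reprints the whole table on every trigger, so those two counts keep growing.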