Spark Streaming整合log4j、Flume与Kafka的案例

2024-09-06 20:38

本文主要是介绍Spark Streaming整合log4j、Flume与Kafka的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

来源:作者TAI_SPARK,http://suo.im/5w7LF8

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

1.框架

2.log4j完成模拟日志输出

设置模拟日志格式,log4j.properties:

log4j.rootLogger = INFO,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

模拟日志输出,LoggerGenerator.java:

import org.apache.log4j.Logger;/*** 模拟日志产生*/
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true){Thread.sleep(1000);logger.info("value:" + index++);}}
}

运行结果:

2020-03-07 18:21:37,637 [main] [LoggerGenerator] [INFO] - current value is:0
2020-03-07 18:21:38,639 [main] [LoggerGenerator] [INFO] - current value is:1
2020-03-07 18:21:39,639 [main] [LoggerGenerator] [INFO] - current value is:2
2020-03-07 18:21:40,640 [main] [LoggerGenerator] [INFO] - current value is:3
2020-03-07 18:21:41,640 [main] [LoggerGenerator] [INFO] - current value is:4
2020-03-07 18:21:42,641 [main] [LoggerGenerator] [INFO] - current value is:5
2020-03-07 18:21:43,641 [main] [LoggerGenerator] [INFO] - current value is:6
2020-03-07 18:21:44,642 [main] [LoggerGenerator] [INFO] - current value is:7
2020-03-07 18:21:45,642 [main] [LoggerGenerator] [INFO] - current value is:8
2020-03-07 18:21:46,642 [main] [LoggerGenerator] [INFO] - current value is:9
2020-03-07 18:21:47,643 [main] [LoggerGenerator] [INFO] - current value is:10

3.Flume收集log4j日志

$FLUME_HOME/conf/streaming.conf:

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.log-sink.type=loggeragent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

启动Flume(注意输出到控制台上为INFO,console,不是点【.】):

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

pom.xml加上一个jar包:

    <dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.6.0</version></dependency>

修改log4j.properties,使其与Flume链接:

log4j.rootLogger = INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop000
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

启动log4j:

Flume采集成功

4.KafkaSink链接Kafka与Flume

使用Kafka第一件事是把Zookeeper启动起来~

./zkServer.sh start

启动Kafka

./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties 

看下Kafka列表(用./kafka-topics.sh会报错,用“./”加文件名.sh执行时,必须给.sh文件加x执行权限):

kafka-topics.sh --list --zookeeper hadoop000:2181

创建一个topic:

kafka-topics.sh --create \
--zookeeper hadoop000:2181 \
--replication-factor 1 \
--partitions 1 \
--topic tp_streamingtopic

对接Flume与Kafka,设置Flume的conf,取名为streaming2.conf:

Kafka sink需要的参数有(每个版本不一样,具体可以查阅官网):

  • sink类型填KafkaSink

  • 需要链接的Kafka topic

  • Kafka中间件broker的地址与端口号

  • 是否使用握手机制

  • 每次发送的数据大小

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = tp_streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

启动Flume:

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

Kafka需要启动一个消费者消费Flume中Kafka sink来的数据:

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_streamingtopic

启动log4j:

成功传输~

5.Spark Streaming消费Kafka数据

package com.taipark.sparkimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Spark Streaming 对接 Kafka*/
object KafkaStreamingApp {def main(args: Array[String]): Unit = {if(args.length != 2){System.err.println("Userage:KafkaStreamingApp<brokers><topics>");System.exit(1);}val Array(brokers,topics) = argsval sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(5))val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)val topicSet = topics.split(",").toSetval messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)//第二位是字符串的值messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()}}

入参是Kafka的broker地址与topic名称:

本地Run一下:

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Spark Streaming整合log4j、Flume与Kafka的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143065

相关文章

springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法

《springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法》:本文主要介绍springboot整合阿里云百炼DeepSeek实现sse流式打印,本文给大家介绍的非常详细,对大... 目录1.开通阿里云百炼,获取到key2.新建SpringBoot项目3.工具类4.启动类5.测试类6.测

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python中使用正则表达式精准匹配IP地址的案例

《Python中使用正则表达式精准匹配IP地址的案例》Python的正则表达式(re模块)是完成这个任务的利器,但你知道怎么写才能准确匹配各种合法的IP地址吗,今天我们就来详细探讨这个问题,感兴趣的朋... 目录为什么需要IP正则表达式?IP地址的基本结构基础正则表达式写法精确匹配0-255的数字验证IP地

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI

springboot循环依赖问题案例代码及解决办法

《springboot循环依赖问题案例代码及解决办法》在SpringBoot中,如果两个或多个Bean之间存在循环依赖(即BeanA依赖BeanB,而BeanB又依赖BeanA),会导致Spring的... 目录1. 什么是循环依赖?2. 循环依赖的场景案例3. 解决循环依赖的常见方法方法 1:使用 @La

一文详解如何从零构建Spring Boot Starter并实现整合

《一文详解如何从零构建SpringBootStarter并实现整合》SpringBoot是一个开源的Java基础框架,用于创建独立、生产级的基于Spring框架的应用程序,:本文主要介绍如何从... 目录一、Spring Boot Starter的核心价值二、Starter项目创建全流程2.1 项目初始化(

Spring Boot 整合 MyBatis 连接数据库及常见问题

《SpringBoot整合MyBatis连接数据库及常见问题》MyBatis是一个优秀的持久层框架,支持定制化SQL、存储过程以及高级映射,下面详细介绍如何在SpringBoot项目中整合My... 目录一、基本配置1. 添加依赖2. 配置数据库连接二、项目结构三、核心组件实现(示例)1. 实体类2. Ma

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密

SpringBoot整合MybatisPlus的基本应用指南

《SpringBoot整合MybatisPlus的基本应用指南》MyBatis-Plus,简称MP,是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,下面小编就来和大家介绍一下... 目录一、MyBATisPlus简介二、SpringBoot整合MybatisPlus1、创建数据库和

MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固 通俗易懂版)

《MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固通俗易懂版)》本文主要讲解了MySQL中的多表查询,包括子查询、笛卡尔积、自连接、多表查询的实现方法以及多列子查询等,通过实际例子和操... 目录复合查询1. 回顾查询基本操作group by 分组having1. 显示部门号为10的部门名,员