Spark Streaming(二十六)DStream基本数据源、高级数据源

2024-02-27 17:08

本文主要是介绍Spark Streaming(二十六)DStream基本数据源、高级数据源,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

输入DStream和Receiver

输入DStream其实就是从数据源接收到的输入数据流的DStream。每个DStream都与一个Receiver对象一一对应。SparkStreaming提供了两种内置数据源支持。

  • 基本的数据源:Streaming API中直接提供的数据源。例如文件系统和套接字连接。
  • 高级数据源:Kafka、Flume、Kinesis等数据源,这种高级数据源需要提供额外Maven依赖。
    注意:因为Receiver接收器是运行在Executor上的,并且每个Receiver运行需要一个Core,如果指定多个Receiver接收器同时接收数据,那么就要给Executor分配足够的Core,因为Streaming程序运行的时候,Excutor是长期运行的。并且在本地测试的时候,MasterURl不要指定local或者local[1],因为Streaming应用程序最少需要两个线程,一个用于接受数据,一个用于处理接受到的数据,如果你指定一个线程,那么程序只会运行Receiver接收器,而不会有其他的线程去处理接收到的数据。

输入DStream的基本数据源

利用Streaming API来创建DSteam,它提供了通过文件(任何兼容HDFS文件系统的分布式文件系统)创建DStream的输入源。以下API方法的功能就是SparkStreaming将监控dataDirectory这个目录,并且处理该目录中任何创建的文件(不支持监控嵌套目录)。

package com.lyz.streaming.datasource.genericimport org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object FileStreamTest {def main(args: Array[String]): Unit = {//初始化SparkConfval conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")//设置序列化类型.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val streaming = new StreamingContext(conf, Seconds(2))/** * LongWritable:读取HDFS文件返回的偏移量,也就是一行文本所在文件的位置* Text:读取HDFS文件的返回Text类型,也就是Java中的String类型* TextInputFormat:指定读取HDFS文件的类型*/val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://xxxxx:8020/spark/data")stream.foreachRDD(rdd => {rdd.foreach(res => {println(res._2)})})streaming.start()streaming.awaitTermination()}
}

注意

  • 监控的目录中的文件格式要一样
  • 目录中创建文件的方式只能是移动或者重命名
  • 移动后更改文件,SparkStreaming是不会读取新的数据的,因为监控文件以后,在SparkStreaming中的数据是不可变的了。
    对于简单的文件,可以利用streamingContext.textFileStream(dataDiretory)方法读取文件,文件流不需要Receiver接收器,不需要单独配备Core

自定义Receiver接收器

SparkStreaming可以接收任何数据源发来的数据流,如果数据源超出了SparkStreaming内置的数据源限制,那么开发人员可以根据实际的业务自定义Receiver数据接收器。需要注意的是自定义接收器只能支持JavaScala语言。


/**** 自定义一个类,继承抽象类Receiver,并且要根据业务指定Receiver的泛型,这个泛型就是接收到的数据后给* SparkStreaming的数据类型* Receiver的构造参数指定就是接受到数据后的本地化策略*/
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {def onStart() {/*** 因为onStart的方法和onStop方法不能无限制的阻塞,所以其中一方需要开启一个单独的线程。*/new Thread("Socket Receiver") {override def run() {receive()}}.start()}def onStop() {// There is nothing much to do as the thread calling receive()// is designed to stop by itself if isStopped() returns false}/*** 具体的数据输入源,需要对异常进行捕获,防止接收器故障*/private def receive() {var socket: Socket = nullvar userInput: String = ""try {// Connect to host:portsocket = new Socket(host, port)// Until stopped or connection broken continue readingval reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))userInput = reader.readLine()while (!isStopped && userInput != null) {userInput = reader.readLine()this.store(userInput)}reader.close()socket.close()/*** restart方法的执行流程就是首先异步调用调用onStop方法之后,在延期调用onStart方法,** stop<exception>:将会调用onStop方法停止Receiver接收器。** reportError<exception>:就是停止也不重启Reveiver接收器,将错误信息报告给Driver程序*/restart("Trying to connect again")} catch {case e: java.net.ConnectException =>// restart if could not connect to serverrestart("Error connecting to " + host + ":" + port, e)case t: Throwable =>// restart if there is any other errorrestart("Error receiving data", t)}}
}

高级数据源之Kafka

Apache Kafka是基于生产者-消费者模型的一个分布式的消息服务。SparkStreamingKafka集成,提供了简单的并行性,Kafka分区和Spark分区是1.1对应关系。

KafkaSparkStreaming对应关系

spark-streaming-kafka-0-8spark-streaming-kafka-0-10
Broker Version0.8.2.1 or higher0.10.0 or higher

添加Maven依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.2.2</version>
</dependency>

通过Kafka直接创建DStream,需要注意的是,kafka导入的包是org.apache.spark.streaming.kafka010

package com.lyz.streaming.datasource.advanceimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafkaDataSourceTest {def main(args: Array[String]): Unit = {val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092", //指定Kafka的集群地址"key.deserializer" -> classOf[StringDeserializer], //指定key的反序列化器"value.deserializer" -> classOf[StringDeserializer], //指定值的反序列化器"group.id" -> "", //consumer的分组id"auto.offset.reset" -> "latest", //从新定义消费者以后,不从头消费分区里的数据,只消费定义消费者以后的数据"enable.auto.commit" -> (false: java.lang.Boolean) //是否自动提交offsets,也就是更新kafka里的offset,表示已经被消费过了)//定义消费主题topicval topics = Array("topic1", "topic2")val sparkConf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("KafkaDatasourceTest")val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(2))/*** PreferConsistent:分区策略 ---->在可用的Executor之间均匀分配分区* PreferBrokers:分区策略 ---->只有当执行程序与Kafka代理程序位于相同的节点时,才可以使用。* PreferFixed:分区策略 ---->分区之间的负载偏差比较大,就该用这个分区策略* Subscribe:消费策略 ---->消费固定主题上的消息* SubscribePattern:消费策略 ---->消费正则匹配到的主题上的消息* Assign:消费策略 ---->消费固定分区集合上消息* ConsumerRecord:包含了主题名、分区号、分区记录的偏移量、具体的值*/val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Subscribe[String, String](topics, kafkaParams))kafkaStream.map(record => (record.key(), record.value()))}
}

注意:如果Spark批处理的时间间隔大于Kafka默认的心跳回话超时时间(30s),请适当增减Kafkaheartbeat.internal.mssession.timeout.ms对于大于五分钟的批次,需要改变,group.max.session.timeout.ms。有关Kafka具体的配置参数请参考http://kafka.apache.org/documentation.html#newconsumerconfigs

LocationStrategies位置策略

新的Kafka消费者API能够预先将分区上消息缓存到缓冲区中。处于性能原因,Spark会将消息保留在Executor上,而不是每个批次都重新创建它们。并且能够在消费者的宿主机上适当的调度分区。如果需要禁用消费者缓存可以配置spark.streaming.kafka.consumer.cahce.enablefalse

object KafkaDataSourceTest {def main(args: Array[String]): Unit = {val kafkaParams = ...val topics = ...val sparkConf: SparkConf = ....val streamingContext: StreamingContext =....//禁用消费者预先缓存分区上的消息sparkConf.set("spark.streaming.kafka.consumer.cahce.enable", "false")val kafkaStream: InputDStream[ConsumerRecord[String, String]] = ...}
}

大多数情况下你可以使用LocationStrategies.PreferConsistent来指定Executor之间的均匀的分区策略。如果你的ExecutorKafka代理位于同一台宿主机上,那么你就可以使用LocationStrategies.PreferBrokers,这将优先为该分区上的Kafka Leader安排分区。如果分区分区的负载偏移差过大,那么就可以选择LocationStrategies.PreferFixed来手动安排分区。

消费者的默认缓存大小是64,如果你希望并行处理超过64*Executor大小的数据,那么你就需要修改spark.streaming.kafka.consumer.cache.maxCapacity参数。

object KafkaDataSourceTest {def main(args: Array[String]): Unit = {val kafkaParams = ...val topics = ...val sparkConf: SparkConf = ....val streamingContext: StreamingContext =....//配置SparkStreaming处理最大数据的大小sparkConf.set("spark.streaming.kafka.consumer.cache.maxCapacity", "80")val kafkaStream: InputDStream[ConsumerRecord[String, String]] = ...}
}

ConsumerStrategies消费策略

新的Kafka消费者API可以从不同的主题上消费消息。ConsumerStrategies提供了一个抽象,它可以指定具体消费哪个主题的策略。

ConsumerStrategies.Subscribe指定的是消费固定主题集合上的消息。Consumer.Strategies.SubscribePattern指定的是消费正则匹配到的主题上的消息。Assign指定了消费具体的分区集合上的消息。

基于Kafka创建RDD

如果你有一个更适合批处理的实例的话,你可以根据offset的偏移量范围来创建RDD

package com.lyz.streaming.datasource.advance
import java.util
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}object CreateRddByOffset {def main(args: Array[String]): Unit = {//定义一个Java Map 初始化连接kafka参数val kafkaParams = new util.HashMap[String, Object]()kafkaParams.put("bootstrap.servers", "localhost:9092")kafkaParams.put("key.deserializer", classOf[StringDeserializer])kafkaParams.put("value.deserializer", classOf[StringDeserializer])kafkaParams.put("group.id", "")kafkaParams.put("auto.offset.reset", "latest")kafkaParams.put("enable.auto.commit", false: java.lang.Boolean)val offsetRangs = Array(//参数解释依次是 主题名字、分区索引、offset开始、offset结束OffsetRange("test", 0, 0, 100),OffsetRange("test", 1, 0, 100),OffsetRange("test", 2, 0, 100))val sparkConf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("KafkaDatasourceTest")val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(2))/** * 、根据分区的偏移量来创建RDD,* kafkaParams:这个参数是Java的HashMap*/val rdd = KafkaUtils.createRDD[String, String](streamingContext.sparkContext, kafkaParams, offsetRangs, LocationStrategies.PreferConsistent)}
}

保证数据完整性之获取偏移量并存储

当我们接受到Kafka上的数据以后,会将数据转换成DStream,而DStream底层其实就是时间片上的RDD。我们都知道RDD是以分区的形式散落到每个分区上的,由于RDD上的分区与Kafka上的分区是一一对应的。如果我们操作RDD的时候出现了Shuffle操作,那么RDD的分区与Kafka的分区之间的一一映射关系不会被保留,这样如果中间出现了操作失败的话,由于默认情况下Kafka的消费者会定期提交偏移量,那么消费者就不会再次消费到之前的数据,这样就影响数据的可靠性和准确性。我们可以获取RDD分区对应Kafka分区上的偏移量并保存下来,当处理Shuffle失败的时候,我们就可以按照偏移量重新去Kafka分区上重新加载数据,这样就保证了数据的可靠性和准确性。

注意想要保证数据的可靠性和完整性,我们要关闭消费者自动定期的提交偏移量。 设置Kafkaenable.auto.commit参数 为 (false: java.lang.Boolean)

package com.lyz.streaming.datasource.advanceobject KafkaDataSourceTest {def main(args: Array[String]): Unit = {val kafkaParams = ...//定义消费主题topicval topics = Array("topic1", "topic2")val sparkConf: SparkConf =...val streamingContext: StreamingContext = ...val kafkaStream: InputDStream[ConsumerRecord[String, String]] = ...//获取RDD分区对应Kafka上分区的偏移量kafkaStream.foreachRDD(rdd => {val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition(partion => {val range = offsetRanges(TaskContext.get().partitionId())val topic: String = range.topic //该RDD对应分区上的Kafka主题val fromOffset: Long = range.fromOffset //该RDD对应分区上的数据在Kafka上偏移量开始的位置val untilOffset: Long = range.untilOffset //该RDD对应分区上的数据在Kafka上偏移量结束的位置val partition: Int = range.partition //该RDD对应分区上的数据对应kafka分区的位置})})}
}

保证数据完整性之Kafka本身存储偏移量

Kafka有一个新的偏移量API,用于在特殊的主题中存储偏移量。默认情况下消费者会定期自动提交偏移量,这样消费者消费了Kakfa上的数据以后可能没有结果输出或者出现处理异常,影响了数据的完整性,这就是为什么我们要设置enable.auto.commitfalse的原因。但是使用commitAsync API后就可以解决这个问题

package com.lyz.streaming.datasource.advanceobject CommitAsyncTest {def main(args: Array[String]): Unit = {val kafkaParams = ...//定义消费主题topicval topics = Array("topic1", "topic2")val sparkConf: SparkConf = ...val streamingContext: StreamingContext = ...val kafkaStream: InputDStream[ConsumerRecord[String, String]] = ...kafkaStream.foreachRDD(rdd => {//这个必须处理RDD上的数据之前调用val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition(partion => {//处理你的业务逻辑})/*** 这个API必须在处理RDD数据得到结果之后调用* commitAsync这个方法是线程安全的*/kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)})}
}

保证数据完整性之自定义存储偏移量

为了SparkStreaming业务的事务性,我们最好把业务计算结果和offset同时进行存储,这样可以保证要么都成功,要么都失败,这样就形成了一个原子操作。由于一个业务有可能是不同分区上、不同主题上、不同的offset上的数据,这样我们应该利用Assign来指定消费具体的分区集合上的消息。一下代码我是将offset信息保存在Hbase中,每次处理数据首先从Hbase中查询出上一次offset结束的位置开始消费。

package com.lyz.streaming.datasource.advanceimport java.utilimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferFixed
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 自定义消费的主题和分区测试类*/
object AysncTest {def main(args: Array[String]): Unit = {//Hbase表名val topic_partition_offset = "topic_partition_offset"//Hbase表的列簇名val hbase_comsumer_offset = "hbase_comsumer_offset"//Kafka出书画参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> "192.168.101.215:9092", //指定Kafka的集群地址"key.deserializer" -> classOf[StringDeserializer], //指定key的反序列化器"value.deserializer" -> classOf[StringDeserializer], //指定值的反序列化器"group.id" -> "0", //consumer的分组id,相同组的消费者共同消费一个topic上的数据,每一个消费者消费单独partition数据"auto.offset.reset" -> "latest", //从新定义消费者以后,不从头消费分区里的数据,只消费定义消费者以后的数据"enable.auto.commit" -> (false: java.lang.Boolean) //是否自动提交offsets,也就是更新kafka里的offset,表示已经被消费过了)//定义消费主题topicval topics = Array("topicname", "topicname1")//初始化SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("KafkaDatasourceTest")//这个配置是解决Spark读取Hbase抛java.io.NotSerializableException异常的问题.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//初始化SparkStreamingval streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(5))//获取Hbase配置val hbaseConf: Configuration = HBaseConfiguration.create()val hbaseConn: Connection = ConnectionFactory.createConnection(hbaseConf) //根据Hbase配置初始化Hbase连接val admin: Admin = hbaseConn.getAdminval tableName: TableName = TableName.valueOf(hbase_comsumer_offset)val isExsist: Boolean = hbaseConn.getAdmin.tableExists(tableName)var stream: InputDStream[ConsumerRecord[String, String]] = null//判断Hbase是否存在offset表,没有就从topic头开始消费if (!isExsist) {//定义一个Hbase表val hTable = new HTableDescriptor(TableName.valueOf(hbase_comsumer_offset))//定义Hbase表的列簇hTable.addFamily(new HColumnDescriptor(Bytes.toBytes(topic_partition_offset)))//创建Hbase表admin.createTable(hTable)stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferFixed(Map[TopicPartition, String]()),Subscribe[String, String](topics, kafkaParams))} else { //如果有就从上一次消费到的offset开始消费//配置spark读取Hbase的哪个表hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName.toString)//配置spark读取Hbase表中的那个列簇hbaseConf.set(TableInputFormat.SCAN_COLUMN_FAMILY, topic_partition_offset)//Spark读取Hbase上的数据val hbaseRdd = streamingContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])//获取Hbase中offset的信息val fromOffset: Map[TopicPartition, Long] = hbaseRdd.map(rdd => {val res: Result = rdd._2val topic: String = Bytes.toString(res.getValue(Bytes.toBytes(topic_partition_offset), Bytes.toBytes("topic")))val partition: Int = Bytes.toInt(res.getValue(Bytes.toBytes(topic_partition_offset), Bytes.toBytes("partition")))val untilOffset: Long = Bytes.toLong(res.getValue(Bytes.toBytes(topic_partition_offset), Bytes.toBytes("untilOffset")))new TopicPartition(topic, partition) -> untilOffset}).collect().toMap//接着从上一次读取的到offset开始读取Kafka上的数据stream = KafkaUtils.createDirectStream[String, String](streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Assign[String, String](fromOffset.keys.toList, kafkaParams, fromOffset))}stream.foreachRDD(rdd => {//这个必须处理RDD上的数据之前调用val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//将offset存储到Hbase上val t: Table = hbaseConn.getTable(tableName)//定义Put容器val putList = new util.ArrayList[Put]//更新Hbase上的Offerset信息for (offset <- offsetRanges) {val put = new Put(Bytes.toBytes(offset.topic + "_" + kafkaParams.get("group.id")))put.addColumn(Bytes.toBytes(topic_partition_offset), Bytes.toBytes("topic"), Bytes.toBytes(offset.topic))put.addColumn(Bytes.toBytes(topic_partition_offset), Bytes.toBytes("partition"), Bytes.toBytes(offset.partition))put.addColumn(Bytes.toBytes(topic_partition_offset), Bytes.toBytes("untilOffset"), Bytes.toBytes(offset.untilOffset))putList.add(put)}t.put(putList)rdd.foreachPartition(partion => {//你自己的处理业务逻辑})})admin.flush(tableName)streamingContext.start()streamingContext.awaitTermination()}
}

在Spark读取Hbase上的数据的时候,想要精确点,你可以利用HbaseConfiguration配置具体的参数,如下所示

public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";    
private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";    
public static final String SCAN = "hbase.mapreduce.scan";  
public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";   
public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";    
public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";    
public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";   
public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";    
public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";    
public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";    
public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";    
public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";    
public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";    
public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";    
public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";

高级数据源之Flume

Apache Flume是一个分布式、可靠的服务,它能够高效收集、聚合和移动大量日志。这里我们将介绍怎样配置FlumeSparkStreamingFlume上接受数据。SparkStreaming接受Flume上的数据有两种方法。

Flume推送数据给Streaming方式

Flume被设计用来推送数据的,这种方式,SparkStreaming作为一个Avro数据池接受器来接受Flume推送过来的数据。这种方式设置器来比较容易,但是它的缺点就是不使用事物接收数据,并且可能数据量一大,Streaming消费不过来而造成节点的高负载,如果消费节点出现故障,那么数据就会丢失,造成数据的不完整性。

Flume的配置,Flume的具体配置请参考Flume学习总结

#定义sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1#定义sources的属性配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444#定义sinks的属性配置
a1.sinks.avroSink.type = avro
a1.sinks.avroSink.channel = memory
a1.sinks.avroSink.hostname = xxxxx
a1.sinks.avroSink.port = xxxx#定义channels属性配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100#利用channel连接source和sink
a1.sources.r1.channels=c1
a1.sinks.avroSink.channels=c1

SparkStreaming代码如下

package com.lyz.streaming.datasource.advanceimport org.apache.spark.{SparkConf, rdd}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}object FlumeDataSourceTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("KafkaDatasourceTest")val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(2))//作为avro数据池,来接受Flume推送过来的数据val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(streamingContext, "localhost", 44444)stream.foreachRDD(rdd => {rdd.collect().foreach(println(_))})streamingContext.start()streamingContext.awaitTermination()}
}

Streaming从Flume上拉取数据方式

这种方式Streaming自动从Flume的缓存池中拉取数据,Streaming通过事物从数据池中读取并复制数据,在收到事物完成的通知前,这些数据还会保留在数据池中。这样保证了数据的完整性。

由于这种方式使用的是第三方插件的Sink,它是Scala写的,所以我们要把这插件以及Scala库添加到Flume的类路径中。

插件以及Scala库的Maven

flume-sink -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume-sink_2.11</artifactId><version>2.2.2</version>
</dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.5</version>
</dependency>

Flume配置文件

#定义sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1#定义sources的属性配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444#定义sinks的属性配置
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = <port to listen on for connection from Spark>
agent.sinks.spark.channel = memoryChannel#定义channels属性配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100#利用channel连接source和sink
a1.sources.r1.channels=c1
a1.sinks.avroSink.channels=c1

Streaming代码

package com.lyz.streaming.datasource.advanceimport org.apache.spark.{SparkConf, rdd}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}object FlumeDataSourceTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[3]").setAppName("KafkaDatasourceTest")val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(2))//利用Streaming拉取Flume数据池的数据方式val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(streamingContext, "", 44444)stream.foreachRDD(rdd => {rdd.collect().foreach(println(_))})streamingContext.start()streamingContext.awaitTermination()}
}

这篇关于Spark Streaming(二十六)DStream基本数据源、高级数据源的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/753070

相关文章

基本知识点

1、c++的输入加上ios::sync_with_stdio(false);  等价于 c的输入,读取速度会加快(但是在字符串的题里面和容易出现问题) 2、lower_bound()和upper_bound() iterator lower_bound( const key_type &key ): 返回一个迭代器,指向键值>= key的第一个元素。 iterator upper_bou

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

C 语言的基本数据类型

C 语言的基本数据类型 注:本文面向 C 语言初学者,如果你是熟手,那就不用看了。 有人问我,char、short、int、long、float、double 等这些关键字到底是什么意思,如果说他们是数据类型的话,那么为啥有这么多数据类型呢? 如果写了一句: int a; 那么执行的时候在内存中会有什么变化呢? 橡皮泥大家都玩过吧,一般你买橡皮泥的时候,店家会赠送一些模板。 上

FreeRTOS-基本介绍和移植STM32

FreeRTOS-基本介绍和STM32移植 一、裸机开发和操作系统开发介绍二、任务调度和任务状态介绍2.1 任务调度2.1.1 抢占式调度2.1.2 时间片调度 2.2 任务状态 三、FreeRTOS源码和移植STM323.1 FreeRTOS源码3.2 FreeRTOS移植STM323.2.1 代码移植3.2.2 时钟中断配置 一、裸机开发和操作系统开发介绍 裸机:前后台系

Java 多线程的基本方式

Java 多线程的基本方式 基础实现两种方式: 通过实现Callable 接口方式(可得到返回值):

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

Java基础回顾系列-第七天-高级编程之IO

Java基础回顾系列-第七天-高级编程之IO 文件操作字节流与字符流OutputStream字节输出流FileOutputStream InputStream字节输入流FileInputStream Writer字符输出流FileWriter Reader字符输入流字节流与字符流的区别转换流InputStreamReaderOutputStreamWriter 文件复制 字符编码内存操作流(

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

Java基础回顾系列-第一天-基本语法

基本语法 Java基础回顾系列-第一天-基本语法基础常识人机交互方式常用的DOS命令什么是计算机语言(编程语言) Java语言简介Java程序运行机制Java虚拟机(Java Virtual Machine)垃圾收集机制(Garbage Collection) Java语言的特点面向对象健壮性跨平台性 编写第一个Java程序什么是JDK, JRE下载及安装 JDK配置环境变量 pathHe