Custom Spark Streaming Receivers

2024-05-27 12:58


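Besides its built-in receivers (for Flume, Kafka, Kinesis, files, sockets and so on), Spark Streaming can receive data from any stream through a custom receiver. To read from a source that is not supported out of the box, developers extend the org.apache.spark.streaming.receiver.Receiver class. This article shows how to implement a custom receiver and how to use it in a Spark Streaming application. Custom receivers can be written in Scala or Java.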

Contents

  • 1 Implementing a custom receiver
  • 2 Using a custom receiver in a Spark Streaming application
  • 3 Receiver reliability
  • 4 Implementing and using a custom Actor-based receiver





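Implementing a custom receiver

Any exception can be thrown in the receiving thread. These exceptions must be caught and handled properly so that the receiver does not fail silently. restart() restarts the receiver by asynchronously calling onStop() and then onStart(). stop() calls onStop() and terminates the receiver. reportError() sends an error message to the driver (visible in the logs and the UI) without stopping or restarting the receiver.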
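The three control calls above could be combined in a receiving loop along the following lines. This is a minimal sketch rather than code from the original article: `PollingReceiver`, its `fetch` parameter and `pollMillis` are hypothetical names, and the choice of which exceptions count as recoverable or transient is an assumption made purely for illustration.

```scala
import java.io.IOException

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: `fetch` stands in for any read from an external source.
// It must be serializable, because the receiver object is shipped to a worker.
class PollingReceiver(fetch: () => Option[String], pollMillis: Long = 100L)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart() must not block, so the actual work runs on its own thread.
  def onStart(): Unit = {
    new Thread("Polling Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to clean up: the loop below exits once isStopped() returns true.
  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        try {
          fetch() match {
            case Some(record) => store(record)
            case None => Thread.sleep(pollMillis)
          }
        } catch {
          // Assumed recoverable: tell the driver, keep the receiver running.
          case e: IllegalArgumentException =>
            reportError("Skipping a malformed record", e)
        }
      }
    } catch {
      // Assumed transient: restart() calls onStop() and then onStart() asynchronously.
      case e: IOException => restart("I/O error, restarting receiver", e)
      // Anything else: stop() calls onStop() and terminates the receiver.
      case t: Throwable => stop("Unrecoverable error, stopping receiver", t)
    }
  }
}
```

The nesting is the design choice here: per-record problems are reported inside the loop so that one bad record does not end it, while errors that escape the loop decide between restarting and stopping.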


/**
 * User: 过往记忆
 * Date: 2016-03-03
 * Time: 22:52:23
 * blog: http://www.iteblog.com
 * Article URL: http://www.iteblog.com/archives/1594
 * 过往记忆 blog WeChat public account: iteblog_hadoop
 */

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging // public in Spark 1.x
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput) // hand each line to Spark
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

Using a custom receiver in a Spark Streaming application

A custom receiver is used in a Spark Streaming application by passing it to streamingContext.receiverStream(). This creates an input DStream from the data received by the custom receiver, as follows:

// Assuming ssc is the StreamingContext
val lines = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
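To try receiverStream() without any external server, something like the following could be run in local mode. It is only a sketch: `FixedTextReceiver` and `LocalReceiverDemo` are hypothetical names that do not appear in the original article, and the sentence, the 200 ms pause and the five-second run time are arbitrary choices.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that stores the same sentence every 200 ms.
class FixedTextReceiver(text: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    new Thread("Fixed Text Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(text)
          Thread.sleep(200)
        }
      }
    }.start()
  }

  // The thread above ends by itself once isStopped() returns true.
  def onStop(): Unit = {}
}

object LocalReceiverDemo {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread runs the receiver, the other processes the batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("LocalReceiverDemo")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.receiverStream(new FixedTextReceiver("to be or not to be"))
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // run for about five seconds
    ssc.stop()
  }
}
```

A receiver occupies one thread for as long as it runs, so a local master needs more threads than there are receivers, otherwise no thread is left to process the received data.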


The complete example is as follows:

package org.apache.spark.examples.streaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * Custom Receiver that receives data over a socket. Received bytes are interpreted as
 * text and \n delimited lines are considered as records. They are then counted and printed.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999`
 */
object CustomReceiver {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Print the counts of each batch, then start the job and wait for it to end
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput) // hand each line to Spark
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      logInfo("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}
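Where Netcat is not available, a small stand-in server can play the role of `nc -lk 9999` while testing the socket receiver. The following is a sketch using only the JDK. `LineServer` and its members are hypothetical names, not part of Spark or of the original example.

```scala
import java.io.{IOException, OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical helper: sends the given lines to every client that connects,
// then closes that connection.
class LineServer(lines: Seq[String], requestedPort: Int = 0) {
  // Port 0 asks the OS for any free port; `port` is the one actually bound.
  private val server = new ServerSocket(requestedPort)
  val port: Int = server.getLocalPort

  private val acceptor = new Thread("Line Server") {
    override def run(): Unit = {
      try {
        while (!server.isClosed) {
          val client = server.accept()
          try {
            // UTF-8 to match the charset the socket receiver reads with.
            val out = new PrintWriter(
              new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
            lines.foreach(line => out.println(line))
          } finally {
            client.close()
          }
        }
      } catch {
        // accept() throws once close() is called; treat that as a normal shutdown.
        case _: IOException => ()
      }
    }
  }
  acceptor.setDaemon(true)
  acceptor.start()

  def close(): Unit = server.close()
}
```

For instance, `new LineServer(Seq("hello world"), 9999)` would let the example above be started with `localhost 9999`. Since the socket receiver restarts whenever the connection closes, it will reconnect and receive the same lines again.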











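Implementing and using a custom Actor-based receiver

Custom Akka actors can also be used to receive data. The ActorHelper trait can be mixed into any Akka actor, which lets the actor store received data in Spark by calling the store(...) methods. The actor's supervisor strategy can be configured to handle failures and the like, as follows: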

class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}

An input DStream can then be created from this actor:

// Assuming ssc is the StreamingContext
val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
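Configuring the supervisor strategy mentioned above might look like the sketch below. It assumes the Spark 1.x form of actorStream, which accepts a storage level and a SupervisorStrategy after the actor name. `SupervisedActorStream`, the retry limits and the mapping from exception type to directive are illustrative assumptions, not values from the original article.

```scala
import scala.concurrent.duration._

import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.ActorHelper

// Same actor as in the article: every String message is stored in Spark.
class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}

// Hypothetical wrapper object, used here only to group the sketch.
object SupervisedActorStream {
  // Assumed policy: restart the actor on runtime exceptions, at most
  // 5 times per minute; escalate any other exception to the parent.
  val strategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute) {
      case _: RuntimeException => Restart
      case _: Exception => Escalate
    }

  // Creates the input stream with an explicit storage level and strategy.
  def create(ssc: StreamingContext): ReceiverInputDStream[String] =
    ssc.actorStream[String](
      Props(new CustomActor()),
      "CustomReceiver",
      StorageLevel.MEMORY_AND_DISK_SER_2,
      strategy)
}
```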


The complete ActorWordCount example is as follows:

package org.apache.spark.examples.streaming

import scala.collection.mutable.LinkedHashSet
import scala.reflect.ClassTag
import scala.util.Random

import akka.actor._
import com.typesafe.config.ConfigFactory

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)

/**
 * Sends random content to every subscribed receiver with a 1/2
 * second delay.
 */
class FeederActor extends Actor {

  val rand = new Random()
  val receivers = new LinkedHashSet[ActorRef]()

  val strings: Array[String] = Array("words ", "may ", "count ")

  def makeMessage(): String = {
    val x = rand.nextInt(3)
    strings(x) + strings(2 - x)
  }

  /*
   * A thread to generate random messages
   */
  new Thread() {
    override def run(): Unit = {
      while (true) {
        Thread.sleep(500)
        receivers.foreach(_ ! makeMessage())
      }
    }
  }.start()

  def receive: Receive = {
    case SubscribeReceiver(receiverActor: ActorRef) =>
      println("received subscribe from %s".format(receiverActor.toString))
      receivers += receiverActor

    case UnsubscribeReceiver(receiverActor: ActorRef) =>
      println("received unsubscribe from %s".format(receiverActor.toString))
      receivers -= receiverActor
  }
}

/**
 * A sample actor used as a receiver, and also the simplest one. This receiver actor
 * subscribes to a typical publisher/feeder actor and receives
 * data from it.
 *
 * @see [[org.apache.spark.examples.streaming.FeederActor]]
 */
class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {

  lazy private val remotePublisher = context.actorSelection(urlOfPublisher)

  override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)

  def receive: PartialFunction[Any, Unit] = {
    case msg => store(msg.asInstanceOf[T])
  }

  override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
}

/**
 * A sample feeder actor
 *
 * Usage: FeederActor <hostname> <port>
 *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
 */
object FeederActor {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: FeederActor <hostname> <port>\n")
      System.exit(1)
    }
    val Seq(host, port) = args.toSeq

    val akkaConf = ConfigFactory.parseString(
      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = $port
         |""".stripMargin)
    val actorSystem = ActorSystem("test", akkaConf)
    val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")

    println("Feeder started as:" + feeder)

    // Block until the actor system is shut down
    actorSystem.awaitTermination()
  }
}

/**
 * A sample word count program demonstrating how to plug in an
 * Actor as a Receiver
 * Usage: ActorWordCount <hostname> <port>
 *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
 *
 * To run this example locally, you may run Feeder Actor as
 *    `$ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount localhost 9999`
 */
object ActorWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        "Usage: ActorWordCount <hostname> <port>")
      System.exit(1)
    }

    val Seq(host, port) = args.toSeq
    val sparkConf = new SparkConf().setAppName("ActorWordCount")
    // Create the context and set the batch size
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /*
     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
     *
     * An important point to note:
     * Since the Actor may exist outside the Spark framework, it is the user's responsibility
     * to ensure type safety, i.e. the type of data received and of the InputDStream
     * should be the same.
     *
     * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
     * to same type to ensure type safety.
     */
    val lines = AkkaUtils.createStream[String](
      ssc,
      Props(classOf[SampleActorReceiver[String]],
        "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
      "SampleReceiver")

    // compute wordcount
    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

This article is translated from "Spark Streaming Custom Receivers": http://spark.apache.org/docs/latest/streaming-custom-receivers.html

