Spark Streaming(四)—— Spark Streaming输出

2024-06-19 04:38
文章标签 输出 spark streaming

本文主要是介绍Spark Streaming(四)—— Spark Streaming输出,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。目前,定义了下面几种输出操作:
在这里插入图片描述
使用Spark SQL来查询Spark Streaming处理的数据:

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSessionobject MyNetwordWordCountWithSQL {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setMaster("local[2]").setAppName("MyNetwordWordCountWithSQL")val ssc = new StreamingContext(conf,Seconds(5))val lines = ssc.socketTextStream("192.168.15.131",1234,StorageLevel.MEMORY_ONLY)val words = lines.flatMap(_.split(" "))//集成Spark SQL 使用SQL语句进行WordCountwords.foreachRDD( rdd => {// 以前创建spark:// val spark = SparkSession.builder().master("local").appName("SparkSQLDemo3").getOrCreate() // 现在将master和appName封装进conf// 使用单例模式,创建SparkSession对象	// 必须使用当前的StreamingContext对应的SparkContext创建一个SparkSession。此外,必须这样做的另一个原因是使得应用可以在driver程序故障时得以重新启动,这是通过创建一个可以延迟实例化的单例SparkSession来实现的。val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()import spark.implicits._// 将RDD[String]转换成DataFrameval df1 = rdd.toDF("word")df1.createOrReplaceTempView("words")      spark.sql("select word , count(1) from words group by word").show()})ssc.start()ssc.awaitTermination()} 
}

问题:
在这里插入图片描述
报错:Connection对象不是一个可被序列化的对象,不能RDD的每个Worker上运行;即:Connection不能在RDD分布式环境中的每个分区上运行,因为不同的分区可能运行在不同的Worker上。所以需要在每个RDD分区上单独创建Connection对象。 应该改为:
在这里插入图片描述

这篇关于Spark Streaming(四)—— Spark Streaming输出的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074031

相关文章

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

Python使用Colorama库美化终端输出的操作示例

《Python使用Colorama库美化终端输出的操作示例》在开发命令行工具或调试程序时,我们可能会希望通过颜色来区分重要信息,比如警告、错误、提示等,而Colorama是一个简单易用的Python库... 目录python Colorama 库详解:终端输出美化的神器1. Colorama 是什么?2.

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

顺序表之创建,判满,插入,输出

文章目录 🍊自我介绍🍊创建一个空的顺序表,为结构体在堆区分配空间🍊插入数据🍊输出数据🍊判断顺序表是否满了,满了返回值1,否则返回0🍊main函数 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以:点赞+关注+评论+收藏(一键四连)哦~ 🍊自我介绍   Hello,大家好,我是小珑也要变强(也是小珑),我是易编程·终身成长社群的一名“创始团队·嘉宾”

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出 在数字化时代,文本到语音(Text-to-Speech, TTS)技术已成为人机交互的关键桥梁,无论是为视障人士提供辅助阅读,还是为智能助手注入声音的灵魂,TTS 技术都扮演着至关重要的角色。从最初的拼接式方法到参数化技术,再到现今的深度学习解决方案,TTS 技术经历了一段长足的进步。这篇文章将带您穿越时

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

如何将一个文件里不包含某个字符的行输出到另一个文件?

第一种: grep -v 'string' filename > newfilenamegrep -v 'string' filename >> newfilename 第二种: sed -n '/string/!'p filename > newfilenamesed -n '/string/!'p filename >> newfilename

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录 在深度学习项目中,目标检测是一项重要的任务。本文将详细介绍如何使用Detectron2进行目标检测模型的复现训练,涵盖训练数据准备、训练命令、训练日志分析、训练指标以及训练输出目录的各个文件及其作用。特别地,我们将演示在训练过程中出现中断后,如何使用 resume 功能继续训练,并将我们复现的模型与Model Zoo中的

第六章习题11.输出以下图形

🌏个人博客:尹蓝锐的博客 希望文章能够给到初学的你一些启发~ 如果觉得文章对你有帮助的话,点赞 + 关注+ 收藏支持一下笔者吧~ 1、题目要求: 输出以下图形

LibSVM学习(五)——分界线的输出

对于学习SVM人来说,要判断SVM效果,以图形的方式输出的分解线是最直观的。LibSVM自带了一个可视化的程序svm-toy,用来输出类之间的分界线。他是先把样本文件载入,然后进行训练,通过对每个像素点的坐标进行判断,看属于哪一类,就附上那类的颜色,从而使类与类之间形成分割线。我们这一节不讨论svm-toy怎么使用,因为这个是“傻瓜”式的,没什么好讨论的。这一节我们主要探讨怎么结合训练结果文件