Spark Streaming流式计算的WordCount入门

2024-05-15 03:38

本文主要是介绍Spark Streaming流式计算的WordCount入门,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

[size=medium]
Spark Streaming是一种近实时的流式计算模型,它将作业分解成一批一批的短小的批处理任务,然后并行计算,具有可扩展,高容错,高吞吐,实时性高等一系列优点,在某些场景可达到与Storm一样的处理程度或优于storm,也可以无缝集成多重日志收集工具或队列中转器,比如常见的 kakfa,flume,redis,logstash等,计算完后的数据结果,也可以
存储到各种存储系统中,如HDFS,数据库等,一张简单的数据流图如下:
[/size]
[img]http://dl2.iteye.com/upload/attachment/0114/8695/cd52fa34-b057-3dff-b1ea-80d12e458531.png[/img]
[size=medium]
内部处理流程:
[/size]

[img]http://dl2.iteye.com/upload/attachment/0114/8697/db341088-8cc6-351b-b924-2b94ffc0deff.png[/img]


[img]http://dl2.iteye.com/upload/attachment/0114/8699/9d34f9e6-03c2-3ce0-b6e9-7893ff7de0ae.png[/img]

[size=medium]
下面来看一个wordcount级别的入门例子,注意需要导入相关的包:
[/size]
//下面不需要使用的依赖,大家可根据情况去舍name := "scala-spark"version := "1.0"scalaVersion := "2.11.7"//使用公司的私服resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"//使用内部仓储externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)//Hadoop的依赖libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1" //% "provided"//Habse的依赖libraryDependencies += "org.apache.hbase" % "hbase-client" % "0.98.12-hadoop2" // % "provided"libraryDependencies += "org.apache.hbase" % "hbase-common" % "0.98.12-hadoop2"  //% "provided"libraryDependencies += "org.apache.hbase" % "hbase-server" % "0.98.12-hadoop2" //% "provided"//Spark的依赖libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.0" //% "provided"//Spark SQL 依赖libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.6.0" //% "provided"//Spark For Hive 依赖libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "1.6.0"//Spark for StreaminglibraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "1.6.0"//java servlet 依赖libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1" //% "provided"


[/size]

package com.tools.streamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming._/**  * Created by qindongliang on 2016/1/28.  */object StreamingWordCount {  def main(args: Array[String]) {    //开本地线程两个处理    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    //每隔10秒计算一批数据    val ssc = new StreamingContext(conf, Seconds(10))    //监控机器ip为192.168.1.187:9999端号的数据,注意必须是这个9999端号服务先启动nc -l 9999,否则会报错,但进程不会中断    val lines = ssc.socketTextStream("192.168.1.187", 9999)    //按空格切分输入数据    val words = lines.flatMap(_.split(" "))    //计算wordcount    val pairs = words.map(word => (word, 1))    //word ++    val wordCounts = pairs.reduceByKey(_ + _)    //排序结果集打印,先转成rdd,然后排序true升序,false降序,可以指定key和value排序_._1是key,_._2是value    val sortResult=wordCounts.transform(rdd=>rdd.sortBy(_._2,false))    sortResult.print()    ssc.start()             // 开启计算    ssc.awaitTermination()  // 阻塞等待计算  }}



[size=medium]

然后在对应的linux机器上,开一个nc服务,并写入一些数据:
[/size]
nc -l 9999a a a c c d d v v e p x x x x  o


[size=medium]
然后在控制台,可见计算结果,并且是排好序的:
[/size]

[img]http://dl2.iteye.com/upload/attachment/0114/8701/59164830-4741-32b2-987a-f008c571ad14.png[/img]

[size=medium]

至此,第一个体验流式计算的demo就入门了,后面我们还可以继续完善这个例子,比如从kakfa或者redis里面接受数据,然后存储到hbase,或者mysql或者solr,lucene,elasticsearch索引中,用来给前端js图表绘图所用。


参考文章:

[url]http://blog.scottlogic.com/2013/07/29/spark-stream-analysis.html[/url]
[url]http://spark.apache.org/docs/latest/streaming-programming-guide.html[/url]
[/size]
[b][color=green][size=large]
有什么问题 可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园
[/size][/color][/b]
[img]http://dl2.iteye.com/upload/attachment/0104/9948/3214000f-5633-3c17-a3d7-83ebda9aebff.jpg[/img]

这篇关于Spark Streaming流式计算的WordCount入门的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/990700

相关文章

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

Python FastAPI入门安装使用

《PythonFastAPI入门安装使用》FastAPI是一个现代、快速的PythonWeb框架,用于构建API,它基于Python3.6+的类型提示特性,使得代码更加简洁且易于绶护,这篇文章主要介... 目录第一节:FastAPI入门一、FastAPI框架介绍什么是ASGI服务(WSGI)二、FastAP

Spring AI集成DeepSeek实现流式输出的操作方法

《SpringAI集成DeepSeek实现流式输出的操作方法》本文介绍了如何在SpringBoot中使用Sse(Server-SentEvents)技术实现流式输出,后端使用SpringMVC中的S... 目录一、后端代码二、前端代码三、运行项目小天有话说题外话参考资料前面一篇文章我们实现了《Spring

Java 8 Stream filter流式过滤器详解

《Java8Streamfilter流式过滤器详解》本文介绍了Java8的StreamAPI中的filter方法,展示了如何使用lambda表达式根据条件过滤流式数据,通过实际代码示例,展示了f... 目录引言 一.Java 8 Stream 的过滤器(filter)二.Java 8 的 filter、fi

Python如何计算两个不同类型列表的相似度

《Python如何计算两个不同类型列表的相似度》在编程中,经常需要比较两个列表的相似度,尤其是当这两个列表包含不同类型的元素时,下面小编就来讲讲如何使用Python计算两个不同类型列表的相似度吧... 目录摘要引言数字类型相似度欧几里得距离曼哈顿距离字符串类型相似度Levenshtein距离Jaccard相

使用C#代码计算数学表达式实例

《使用C#代码计算数学表达式实例》这段文字主要讲述了如何使用C#语言来计算数学表达式,该程序通过使用Dictionary保存变量,定义了运算符优先级,并实现了EvaluateExpression方法来... 目录C#代码计算数学表达式该方法很长,因此我将分段描述下面的代码片段显示了下一步以下代码显示该方法如

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题: