Storm浅析

2024-09-06 05:48
文章标签 浅析 storm

本文主要是介绍Storm浅析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文分为几个模块:

1:Storm的原理和基本架构

2:Storm的应用场景及实例

3:Storm与Spark的比较

下面开始介绍,参考资料会列在文章末尾。

1:Storm的原理和基本架构

(1)原理及核心概念

分布式的实时计算系统,能够可信任的处理大量的流式数据,就好比Hadoop对于批量数据进行的处理一样;通常来说,Hadoop能够进行大批量数据的离线处理,但是在实时计算上的表现实在是不尽如人意;而Storm就可以担当这部分的作用。

说一下Storm一些核心概念:


网上盗图,接下来参照这张图对于Storm的一些核心概念加以介绍:

  1. Input data source:数据的输入源,就是需要加以处理的数据来源,数据的来源很多,可以自行设定非常简单的数据来源,也可以采集来自kafka,JDBC等的数据;上图是官网截取的,基本上覆盖了大量的数据来源,很方便的实现流式数据的快速处理
  2. Spout:从这里,才真正算是进入了Storm的核心处理逻辑,从数据源接收过来的数据,在spout中,转化为Tuple,才能参与到后续的处理逻辑中,用户可以自行设定Spout
  3. Tuple:这是Storm里的核心概念,是其中的主要数据结构,是有序元素的列表,默认情况下,其支持所有的数据类型,由Spout传输给Bolt
  4. Tuple Stream:简单说,就是批量的Tuple。
  5. Bolt:真正的逻辑处理单元,内部拥有execute方法,用户可以继承并且加以实现,实现自己的处理逻辑,bolt可以一个也可以多个,可以进行链接处理。

(2)Storm的基本架构:


从网上找到的一张图,参照于Hadoop的架构,来理解其核心组件:

  1. Nimbus:如上图,就好比Hadoop中的JobTracker,是集群中的主节点,负责分发用户代码,把需要处理的任务指派给具体的Supervisor,再由其上的Worker进行实际的处理。
  2. Supervisor:集群中的从节点,负责管理机器上运行的Worker进程,这里,需要注意,worker是一个进程,其内部还可以启动多个线程来进行任务的处理;通常,我们再指定的时候,会在此处通过指定端口号,来指定机器上到底启动多少个worker。
  3. Zookeeper:基本只要牵涉到集群,都需要用到zookeeper,这也符合其作为动物园管理员的职责,通过zookeeper,nimbus会感知到Supervisor的下线和上线,会合理分配资源,完成Topology的处理
  4. Topology:这就好比我们平时提交的一个Application,只是换了一个名称而已。

(3)StreamGroupings

这里,单独把分发策略拿出来,其决定了上个Bolt处理完的数据,到底以何种策略下发给下一个Bolt,目前,在Storm里有8中分发策略:

  1. Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples;就是完全随机发送,保证接下来的每个bolt尽可能处理等量的数据,这与MapReduce中的shuffle不同,那个shuffle是根据Hash算法来决定的,根据业务的处理,通常不能实现等量的数据,而这里强调的是随机发送,尽可能实现等量效果。
  2. Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.字面上来说,就是指定区域划分的方式,拥有相同字段值的数据,将会发送到同一个业务逻辑进行处理,这里称为Task。
  3. Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.这意思是说,下面有好几个Task来进行均匀接盘?
  4. All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care.
  5. Global grouping: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.所有的流水都指向一个Bolt的同一个Task。
  6. None grouping: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
  7. Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).自行指定接盘的Bolt。
  8. Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.

(4)动态组件概念


这里,有一些动态组件的概念,不太好理解,先贴在这里。

具体的执行逻辑,是运行在Supervisor上的,每一个Supervisor就是一个真正的节点,可以是一台物理机,也可能是一台虚拟机;而实际的worker,则是有一个端口的进程,而在节点上可以运行多个进程,如此来实现并行;通常来说,worker就是一个JVM进程,而我们知道,在JVM内部,还可以开多个线程进行并行,这两重下来,并行度大大提高了。

上图中描述了Task,我们在代码开发中,定义了自己的Spout/Bolt逻辑,而具体执行的时候,则是由线程来进行执行,这时候,每个线程就可以称为Task,而一个Bolt,可能对应多个线程,也就是说,可能有多个线程同时在执行Bolt的逻辑;代码中,我们可以自行指定一个Bolt之行时候需要的Task个数。

找到了另一张图,贴在这里:


2:Storm的应用场景及实例

应用场景:

Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.;翻译过来就是,可以用于实时数据分析、线上机器学习、持续计算、分布式RPC、数据的ETL操作,能够达到每个节点,每秒处理百万元祖的能力,安装容易,操作简单,容错性强,可信赖。

对于大数据来说,WordCount程序,就相当于我们学习开发语言过程中,务必要掌握的hello world一样,虽然简单,但是麻雀虽小,五脏俱全,能够体现出整理的执行逻辑。

先创建MySpout类,在Spout类中,最重要的时就会nextTuple方法,Storm框架默认会一直调用这个方法,源源不断产生Tuple,输送给后续处理的Bolt。

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;import java.util.Map;/*** Created by yangyuzhao on 2017/4/27.*/
public class MySpout extends BaseRichSpout {SpoutOutputCollector collector;//初始化方法public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}//storm 框架在 while(true) 调用nextTuple方法public void nextTuple() {collector.emit(new Values("i am lilei love hanmeimei"));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("love"));}
}

接下来创建MySplitBolt类,作为第一个处理的逻辑,用于处理单词的切分

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;/*** Created by yangyuzhao on 2017/4/27.*/
public class MySplitBolt extends BaseRichBolt {OutputCollector collector;//初始化方法public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}// 被storm框架 while(true) 循环调用  传入参数tuplepublic void execute(Tuple input) {String line = input.getString(0);String[] arrWords = line.split(" ");for (String word:arrWords){collector.emit(new Values(word,1));}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","num"));}
}

接下来创建MyCountBolt类,作为第一个处理的逻辑,用于处理单词计数:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;import java.util.HashMap;
import java.util.Map;/*** Created by yangyuzhao on 2017/4/27.*/
public class MyCountBolt extends BaseRichBolt {OutputCollector collector;Map<String, Integer> map = new HashMap<String, Integer>();public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple input) {String word = input.getString(0);Integer num = input.getInteger(1);System.out.println(Thread.currentThread().getId() + "    word:"+word);if (map.containsKey(word)){Integer count = map.get(word);map.put(word,count + num);}else {map.put(word,num);}
//        System.out.println("count:"+map);}public void declareOutputFields(OutputFieldsDeclarer declarer) {//不输出}
}

接下来,是真正可以运行起来的类,其规定了整体的运行逻辑,以及上述定义的Spout和Bolt之间的关系界定,并且定义了Bolt和Spout运行的task数量等信息。

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;/*** Created by yangyuzhao on 2017/4/27.*/
public class WordCountTopologMain {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {//1:准备一个TopologyBuilderTopologyBuilder topologyBuilder = new TopologyBuilder();//2:这里,定义了Spout的使用,并行度为2,也就是说有两个task用于处理输入;前面的名称指定了Spout的专属ID,用于后续处理能够指定topologyBuilder.setSpout("mySpout",new MySpout(),2);//3:这里,定义了接下来的SplitBolt,定义了承接关系//其承接来自于Spout的输入,并且定义了并行度为2,而且定义了StreamingGroup,这样可以完全随机地接收来自于前面的spout的数据//并且交由两个Task来进行处理。topologyBuilder.setBolt("mybolt1",new MySplitBolt(),2).shuffleGrouping("mySpout");//4:这里,定义接下来的承接关系,用MyCountBolt来承接mybole1发送过来的Tuple,topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).fieldsGrouping("mybolt1", new Fields("word"));
//        topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).shuffleGrouping("mybolt1");//  config.setNumWorkers(2);//2、创建一个configuration,用来指定当前topology 需要的worker的数量Config config =  new Config();config.setNumWorkers(2);//3、提交任务  -----两种模式 本地模式和集群模式
//        StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology());LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());}
}

5:Storm的常用命令

关于web-gui页面,可以通过安装部署了Storm的nimbus机器的IP加端口号8080来启动和查询,可以在页面上实现Topology的各种操作。

而在命令行方面,有如下一些操作:

  1. bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount;上述参数,第一个指定的是索要运行的jar包的位置,任务启动后会把该jar包发送到多个supervisor机器上;第二个指定的是要运行的主类,该类中指定了Topology运行过程的主要逻辑;wordcount,指定了本次提交的Topology的名称,可以在web页面看到,并且对任务进行一些操作。
  2. 杀死任务命令:storm wordcount -w 10;指定在10秒后杀掉指定的topology
  3. 停用任务命令:storm 的deactiviate wordcount

6:Storm的消息容错机制

  • 如果Spout想要实现可靠的处理机制,则需要记录其发射出来的tuple,当下游的bolt处理失败,或者后续的bolt处理失败,spout就能够重新发射;所以,可靠机制源于Spout,其必须要能够知道tuple是否最终处理成功了
  • Storm通过调用Spout的nextTuple()方法来发送一个tuple,如果想要实现可靠的消息处理,首先要给每个发出的tuple带上唯一的ID,代码中用collector发送信息的时候,可以指定一个id,可以用UUID生成,也可以使用MD5。
  • 这里,说一下Storm内部存在的Ack机制,在其系统中,存在一个特殊的任务,叫做acker,负责跟踪每个消息的处理轨迹,

7:Spark与Storm的区别

其实,这里更应该说是Spark-Streaming与storm的区别,因为spark目前也在朝着打造一个生态圈的目标而努力,拥有spark-sql,能够实现类似Hive的数据仓库管理;而Saprk-Streaming,则是用来进行实时处理,类似于Storm的功能;二者实现的功能相似,但实际上还是有些区别的。

对比点

Storm

Spark Streaming

实时计算模型

纯实时,来一条数据,处理一条数据

准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理

实时计算延迟度

毫秒级

秒级

吞吐量

事务机制

支持完善

支持,但不够完善

健壮性 / 容错性

ZooKeeper,Acker,非常强

Checkpoint,WAL,一般

动态调整并行度

支持

不支持

  • 实时性来说,Storm的实时性更强,基本上就是来一条数据,就处理一条数据;在编写Spark代码的时候,会发现,其本身就是收集一段时间的数据来进行统一处理,虽然可以尽可能缩小这个时间,但如果数据瞬间涌入过多的话,其性能相比于Storm还是有些不足的。
  • 健壮性来说,Storm的实现中使用了zookeeper来实现,而且还有Ack机制,对于数据是否处理成功能够感知到而Spark则是采取了业界常用的WAL,即预写日志和CheckPoint机制,相比之下,健壮性要差一些
  • 并行度的适时调整:对于一个公司来说,业务肯定会存在高峰期和低谷期,所以storm能够动态调整实时计算程序的并行度,能够最大限度利用集群资源,这点也很棒;而Spark是实现不了的。
  • 但是,Spark最好的一点在于,其吞吐量比较大,而且Spark-Streaming位于Spark生态圈中,如果想要加入许多的附加功能,可以用Spark自己的组件就能够实现无缝对接,这一点是Storm无法相比的,因为Storm就是专门用于做实时处理的,其他功能的实现,肯定性能要差一些。

参考资料如下:

  1. https://www.w3cschool.cn/apache_storm/apache_storm_core_concepts.html

  2. http://blog.csdn.net/jiweiwong/article/details/50837700

  3. https://www.cnblogs.com/xinfang520/p/7852015.html

  4. https://www.cnblogs.com/yaohaitao/p/5703288.html

  5. https://www.cnblogs.com/mrchige/p/5907863.html

  6. http://blog.csdn.net/kuring_k/article/details/51872112

这篇关于Storm浅析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141177

相关文章

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

(入门篇)JavaScript 网页设计案例浅析-简单的交互式图片轮播

网页设计已经成为了每个前端开发者的必备技能,而 JavaScript 作为前端三大基础之一,更是为网页赋予了互动性和动态效果。本篇文章将通过一个简单的 JavaScript 案例,带你了解网页设计中的一些常见技巧和技术原理。今天就说一说一个常见的图片轮播效果。相信大家在各类电商网站、个人博客或者展示页面中,都看到过这种轮播图。它的核心功能是展示多张图片,并且用户可以通过点击按钮,左右切换图片。

风暴项目个性化推荐系统浅析

风暴项目的主要任务是搭建自媒体平台,作为主开发人员的我希望把工作重心放在个性化推荐系统上。 目前风暴项目的个性化推荐是基于用户行为信息记录实现的,也就是说对于每条资讯,数据库中有字段标明其类型。建立一张用户浏览表,对用户的浏览行为进行记录,从中可以获取当前用户对哪类资讯感兴趣。 若用户第一次登陆,则按默认规则选取热点资讯做推荐,及所有资讯按浏览量降序排序,取前4个。另外,我考虑到后期可能有商业

中国书法——孙溟㠭浅析碑帖《越州石氏帖》

孙溟㠭浅析碑帖《越州石氏帖》 《越州石氏帖》  是一部汇集多本摹刻的帖,南宋时期的会稽石邦哲(字熙明)把家藏的一些法书碑帖集中一起摹刻成的,宋理宗时临安书商陈思《宝刻丛编》有记載这部帖的目录。现在还存有宋代时拓的残缺本,大多是相传的晋朝唐朝的小楷,后人多有临摹学习,并以此版本重新摹刻。 (图片来源于网络) 图文/氿波整理

浅析网页不安装插件播放RTSP/FLV视频的方法

早期很多摄像头视频流使用的是RTSP、RTMP协议,播放这类协议的视频通常是在网页上安装插件。但现在越来越多的用户,对于网页安装插件比较反感,且随着移动设备的普及,用户更多的希望使用手机、平板等移动设备,直接可以查看这些协议的视频。那是否有什么方案可以直接网页打开RTSP、RTMP协议的视频,直接观看不用安装插件呢?而且对于摄像头的数据,尽可能低延迟的获取实时画面。  其实很多摄像头厂家也注意到

浅析c/c++中 struct的区别

(1)C的struct与C++的class的区别。 (2)C++中的struct和class的区别。 在第一种情况下,struct与class有着非常明显的区别。C是一种过程化的语言,struct只是作为一种复杂数据类型定义,struct中只能定义成员变量,不能定义成员函数(在纯粹的C语言中,struct不能定义成员函数,只能定义变量)。例如下面的C代码片断: 复制代码代码如下:

Flink Exactly-Once 投递实现浅析

本文作者:Paul Lin 文章来源:https://www.whitewood.me 随着近来越来越多的业务迁移到 Flink 上,对 Flink 作业的准确性要求也随之进一步提高,其中最为关键的是如何在不同业务场景下保证 exactly-once 的投递语义。虽然不少实时系统(e.g. 实时计算/消息队列)都宣称支持 exactly-once,exactly-once 投递似乎是一个已被解

烟道灰酸洗废水稀有金属铼回收工艺浅析

铼是一种重要的稀有金属,因其独特的物理和化学性质,在航空航天、电子工业、石油化工等领域有着广泛的应用。由于铼的稀有性和重要性,从烟道灰中回收铼的技术和方法成为了研究的热点。以下是几种主要的烟道灰回收铼技术: ●    化学溶解法:通过选择合适的化学溶剂,如硝酸、硫酸等强酸,以及过氧化氢等氧化剂,将含铼废弃物中的铼溶解出来。 ●    溶剂萃取法:利用有机溶剂从含铼废水中萃取铼,通过选择合适的萃取剂

2024年高教社杯数学建模国赛赛题浅析——助攻快速选题

一图流——一张图读懂国赛 总体概述: A题偏几何与运动学模型,适合有几何与物理背景的队伍,数据处理复杂性中等。 B题侧重统计和优化,适合有运筹学和经济学背景的队伍,数据处理较为直接但涉及多步骤的决策优化。 C题属于优化类问题,涉及复杂的多变量优化与不确定性分析,数据处理难度大。 D题涉及概率和优化,特别是几何概率模型的推导,理论难度较高。 E题数据量较大,重点在于大规模交通数据的分

2024 年全国大学生数学建模竞赛(国赛)浅析

需要完整资料,请关注WX:“小何数模”! (需要完整B、C和E题资料请关注WX:“小何数模”,获取资料链接!) 本次万众瞩目的全国大学生数学建模赛题已正式出炉,无论是赛题难度还是认可度,该比赛都是数模届的独一档,含金量极高,可以用于保研加分、简历添彩等各方面。考虑到大家解题实属不易,为了帮助大家取得好成绩,在国赛建模中夺得国奖,下面学长就赛题给出个人浅析,供大家参考! 首先针对本科生可选的