本文主要是介绍14.1 Storm简介,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
http://storm.apache.org/releases/0.9.6/Guaranteeing-message-processing.html
Storm是非常优秀的开源的分布式实时流式计算系统,满足了很多实时流式计算的需求
纯的实时流式计算框架,一条一条来处理,所有延时可以做到很低,ms毫秒级别
Storm架构:
Nimbus 分配task任务,分配资源,主节点
zookeeper:存元信息,Nimbus通过ZK来得知手下有几个Supervisor从节点可以调用
Supervisor 从节点,启动和销毁Worker进程,并把自己的信息写入ZK
Worker 进程,端口号,内存,包含多少线程,启动和销毁Executor,同时把自己的信息落地到磁盘是为了告诉Supervisor,同时也会把自己的信息写到Zookeeper里面
Executor 线程:通过Executor来真正执行Spout/Bolt Task任务里面的方法
编程模型:
DAG 有效无环图(Topology应用程序)
Spout 源数据,数据的源头,storm来说都是主动去外部数据源去取数据,storm这边不会做任何存储
Bolt 处理的单元
Tuple 是传递消息的单元
streaming 流 连接起来spout,Bolt和下游更多的Bolt
24*7不间断执行,只有咱们主动去kill的时候才会停止;相反的我们要在数据进来之前,我们的storm就应该已经运行起来了
去构建Topology我们需要TopologyBuilder:
builder.setSpout(spout名称,你要写的逻辑,并行度(线程数))
上游和下游通过比如shuffleGrouping分组策略连接到一起
bulider.setBolt(bolt名称,你要处理的逻辑,并行度).shuffleGrouping("spout名称")
Config conf = new Config();
conf.setDebug(true);就是把日志级别变成DEBUG
conf.setMaxSpoutPending(80); 意味着下游处理的数据最多只能有80个tuple tree
conf.setMessageTimeoutSecs(30); 意味着下游处理的tuple tree从头到尾以及衍生出来的所以tuple都得在30秒之内处理完成
conf.setNumAckers(); 这个是有优化的点了,acker数量越多,确认的速度就越快,默认是一个worker进来里面一个acker
conf.setNumWorkers(); 设置此应用在集群里面使用的进程的数量
至多一次(IRichSpout + IRichBolt):
1, 如果你把acker的数量直接设置为0,就意味着你想要直接使用至多一次!如果这样设置也就不会去审视代码!
2, SpoutOutputCollector.emit()方法里面不去写第二个msgId
3, 在Bolt处理单元里面发射数据的时候Outputcollector.emit()时候不去绑定,也就是第一个参数tuple不去加
IRichSpout <-->BaseRichSpout
IRichSpout同时实现了ISpout和Icomponent
open() 在这个方法里面,可以获得SpoutOutputCollentor
close()会在咱们正常比如通过页面kill掉Topology的时候来调用,如果直接杀Worker进程那就不会被调用了
active()如果暂停后点击了active()方法才会被调用
deactive()是在页面点击deactive按钮的时候,或者使用./bin/storm deactive来暂停程序
nextTuple()这个方法可以认为是死循环,不断在被线程调用task里面这个方法;在这个里面就可以通过collector.emit()往下游发射数据
collector.emit(new Values());Values其实就是实现Java.util.List
对应IComponent里面有两个方法:
declareOutputFields() 在方法里面我们还需要去定义new Fields();
getComponentConfiguration() 在这个方法里面可以拿到__system
IRichBolt <-->BaseRichBolt
IRichBolt同时实现了IBolt和Icomponent
prepare() 在这个方法里面呢,可以获得OutputCollector
cleanup() 会在咱们正常比如通过页面kill掉Topology的时候来调用,如果咱们直接杀Worker进程那就不会调用了
execute(Tuple input) 数据驱动的,也就是上游发来一条数据,这个方法才会被调用一次
至少一次(IRichSpout + IRichBolt):
1, 我们需要在IRichSpout里面去通过SpoutOutputCollector.emit()给加上MsgID
2, 我们要在IRichBolt里面去做anchoring,Outputcollector.emit()。说白了就是绑定衍生出来的tuple到Tuple Tree
3, 我们需要在IRichBolt里面去做ack或者fail方法
IRichSpout + IBasicBolt
我们只需要在IRichSpout里面去通过SpoutOutputCollector.emit()给加上MsgID
如果ack什么都不用做,如果是显示的去fail,throw new FailedException();
如果是Fail了,它会通过__acker去回调到Spout里面的fail方法,我们可以通过fail方法里面的
Object msgId来再次通过SpoutOutputCollector.emit()往下游去发射数据!
如果Ack了,它会通过__acker去回调到Spout里面的ack方法,这样我们也可以通过msgId来记录哪些消息处理成功了
数据传输:
Zmq
Zmq也是开源的消息传递的框架,虽然叫mq,但它并不是一个message queue,而是一个封装的比较 好的
Netty
netty是NIO的网络框架,效率比较高。之所以有netty是storm在apache之后呢,zmq的license和 storm的license不兼容的,bolt处理完消息后会告诉Spout
高可用性:
异常处理
消息可靠性保证机制
可维护性:
Storm有个UI可以看跑在上面的程序监控
Storm实时低延迟,主要有两个原因:
storm进程是常驻内存的,不像hadoop里面是不断的启停的,就没有不断启停的开销
第二点,Storm的数据是不经过磁盘的,都是在内存里面,处理完就没有了,处理完就没有了, 数据的交换经过网络,这样就避免磁盘IO的开销,所以Storm可以很低的延迟。
如何控制Spark计算错误,重试的次数?
通过配置文件可以来配置,
如何控制Storm计算错误,重试的次数?
其实重试次数是咱们在fail方法里面自己控制的!
这篇关于14.1 Storm简介的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!