本文主要是介绍14.6 Storm事物,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Storm
事务
只有用事务的概念才能完成Exactly-once
要保证事务,就要强有序!
事务如果对应这批数据如果出错了,必须要重新计算正确才能去下面的事务
,所以就得保证事务ID所对应的数据得是一样的!就得保证再次取得时候还能从数据源取到数据!
Storm是单条处理,那这样的话
Design 1
为了去做到事务的代价:
每条Tuple对应一个Txid事务ID,为了去确保正确性,数据库存更多信息
数据库存 Count=2 Txid=1
如果新过来一条数据的Txid大于已经存在数据库里的Txid的话,就更新数据库!
如果新过来的一条数据的Txid等于已经存在数据库里面的Txid的话,就Skip跳过!
如果新过来的一条数据的Txid小于已经存在的数据库里面的Txid的话,不可能发生的事情!
Why 什么不可能?
因为强有序这个机制在Spout端Storm就会来控制!要保证强有序只能在一个节点才能保证!
怎么控制的?
上一条没有回调ack()方法,就不会发射下一条!
问题:
1,每次为了保证顺序,那我Storm集群里面每次只能处理一条,吞吐量太低!
2,每次一个事务对应一个消息都要来存一次数据库!数据库调用太多!
Design 2
为了去解决Design 1的问题,就要变为微批!
IBatchBolt
问题:
一个批次虽然在一层Bolt里面并行起来计算了,但是其他层的Bolt还是会处于空闲状态!
Design 3
Storm最终方案
解决上面的空闲问题,为了去解决空闲,把之前的事务分为了两部分,
1,计算阶段:可以多个批次同时来算
2,提交阶段:必须最后存数据库的要保证强顺序
前面这个设计不管是哪种都有一个前提,就是要每个Txid对应的都是同一批次的数据!!!
比较合适的数据源,就是Kafka,因为数据如果计算错误,还是可以再次取到的!
TransactionalTopologyBuilder
事务源实现的ITransactionalSpout<MyMeta>
MyMeta可以认为是你定义一个事务的它的格式!
ITransactionalSpout.Coordinator<X>
协调器,单线程,由它来控制什么时候去发射下一批次事务数据!
定义好元信息,MetaData,就是从什么位置拿数据,拿多少条数据!
ITransactionalSpout.Emitter<X>
来做真正的读取数据,以及发射数据!
BaseTranscationalBolt其实它就是BaseBatchBolt微批处理单元
BaseTranscationalBolt等同于IBatchBolt<TransactionAttempt>
如果BaseTranscationalBolt实现了ICommitter,那通常就是最后保证去存数据库的阶段,也就是提交阶段,就通过实现这个接口来保证提交的强有序!
对于不需要事物的时候,我们用的kafkaSpout(KafkaCOnfig)
分区事物TransactionalPartitonKafkaSpout
分区事务:
为了就是应对Kafka这种分布式队列
说白了就是数据源有多个Partition分区,那样的话就不能像之前一样所有的emitter都从同一个位置来拿数据
所谓的分区事务,其实就是后面每个Emitter对应一个独立的partition
不透明事务:
为了就是使得程序不会因为数据源取不到数据而卡住!
上面的普通事物还是分区事物都是默认条件都是Txid不变的话对应的永远是同一批数据!
这样的话,就有可能因为同一批数据因为数据源挂了的问题想从新取的时候取不到!
那样的话不透明事物相当于改变上面的规则,在数据源挂了从新取不到数据的话,同一个Txid就对应一批新的数据。
代价:
就是要存Previous Result,为了可以恢复上一次的结果!
https://github.com/nathanmarz/storm-contrib/blob/kafka0.7/storm-kafka/src/jvm/storm/kafka/TestTopology.java
new OpaqueTransactionalKafkaSpout(new KafkaConfig());
Trident:
是storm里面高级API,默认情况下Trident都是微匹!说白了就是从TridentSpout里面出来的都是一批一批的!
TridentTopology topology = new TridentTopology();
建立一个实时计算的一个Topology
TridentState state = topology.newStream("spoutID",TridentKafkaSpout(),new Fields("sentence"))
对每条数据进行操作
.each(new Fields("sentence"), new Split(), new Fields("word"))
按照字段分组
.groupBy(new Fields("word"))
聚合顺便存储
.persistAggregate(new MemoryMapState(), new Count(), new Fields("count"))
MemoryMapState()
RedisState()
因为我们Trident它也支持事务,所以
RedisState.nonTranscational()
RedisState.transcational()
RedisState.opaque()
其它数据源
MemcachedState.opaque(serverLocations
要做到这个事务,有两头要配合,你Spout支持事务,如果你的State不支持事务,也没用!
http://storm.apache.org/releases/0.9.6/Trident-state.html
不透明事务
OpaqueTridentKafkaSpout()
RedisState.opaque()
普通事务
TransactionalTridentKafkaSpout()
RedisState.transcational()
普通事务
TransactionalTridentKafkaSpout()
RedisState.opaque()
建议一个实时查询
topology.newDRPCStream("queryName", new DRPCSpount())
// 对接进来的参数进行处理
.each(new Fields("args"), new Split(), new Fields("word"))
// 查询
.stateQuery(state, new MapGet(), new Fields("count"))
// 过滤查询出来为null值
.each(new FilterNull())
.each(new Sum())
Split extends BaseFunction
重写execute()方法!
FilterNull extends BaseFilter
重写的 boolean isKeep()
Sum extends CombineAggregator
有了上面的实时计算,还有实时查询,就做到了变算变查,就可以
通过
DRPCClient client = new DRPCClient("node1",3772);
client.execute("queryName","args");
这篇关于14.6 Storm事物的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!