四、事务拓扑(Transactional Topolgoy)

2024-09-05 08:58

本文主要是介绍四、事务拓扑(Transactional Topolgoy),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、问题的提出

    怎样做到每个出错的tuple只被处理一次?这样才能统计所有发射出的tuple的数量。


2、简介

Storm 0.7.0引入了Transactional Topology, 它可以保证每个tuple”被且仅被处理一次”, 这样你就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。跟DRPC类似, transactional topology其实不能算是storm的一个特性,它其实是用storm的底层原语spout, bolt, topology, stream等等抽象出来的一个特性。

3、三个设计

  <1>强顺序流:顺序id的tuple+数据库

        比如你想统一个stream里面tuple的总数。那么为了保证统计数字的准确性,你在数据库里面不但要保存tuple的个数, 还要保存这个数字所对应的最新的transaction id。 当你的代码要到数据库里面去更新这个数字的时候,你要判断只有当新的transaction id跟数据库里面保存的transaction id不一样的时候才去更新。考虑两种情况:

  • 数据库里面的transaction id跟当前的transaction id不一样: 由于我们transaction的强顺序性,我们知道当前的tuple肯定没有统计在数据库里面。所以我们可以安全地递增这个数字,并且更新这个transaction id.
  • 数据库里面的transaction id一样: 那么我们知道当前tuple已经统计在数据库里面了,那么可以忽略这个更新。这个tuple肯定之前在更新了数据库之后,反馈给storm的时候失败了(ack超时之类的)。
       缺点: 需要等待一个tuple完全处理成功之后才能去处理下一个tuple 。这个性能是非常差的。这个 需要大量的数据库调用 (只要每个tuple一个数据库调用), 而且这个设计也 没有利用到storm的并行计算能力 , 所以它的可扩展能力是非常差的。

  <2>强顺序batch流

    给整个batch一个transaction id,batch与batch之间的处理是强顺序性的, 而batch内部是可以并行的

    

优点: 减少数据库调用;利用了storm的并行计算能力(每个batch内部可以并行)

缺点:考虑下面这个topology


在bolt 1完成它的处理之后, 它需要等待剩下的bolt去处理当前batch, 直到发射下一个batch

  <3>storm的设计

  • processing阶段: 这个阶段很多batch可以并行计算。
  • commit阶段: 这个阶段各个batch之间需要有强顺序性的保证。所以第二个batch必须要在第一个batch成功提交之后才能提交。
这两个阶段合起来称为一个transaction。许多batch可以在processing阶段的任何时刻并行计算,但是只有一个batch可以处在commit阶段。


4、例子

源代码如下:

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.MemoryTransactionalSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class TransactionalGlobalCount {public static final int PARTITION_TAKE_PER_BATCH=3;public static final Map<Integer,List<List<Object>>> DATA=new HashMap<Integer, List<List<Object>>>(){{put(0,new ArrayList<List<Object>>(){{add(new Values("cat"));add(new Values("dog"));add(new Values("chicken"));add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));}});put(1,new ArrayList<List<Object>>(){{add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));add(new Values("banana"));}});put(2,new ArrayList<List<Object>>(){{add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));}});}};public static class Value{int count=0;BigInteger txid;}public static Map<String, Value> DATABASE=new HashMap<String, Value>();public static final String GLOBAL_COUNT_KEY="GLOBAL-COUNT";public static class BatchCount extends BaseBatchBolt{Object _id;BatchOutputCollector _collector;int _count=0;@Overridepublic void prepare(Map conf, TopologyContext context,BatchOutputCollector collector, Object id) {// TODO Auto-generated method stub_collector=collector;_id=id;}@Overridepublic void execute(Tuple tuple) {// TODO Auto-generated method stub_count++;}@Overridepublic void finishBatch() {// TODO Auto-generated method stub_collector.emit(new Values(_id,_count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("id","count"));}}public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {TransactionAttempt _attempt;BatchOutputCollector _collector;int _sum = 0;@Overridepublic void prepare(Map conf,TopologyContext context,BatchOutputCollector collector,TransactionAttempt attempt) {_collector = collector;_attempt = attempt;}@Overridepublic void execute(Tuple tuple) {_sum+=tuple.getInteger(1);}@Overridepublic void finishBatch() {Value val = DATABASE.get(GLOBAL_COUNT_KEY);Value newval;if(val == null ||!val.txid.equals(_attempt.getTransactionId())) {newval = new Value();newval.txid = _attempt.getTransactionId();if(val==null) {newval.count = _sum;} else {newval.count = _sum + val.count;}DATABASE.put(GLOBAL_COUNT_KEY, newval);} else {newval = val;}_collector.emit(new Values(_attempt, newval.count));System.out.println(_attempt);System.out.println(newval.count);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "sum"));}}public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubMemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);builder.setBolt("partial-count", new BatchCount(), 5).shuffleGrouping("spout");builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");LocalCluster cluster=new LocalCluster();Config config=new Config();config.setDebug(true);config.setMaxSpoutPending(3);cluster.submitTopology("global-count-topology", config, builder.buildTopology());Thread.sleep(3000);cluster.shutdown();}
}

详解如下:

<1>构建Topology

                MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);builder.setBolt("partial-count", new BatchCount(), 5).shuffleGrouping("spout");builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");</span>

TransactionalTopologyBuilder接受如下的参数

  • 这个transaction topology的id
  • spout在整个topology里面的id。
  • 一个transactional spout。
  • 一个可选的这个transactional spout的并行度。

一个transaction topology里面有一个唯一的TransactionalSpout, 这个spout是通过TransactionalTopologyBuilder的构造函数来制定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据(DATA)。第二个参数制定数据的fields, 第三个参数指定每个batch的最大tuple数量。

<2>第一个bolt:BatchBolt:随机地把输入tuple分给各个task,然后各个task各自统计局部数量

	public static class BatchCount extends BaseBatchBolt{Object _id;BatchOutputCollector _collector;int _count=0;@Overridepublic void prepare(Map conf, TopologyContext context,BatchOutputCollector collector, Object id) {// TODO Auto-generated method stub_id=id;}@Overridepublic void execute(Tuple tuple) {// TODO Auto-generated method stub_count++;}@Overridepublic void finishBatch() {// TODO Auto-generated method stub_collector.emit(new Values(_id,_count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("id","count"));}}

storm会为每个batch创建这个一个BatchCount对象。而这些BatchCount是运行在BatchBoltExecutor里面的。

这个对象的prepare方法接收如下参数:

  • 包含storm config信息的map。
  • TopologyContext
  • OutputCollector
  • 这个batch的id。而在Transactional Topologies里面, 这个id则是一个TransactionAttempt对象。

在transaction topology里面发射的所有的tuple都必须以TransactionAttempt作为第一个field,然后storm可以根据这个field来判断哪些tuple属于一个batch。所以你在发射tuple的时候需要满足这个条件。TransactionAttempt包含两个值: 一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch是唯一的,而且不管这个batch replay多少次都是一样的。 我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本。transaction id对于每个batch加一, 所以第一个batch的transaction id是”1″, 第二个batch是”2″,以此类推。execute方法会为batch里面的每个tuple执行一次最后, 当这个bolt接收到某个batch的所有的tuple之后, finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它的输出流里面去。

<3>第二个bolt:UpdateBlobalCount, 用全局grouping来从汇总这个batch的总的数量。然后再把总的数量更新到数据库里面去。

public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {TransactionAttempt _attempt;BatchOutputCollector _collector;int _sum = 0;@Overridepublic void prepare(Map conf,TopologyContext context,BatchOutputCollector collector,TransactionAttempt attempt) {_collector = collector;_attempt = attempt;}@Overridepublic void execute(Tuple tuple) {_sum+=tuple.getInteger(1);}@Overridepublic void finishBatch() {Value val = DATABASE.get(GLOBAL_COUNT_KEY);Value newval;if(val == null ||!val.txid.equals(_attempt.getTransactionId())) {newval = new Value();newval.txid = _attempt.getTransactionId();if(val==null) {newval.count = _sum;} else {newval.count = _sum + val.count;}DATABASE.put(GLOBAL_COUNT_KEY, newval);} else {newval = val;}_collector.emit(new Values(_attempt, newval.count));}

UpdateGlobalCount是Transactional Topologies相关的类, 所以它继承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累积这个batch的计数, 比较有趣的是finishBatch方法。首先, 注意这个bolt实现了ICommitter接口。这告诉storm要在这个事务的commit阶段调用finishBatch方法。所以对于finishBatch的调用会保证强顺序性(顺序就是transaction id的升序), 而相对来说execute方法在任何时候都可以执行,processing或者commit阶段都可以。UpdateGlobalCount里面finishBatch方法的逻辑是首先从数据库中获取当前的值,并且把数据库里面的transaction id与当前这个batch的transaction id进行比较。如果他们一样, 那么忽略这个batch。否则把这个batch的结果加到总结果里面去,并且更新数据库。



这篇关于四、事务拓扑(Transactional Topolgoy)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1138510

相关文章

hdu 1285(拓扑排序)

题意: 给各个队间的胜负关系,让排名次,名词相同按从小到大排。 解析: 拓扑排序是应用于有向无回路图(Direct Acyclic Graph,简称DAG)上的一种排序方式,对一个有向无回路图进行拓扑排序后,所有的顶点形成一个序列,对所有边(u,v),满足u 在v 的前面。该序列说明了顶点表示的事件或状态发生的整体顺序。比较经典的是在工程活动上,某些工程完成后,另一些工程才能继续,此时

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因

spring事务属性的xml格式配置

实际是使用代理做的事务优化 <!--配置事务的属性--><tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <!--匹配所有以add开头的方法--><tx:method name="add*" propagation="REQUIRED" /> <tx:metho

Spring Cloud整合Seata实现分布式事务

文章目录 1.Seata1.1 官网1.2 下载1.3 通过安装包运行seata1.3.1 解压seata-server-1.3.0.zip1.3.2 修改 conf/file.conf 配置文件1.3.3 修改conf/registry.conf配置文件1.3.4 添加seata配置信息到nacos1.3.5 配置seata服务端数据库表结构1.3.6 启动seata 2.Spring

分布式 事务的几种实现方案

背景 四月初,去面试了本市的一家之前在做办公室无人货架的公司,虽然他们现在在面临着转型,但是对于我这种想从传统企业往互联网行业走的孩子来说,还是比较有吸引力的。 在面试过程中就提到了分布式事务问题。我又一次在没有好好整理的问题上吃了亏,记录一下,还是长记性 !!! 先看面试过程 面试官先是在纸上先画了这样一张图: 让我看这张图按照上面的流程走,有没有什么问题?面试官并没有直接说出来这里面

分布式事务 全面解析

1 面试题 分布式事务了解吗?你们如何解决分布式事务问题的? 2 考点分析 只要聊到做了分布式系统,必问分布式事务,若你对分布式事务一无所知的话,确实很坑,起码得知道有哪些方案,一般怎么来做,每个方案的优缺点是什么。 现在面试,分布式系统成了标配,而分布式系统带来的分布式事务也成了标配. 你做系统肯定要用事务,那你用事务的话,分布式系统之后肯定要用分布式事务. 先不说你搞过没有,起码你

120张网络安全等保拓扑大全

安全意识培训不是一个ppt通吃,不同的场景应该用不同的培训方式和内容http://mp.weixin.qq.com/s?__biz=MzkwNjY1Mzc0Nw==&mid=2247484385&idx=1&sn=92f5e7f3ee36bdb513379b833651711d&chksm=c0e47abdf793f3ab7f4621b64d29c04acc03b45c0fc1eb613c37f3

Sorting It All Out POJ(拓扑排序+floyd)

一个就是简单的拓扑排序,一个就是利用floyd判断是否存在环 #include<cstdio>#include<cstring>#include<vector>#include<queue>using namespace std;#define MAXD 30#define MAX_SIZE 1000vector<int>G[MAXD];int n,m;char L[MAX

【HDU】5222 Exploration(并查集+拓扑排序)

对于无向边使用并查集合并成一个集合,对于有向边使用判断是否存在环 唯一无语的地方就是看输入是百万级的,加个输入挂跑9s,不加挂跑了5s #include<cstdio>#include<cstring>#include<vector>#include<algorithm>using namespace std;#pragma comment(linker, "/STACK:102