14.6 Storm事物

2024-06-07 19:08
文章标签 事物 storm 14.6

本文主要是介绍14.6 Storm事物,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Storm

事务

只有用事务的概念才能完成Exactly-once

要保证事务,就要强有序!

事务如果对应这批数据如果出错了,必须要重新计算正确才能去下面的事务

,所以就得保证事务ID所对应的数据得是一样的!就得保证再次取得时候还能从数据源取到数据!

Storm是单条处理,那这样的话

 

Design 1

为了去做到事务的代价:

每条Tuple对应一个Txid事务ID,为了去确保正确性,数据库存更多信息

数据库存 Count=2 Txid=1

 

如果新过来一条数据的Txid大于已经存在数据库里的Txid的话,就更新数据库!

如果新过来的一条数据的Txid等于已经存在数据库里面的Txid的话,就Skip跳过!

如果新过来的一条数据的Txid小于已经存在的数据库里面的Txid的话,不可能发生的事情!

 

Why 什么不可能?

因为强有序这个机制在Spout端Storm就会来控制!要保证强有序只能在一个节点才能保证!

怎么控制的?

上一条没有回调ack()方法,就不会发射下一条!

 

问题:

1,每次为了保证顺序,那我Storm集群里面每次只能处理一条,吞吐量太低!

2,每次一个事务对应一个消息都要来存一次数据库!数据库调用太多!

 

Design 2

为了去解决Design 1的问题,就要变为微批!

IBatchBolt

 

问题:

一个批次虽然在一层Bolt里面并行起来计算了,但是其他层的Bolt还是会处于空闲状态!

 

Design 3

Storm最终方案

解决上面的空闲问题,为了去解决空闲,把之前的事务分为了两部分,

1,计算阶段:可以多个批次同时来算

2,提交阶段:必须最后存数据库的要保证强顺序

 

前面这个设计不管是哪种都有一个前提,就是要每个Txid对应的都是同一批次的数据!!!

 

比较合适的数据源,就是Kafka,因为数据如果计算错误,还是可以再次取到的!

 

TransactionalTopologyBuilder

事务源实现的ITransactionalSpout<MyMeta>

MyMeta可以认为是你定义一个事务的它的格式!

ITransactionalSpout.Coordinator<X>

协调器,单线程,由它来控制什么时候去发射下一批次事务数据!

定义好元信息,MetaData,就是从什么位置拿数据,拿多少条数据!

ITransactionalSpout.Emitter<X>

来做真正的读取数据,以及发射数据!

 

BaseTranscationalBolt其实它就是BaseBatchBolt微批处理单元

BaseTranscationalBolt等同于IBatchBolt<TransactionAttempt>

如果BaseTranscationalBolt实现了ICommitter,那通常就是最后保证去存数据库的阶段,也就是提交阶段,就通过实现这个接口来保证提交的强有序!

 

对于不需要事物的时候,我们用的kafkaSpout(KafkaCOnfig)

分区事物TransactionalPartitonKafkaSpout

分区事务:

为了就是应对Kafka这种分布式队列

说白了就是数据源有多个Partition分区,那样的话就不能像之前一样所有的emitter都从同一个位置来拿数据

所谓的分区事务,其实就是后面每个Emitter对应一个独立的partition

 

不透明事务:

为了就是使得程序不会因为数据源取不到数据而卡住!

上面的普通事物还是分区事物都是默认条件都是Txid不变的话对应的永远是同一批数据!

这样的话,就有可能因为同一批数据因为数据源挂了的问题想从新取的时候取不到!

那样的话不透明事物相当于改变上面的规则,在数据源挂了从新取不到数据的话,同一个Txid就对应一批新的数据。

代价:

就是要存Previous Result,为了可以恢复上一次的结果!

 

https://github.com/nathanmarz/storm-contrib/blob/kafka0.7/storm-kafka/src/jvm/storm/kafka/TestTopology.java

new OpaqueTransactionalKafkaSpout(new KafkaConfig());

Trident:

是storm里面高级API,默认情况下Trident都是微匹!说白了就是从TridentSpout里面出来的都是一批一批的!

TridentTopology topology = new TridentTopology();


建立一个实时计算的一个Topology

TridentState state = topology.newStream("spoutID",TridentKafkaSpout(),new Fields("sentence"))

对每条数据进行操作

.each(new Fields("sentence"), new Split(), new Fields("word"))

按照字段分组

.groupBy(new Fields("word"))

聚合顺便存储

.persistAggregate(new MemoryMapState(), new Count(), new Fields("count"))

 

MemoryMapState()

RedisState()

 

因为我们Trident它也支持事务,所以

RedisState.nonTranscational()

RedisState.transcational()

RedisState.opaque()

其它数据源

MemcachedState.opaque(serverLocations

 

要做到这个事务,有两头要配合,你Spout支持事务,如果你的State不支持事务,也没用!

http://storm.apache.org/releases/0.9.6/Trident-state.html

 

不透明事务

OpaqueTridentKafkaSpout()

RedisState.opaque()

 

普通事务

TransactionalTridentKafkaSpout()

RedisState.transcational()

 

普通事务

TransactionalTridentKafkaSpout()

RedisState.opaque()

 

建议一个实时查询

topology.newDRPCStream("queryName", new DRPCSpount())

// 对接进来的参数进行处理

.each(new Fields("args"), new Split(), new Fields("word"))

// 查询

.stateQuery(state, new MapGet(), new Fields("count"))

// 过滤查询出来为null值

.each(new FilterNull())

.each(new Sum())

 

Split extends BaseFunction

重写execute()方法!

 

FilterNull extends BaseFilter

重写的 boolean isKeep()

 

Sum extends CombineAggregator

 

有了上面的实时计算,还有实时查询,就做到了变算变查,就可以

通过

DRPCClient client = new DRPCClient("node1",3772);

client.execute("queryName","args");

这篇关于14.6 Storm事物的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1040040

相关文章

Nodejs sequelize 事物处理

Transactions - 事务 Sequelize 支持两种使用事务的方法: 一个将根据 promise 链的结果自动提交或回滚事务,(如果启用)用回调将该事务传递给所有调用而另一个 leave committing,回滚并将事务传递给用户。 主要区别在于托管事务使用一个回调,对非托管事务而言期望 promise 返回一个 promise 的结果。 托管事务(auto-callba

spring 事物使用场景说明

事务使用场景。 在某些业务场景下,如果一个请求中,需要同时写入多张表的数据。为了保证操作的原子性(要么同时成功,要么同时失败),避免数据不一致的情况,我们一般都会用到spring事务。 确实,spring事务用起来贼爽,就用一个简单的注解:@Transactional,就能轻松搞定事务。我猜大部分小伙伴也是这样用的,而且一直用一直爽。 但如果你使用不当,它也会坑你于无形。就会遇到事务失效

Storm浅析

本文分为几个模块: 1:Storm的原理和基本架构 2:Storm的应用场景及实例 3:Storm与Spark的比较 下面开始介绍,参考资料会列在文章末尾。 1:Storm的原理和基本架构 (1)原理及核心概念 分布式的实时计算系统,能够可信任的处理大量的流式数据,就好比Hadoop对于批量数据进行的处理一样;通常来说,Hadoop能够进行大批量数据的离线处理,但是在实时计算上的表现

storm安装、运行

环境:centos6.4 软件: jzmq-master-----java与c++通讯的桥梁,有了它,就可以使用zeromp了(提供了接口,把数据放入到zeromq中) storm-0.8.2 zeromq-2.1.7-----号称史上最牛逼的消息队列(用c++写的) zookeeper-3.4.5 1.编译安装ZMQ: tar -xzf zeromq-2.1.7.tar.gz cd ze

一、storm基础概念

1、什么是storm        Storm是一个分布式的、高容错的实时计算系统。 Storm对于实时计算的的意义相当于Hadoop对于批处理的意义。Hadoop为我们提供了Map和Reduce原语,使我们对数据进行批处理变的非常的简单和优美。同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。 Storm适用的场景: (1)、流数据处理:Storm可以用来用来处理源源不断的

Storm AI : 最佳长文写作工具

作者:老余捞鱼 原创不易,转载请标明出处及原作者。 写在前面的话:       正如Storm其名,这场风暴已经在欧美学术圈开始刮起来了。想象一下,当你准备写一篇论文或者一部长篇报告时,只需要告诉Storm你的写作主题是什么,它就能自动帮你全网深挖资料和收集多维度的参考信息,创建好大纲。接下来Storm还会扮演专家与你来上几轮对话问答,并在随后的几秒钟内将你的主题转换为长

Storm计算框架

工作流程 主要组件 streams spouts Bolt Tuple Topology stream grouping

Apache Storm:入门了解

前言 Storm 是一个开源的分布式实时计算系统,它能够处理无边界的数据流,类似于 Hadoop 对于批量数据处理的作用,但是 Storm 更侧重于实时数据流的处理。以下是关于 Storm 的一些关键特性及其应用场景的详细介绍: 特性 实时处理: Storm 能够实时处理数据流,而不是像 Hadoop 那样需要先收集一批数据再进行处理。它可以持续不断地处理数据,这意味着一旦数据到达,就

吃透Redis系列(三):Redis管道,发布/订阅,事物,过期时间 详细介绍

Redis系列文章: 吃透Redis系列(一):Linux下Redis安装 吃透Redis系列(二):Redis六大数据类型详细用法 吃透Redis系列(三):Redis管道,发布/订阅,事物,过期时间 详细介绍 吃透Redis系列(四):布隆(bloom)过滤器详细介绍 吃透Redis系列(五):RDB和AOF持久化详细介绍 吃透Redis系列(六):主从复制详细介绍 吃透Redi

高级java每日一道面试题-2024年9月01日-基础篇-事物的隔离级别?

如果有遗漏,评论区告诉我进行补充 面试官: 事物的隔离级别? 我回答: 事务的隔离级别是一个重要的考点。事务的隔离级别决定了一个事务在访问数据库时如何受到其他事务的影响,以及不同事务之间的并发控制程度。下面将详细解释Java中事务的隔离级别。 事务的隔离级别 事务的隔离级别主要包括以下几种: 读未提交(Read Uncommitted) 定义:事务可以读取其他事务未提交的数据。即,一