Ack 框架分析

2024-06-08 00:32
文章标签 分析 框架 ack

本文主要是介绍Ack 框架分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Ack介绍

 每个Spout Tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或者ack一个Tuple,Tuple的ID都要跟这个校验值异或一下,并把得到的值更新为ack-val的新值。

如果每个发射出去的Tuple都被ack了,最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。如果ack-val为0,表示这个Tuple树就被完整处理过了。当达到超时时间,ack-val不为0,则Tuple处理失败了。


Ack框架执行过程

1) Storm的Spout中对每条发射出去的消息生产一个MessageId 对象,内容为<RootId,消息ID>,消息ID为一个64位的随机数,并且Spout会以RootId为键,以消息为值,放到自己的pendingMap中,并且只保留一段时间,具体时间有Topology.message.timeout.secs决定,超时后则调用Spout的fail方法。

2)Spout发送消息出去之后,给Acker Bolt 发射一条Tuple消息,消息的内容为[ tuple-id , ack-val, task-id]。

    a) tuple-id 为消息的RootId

    b) Spout发送的消息有一个或者多个接受目标Task,对所有的目标Task的消息ID进行异或,得到ack-val。

    c) task-id 为spout的ID,这样Acker就知道是哪个Spout发送过来的Ack信息了。

    d)发送消息的StreamId是_ack_init(ACKER-INIT-STREAM-ID)。

3)Acker Bolt收到StreamId为ACKER-INIT-STREAM-ID的消息后,会在自己的pending对象中添加一个记录{tuple-id :  { task-id : ack-val  } },记录中的各项值从Spout中发送过来。

4)Bolt收到的消息中(来源于Spout或者父Bolt) 同样会包含MessageId对象。Bolt在发射信息的过程中,对每个需要接受该信息的Task,会创建一个新的MessageId对象。该MessageId对象会发送给目标Task,并且该MessageId的消息Id和接受到的消息Id进行异或,把得到的ack-val发送给Acker Bolt。发送给Acker的消息内容为<tuple-id, ack-val>,消息的StreamId为_ack_ack(ACKER_ACK_STREAM_ID).

5)Acker Bolt收到StreamId 为ACKER_ACK_STREAM_ID的消息后,根据tuple-id从pending中取出老的ack-val,并将新的ack-val进行异或操作,跟新到新的pending中。

6)如果第5步的异或结果为0,则Acker Bolt 认为从Spout发出的消息已经正确处理完毕了。就会给spout发送通知。消息的内容为tuple-id,StreamId 为_ack_ack(ACKER_ACK_STREAM_ID)).

7)Spout收到StreamId后,则将pendingMap中的tuple-id记录删除,并调用Spout的ack方法。

8)如果第1步中spout发射消息的时候,不指定消息ID,则Storm不会启动ACK跟踪。如果系统中不含Acker Bolt,也不会启用Ack。

9)如果Bolt调用fail方法,会给Acker Bolt发送StreamId为_ack_fail(ACKER_FAIL_STREAM_ID)的消息。Acker Bolt收到ACKER_FAIL_STREAM_ID的消息,会将该消息转发给对应的spout。spout收到fail消息后,则执行spout的fail方法。

10)Acker Bolt的pending中,只会保存一段时间的跟踪信息,具体时间根据topology.message.timeout.secs决定的,超过这个时间,就会删除这个tuple-id的跟踪信息。如果后续收到Bolt发送的跟踪信息,则会出发Acker发送ACKER_FAIL_STREAM_ID的消息。


举例说明,Ack框架执行过程


 

1) spout产生一个Tuple,其初始化的消息ID为0100,Spout同时将该消息ID发送给Acker和Bolt1.

2)Bolt1收到Spout发送过来的消息ID为0100消息,经过处理之后,产生新的消息,消息ID为0010,Bolt1就讲 0100 xor 0010的结果发送给Acker。

3)Bolt2接收到Bolt1的消息,处理完后,没有后续的消息产生,则直接将Bolt1的消息ID转发给Acker。

4)Acker中,此时ack-val值已经为0,因此StreamId为ACKER_ACK_STREAM_ID的流上发送相应的消息。Spout收到消息后,调用spout的ack方法,完成整个消息流的ack操作,确认所有的消息都被正确处理了。







这篇关于Ack 框架分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1040745

相关文章

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

基于Flask框架添加多个AI模型的API并进行交互

《基于Flask框架添加多个AI模型的API并进行交互》:本文主要介绍如何基于Flask框架开发AI模型API管理系统,允许用户添加、删除不同AI模型的API密钥,感兴趣的可以了解下... 目录1. 概述2. 后端代码说明2.1 依赖库导入2.2 应用初始化2.3 API 存储字典2.4 路由函数2.5 应

Python GUI框架中的PyQt详解

《PythonGUI框架中的PyQt详解》PyQt是Python语言中最强大且广泛应用的GUI框架之一,基于Qt库的Python绑定实现,本文将深入解析PyQt的核心模块,并通过代码示例展示其应用场... 目录一、PyQt核心模块概览二、核心模块详解与示例1. QtCore - 核心基础模块2. QtWid

C++ 各种map特点对比分析

《C++各种map特点对比分析》文章比较了C++中不同类型的map(如std::map,std::unordered_map,std::multimap,std::unordered_multima... 目录特点比较C++ 示例代码 ​​​​​​代码解释特点比较1. std::map底层实现:基于红黑

最新Spring Security实战教程之Spring Security安全框架指南

《最新SpringSecurity实战教程之SpringSecurity安全框架指南》SpringSecurity是Spring生态系统中的核心组件,提供认证、授权和防护机制,以保护应用免受各种安... 目录前言什么是Spring Security?同类框架对比Spring Security典型应用场景传统

Spring、Spring Boot、Spring Cloud 的区别与联系分析

《Spring、SpringBoot、SpringCloud的区别与联系分析》Spring、SpringBoot和SpringCloud是Java开发中常用的框架,分别针对企业级应用开发、快速开... 目录1. Spring 框架2. Spring Boot3. Spring Cloud总结1. Sprin

Spring 中 BeanFactoryPostProcessor 的作用和示例源码分析

《Spring中BeanFactoryPostProcessor的作用和示例源码分析》Spring的BeanFactoryPostProcessor是容器初始化的扩展接口,允许在Bean实例化前... 目录一、概览1. 核心定位2. 核心功能详解3. 关键特性二、Spring 内置的 BeanFactory