如何兼顾性能+实时性处理缓冲数据?

2024-03-22 08:40

本文主要是介绍如何兼顾性能+实时性处理缓冲数据?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

一、实例演示

二、待处理的批量数据:Batch


我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦。这个问题有不同的解法,本文提供一种实现方案。

一、实例演示
二、待处理的批量数据:Batch<T>
三、接收、缓冲、打包和处理数据:Batcher<T>

一、实例演示

我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher<string>对象来接收应用分发给它的数据,该对象最终会在适当的时机处理它们。调用Batcher<string>构造函数的三个参数分别表示:

  • processor:批量处理数据的委托对象,它指向的Process方法会将当前时间和处理的数据量输出到控制台上;

  • batchSize:单次处理的数据量,当缓冲的数据累积到这个阈值时会触发数据的自动处理。我们将这个阈值设置为10;

  • interval:两次处理处理的最长间隔,我们设置为5秒;

var batcher = new Batcher<string>(processor:Process,batchSize:10,interval: TimeSpan.FromSeconds(5));
var random = new Random();
while (true)
{var count = random.Next(1, 4);for (var i = 0; i < count; i++){batcher.Add(Guid.NewGuid().ToString());}await Task.Delay(1000);
}static void Process(Batch<string> batch)
{using (batch){Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");}
}

如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。

二、待处理的批量数据:Batch<T>

除了上面实例涉及的Batcher<T>,该解决方案还涉及两个额外的类型,如下这个Batch<T>类型表示最终发送的批量数据。为了避免缓冲数据带来的内存分配,我们使用了一个单独的ArrayPool<T>对象来创建池化的数组,这个功能体现在静态方法CreatePooledArray方法上。

由于构建Batch<T>对象提供的数组来源于对象池,在处理完毕后必须回归对象池,所以我们让这个类型实现了IDisposable接口,并将这一操作实现在Dispose方法种。在调用ArrayPool<T>对象的Return方法时,我们特意将数组清空。由于提供的数组来源于对象池,所以并不能保证每个数据元素都承载了有效的数据,实现的迭代器和返回数量的Count属性对此作了相应的处理。

 
public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class
{private bool _isDisposed;private int? _count;private readonly T[] _data;private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create();public int Count{get{if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));if(_count.HasValue) return _count.Value;var count = 0;for (int index = 0; index < _data.Length; index++){if (_data[index] is  null){break;}count++;}return (_count = count).Value;}}public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data));public void Dispose(){_pool.Return(_data, clearArray: true);_isDisposed = true;}public IEnumerator<T> GetEnumerator() => new Enumerator(this);IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize);private void EnsureNotDisposed(){if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));}private sealed class Enumerator : IEnumerator<T>{private readonly Batch<T> _batch;private readonly T[] _data;private int _index = -1;public Enumerator(Batch<T> batch){_batch = batch;_data = batch._data;}public T Current{get { _batch.EnsureNotDisposed(); return _data[_index]; }}object IEnumerator.Current => Current;public void Dispose() { }public bool MoveNext(){_batch.EnsureNotDisposed();return ++_index < _data.Length && _data[_index] is not null;}public void Reset(){_batch.EnsureNotDisposed();_index = -1;}}
}

三、接收、缓冲、打包和处理数据:Batcher<T>

最终用于打包的Batcher<T>类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action<Batch<T>>委托完成针对打包数据的处理。针对缓冲数据的处理实现在Process方法中。

public sealed class Batcher<T> where T : class
{private readonly int _interval;private readonly int _batchSize;private readonly Action<Batch<T>> _processor;private volatile Container _container;private readonly Timer _timer;private readonly ReaderWriterLockSlim _lock = new();public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval){_interval = (int)interval.TotalMilliseconds;_batchSize = batchSize;_processor = processor;_container = new Container(batchSize);_timer = new Timer(_ => Process(), null, _interval, Timeout.Infinite);}private void Process(){if (_container.IsEmpty) return;_lock.EnterWriteLock();try{if (_container.IsEmpty) return;var container = Interlocked.Exchange(ref _container, new Container(_batchSize));_ = Task.Run(() => _processor(container.AsBatch()));_timer.Change(_interval, Timeout.Infinite);}finally{_lock.ExitWriteLock();}}public void Add(T item){_lock.EnterReadLock();bool success = false;try{success = _container.TryAdd(item);}finally{_lock.ExitReadLock();}if (!success){Process();new SpinWait().SpinOnce();Add(item);}}private sealed class Container{private volatile int _next = -1;private readonly T[] _data;public bool IsEmpty => _next == -1;public Container(int batchSize) => _data = Batch<T>.CreatePooledArray(batchSize);public bool TryAdd(T item){var index = Interlocked.Increment(ref _next);if (index > _data.Length - 1) return false;_data[index] = item;return true;}public Batch<T> AsBatch() => new(_data);}
}

我们创建了一个内部类型Container作为存放数据的容器,具体数据存放在一个数组中,字段_index代表下一个添加数组存放的索引。TryAdd方法将指定的数据添加到数组中,我们使用InterLocked.Increase方法解决并发问题。如果越界返回False,表示添加失败,否则返回True,表示成功添加。Container的数组通过Batch<T>的静态方法CreatePooledArray提供的。Container类型还提供了一个AsBatch方法将数据封装成Batch<T>对象。

使用者在处理数据的时候,只需要将待处理数据作为Add方法的参数添加到缓冲区就可以了。Add方法会调用Container的TryAdd方法将指定的对象添加了池化数组中。如果TryAdd返回false,意味着数组存满了,由于此时正在发生Container替换操作,所以我们利用自旋等待的方式提高效率。

我们通过一个Timer解决缓冲数据的及时处理的问题。由于Porcess方法承载的针对缓冲数据的处理有两种触发形式:缓存数据数量超过阈值,缓冲时间超过设置的时限,我们不得不使用一个ReaderWriterLockSlim解决该方法和Add方法之间针对同一个Container对象的争用问题(我的初衷是提供完全无锁的设计,想了很久发现很难,有兴趣的朋友不妨可以想想完全无锁的解决方案是否可行)。

 引入地址

这篇关于如何兼顾性能+实时性处理缓冲数据?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/834963

相关文章

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置