Netty采集数据高效写入TDengine

2024-09-03 14:52

本文主要是介绍Netty采集数据高效写入TDengine,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在现代数据处理应用中,高效的数据采集与存储至关重要。Netty 是一个高性能的异步事件驱动的网络应用程序框架,非常适合用于构建高效的数据采集服务。本文将介绍如何使用 Netty 搭建一个数据采集服务,并通过优化的方式将数据高效地写入 TDengine 数据库。

设计思路

我们的目标是构建一个高效的数据采集服务,该服务能够接收来自多个客户端的数据,并在数据量达到一定阈值或经过一定时间后批量写入 TDengine 数据库。为了实现这一目标,我们需要解决以下几个关键问题:

  1. 数据缓冲:需要一个高效的缓存结构来暂存接收到的数据。

  2. 批量处理:当数据量达到一定阈值或经过一定时间后,应将数据批量写入数据库。

  3. 并发控制:确保在多线程环境下数据处理的安全性。

  4. 配置动态调整:允许配置参数如批量大小和最大等待时间的动态调整。

实现过程

1. 数据缓冲

为了高效地暂存数据,我们使用 ConcurrentLinkedQueue 作为数据缓冲区。这种队列是线程安全的,并且提供了高效的插入和删除操作。

private final ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();

2. 批量处理

当数据达到一定数量或经过一定时间后,我们将启动一个批量插入操作。为了实现这一点,我们使用了两个主要的组件:

  • 计数器:用于跟踪当前缓存中的数据数量。

  • 定时任务:如果数据没有达到阈值,则设置一个定时任务来处理数据。

private final AtomicInteger counter = new AtomicInteger(0);
private ScheduledFuture<?> scheduledFuture = null;

3. 并发控制

为了确保数据处理的安全性,我们使用 ReentrantLock 来保护批量插入操作。此外,我们还使用 AtomicBoolean 来标识当前是否有线程正在进行批量插入操作。

private final ReentrantLock lock = new ReentrantLock();
private final AtomicBoolean isBatchInserting = new AtomicBoolean(false);

4. 配置动态调整

我们使用 Nacos 配置中心来动态调整批量大小和最大等待时间。这样可以在不重启服务的情况下调整这些参数。

@NacosValue(value = "${batchSize:1000}", autoRefreshed = true)
private volatile int batchSize;  // 阈值
​
@NacosValue(value = "${maxWaitTime:500}", autoRefreshed = true)
private volatile long maxWaitTime;  // 最大延迟时间(毫秒)

5. 核心方法

channelRead 方法

每当从客户端接收到一条数据时,都会调用此方法。在此方法中,我们将数据添加到缓冲区,并更新计数器。如果数据达到阈值,则立即执行批量插入。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String data = (String) msg;buffer.add(data);int currentCount = counter.incrementAndGet();
​if (currentCount == 1) {scheduleBatchInsert();}
​if (currentCount >= batchSize) {batchInsert();}
}
scheduleBatchInsert 方法

计数器为1的时候,我们会安排一个定时任务来预备处理数据,以保证即便数据条目没有达到设定的阈值,也会被及时批量写入数据库中。

private void scheduleBatchInsert() {scheduledFuture = scheduler.schedule(this::batchInsert, maxWaitTime, TimeUnit.MILLISECONDS);
}
batchInsert 方法

此方法负责实际的批量插入操作。首先,它会检查是否已经有线程正在进行批量插入。如果是,则直接返回。如果不是,则获取锁,并开始处理数据。

private void batchInsert() {if (isBatchInserting.compareAndSet(false, true)) {lock.lock();try {if (counter.get() == 0) {return;}
​List<String> dataToInsert = new ArrayList<>();while (!buffer.isEmpty()) {String data = buffer.poll();if (data != null) {dataToInsert.add(data);}}
​counter.set(0);
​if (scheduledFuture != null && !scheduledFuture.isDone()) {scheduledFuture.cancel(false);}
​if (!dataToInsert.isEmpty()) {try {insertIntoTDengine(dataToInsert);} catch (Exception e) {logger.error("Failed to insert data into TDengine", e);}}} finally {lock.unlock();isBatchInserting.set(false);  // 设置标志位为 false}}
}
insertIntoTDengine 方法

此方法实现了将数据写入 TDengine 的逻辑。具体实现取决于 TDengine 的 API 或者使用的 ORM 框架。

private void insertIntoTDengine(List<String> dataToInsert) {// 实现使用 MyBatisPlus 写入 TDengine 的逻辑,可以参照https://blog.csdn.net/qq_47741012/article/details/141181396
}

6. 生命周期管理

为了确保服务的健壮性,我们需要处理通道关闭和异常捕获事件。此外,还需要提供关闭服务的方法来释放资源。

// 客户端断开连接
@Override
public void channelInactive(ChannelHandlerContext ctx) {cancelScheduledTask();super.channelInactive(ctx);
}
​
Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error("Exception caught in DataCollectionHandler", cause);ctx.close();
}
​
public void cancelScheduledTask() {if (scheduledFuture != null && !scheduledFuture.isCancelled()) {scheduledFuture.cancel(false);}
}

总结

通过上述设计和实现,我们构建了一个高效的数据采集服务,能够实时接收数据并在数据量达到阈值或经过一定时间后批量写入 TDengine 数据库。这种设计不仅提高了数据处理的效率,还确保了在高并发环境下的数据安全性和一致性。

这篇关于Netty采集数据高效写入TDengine的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1133271

相关文章

详谈redis跟数据库的数据同步问题

《详谈redis跟数据库的数据同步问题》文章讨论了在Redis和数据库数据一致性问题上的解决方案,主要比较了先更新Redis缓存再更新数据库和先更新数据库再更新Redis缓存两种方案,文章指出,删除R... 目录一、Redis 数据库数据一致性的解决方案1.1、更新Redis缓存、删除Redis缓存的区别二

Redis事务与数据持久化方式

《Redis事务与数据持久化方式》该文档主要介绍了Redis事务和持久化机制,事务通过将多个命令打包执行,而持久化则通过快照(RDB)和追加式文件(AOF)两种方式将内存数据保存到磁盘,以防止数据丢失... 目录一、Redis 事务1.1 事务本质1.2 数据库事务与redis事务1.2.1 数据库事务1.

Oracle Expdp按条件导出指定表数据的方法实例

《OracleExpdp按条件导出指定表数据的方法实例》:本文主要介绍Oracle的expdp数据泵方式导出特定机构和时间范围的数据,并通过parfile文件进行条件限制和配置,文中通过代码介绍... 目录1.场景描述 2.方案分析3.实验验证 3.1 parfile文件3.2 expdp命令导出4.总结

更改docker默认数据目录的方法步骤

《更改docker默认数据目录的方法步骤》本文主要介绍了更改docker默认数据目录的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1.查看docker是否存在并停止该服务2.挂载镜像并安装rsync便于备份3.取消挂载备份和迁

不删数据还能合并磁盘? 让电脑C盘D盘合并并保留数据的技巧

《不删数据还能合并磁盘?让电脑C盘D盘合并并保留数据的技巧》在Windows操作系统中,合并C盘和D盘是一个相对复杂的任务,尤其是当你不希望删除其中的数据时,幸运的是,有几种方法可以实现这一目标且在... 在电脑生产时,制造商常为C盘分配较小的磁盘空间,以确保软件在运行过程中不会出现磁盘空间不足的问题。但在

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

Mybatis拦截器如何实现数据权限过滤

《Mybatis拦截器如何实现数据权限过滤》本文介绍了MyBatis拦截器的使用,通过实现Interceptor接口对SQL进行处理,实现数据权限过滤功能,通过在本地线程变量中存储数据权限相关信息,并... 目录背景基础知识MyBATis 拦截器介绍代码实战总结背景现在的项目负责人去年年底离职,导致前期规

Redis KEYS查询大批量数据替代方案

《RedisKEYS查询大批量数据替代方案》在使用Redis时,KEYS命令虽然简单直接,但其全表扫描的特性在处理大规模数据时会导致性能问题,甚至可能阻塞Redis服务,本文将介绍SCAN命令、有序... 目录前言KEYS命令问题背景替代方案1.使用 SCAN 命令2. 使用有序集合(Sorted Set)

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

MyBatis框架实现一个简单的数据查询操作

《MyBatis框架实现一个简单的数据查询操作》本文介绍了MyBatis框架下进行数据查询操作的详细步骤,括创建实体类、编写SQL标签、配置Mapper、开启驼峰命名映射以及执行SQL语句等,感兴趣的... 基于在前面几章我们已经学习了对MyBATis进行环境配置,并利用SqlSessionFactory核