RisingWave的动态过滤器和时间过滤器的用法

2024-02-22 15:12

本文主要是介绍RisingWave的动态过滤器和时间过滤器的用法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

动态过滤器

动态过滤器能够实时过滤数据流,并允许定义传入数据必须满足的条件才能进行处理。

动态过滤器demo

CREATE TABLE sales(id int ,profit_margin double ,PRIMARY KEY (id)
);CREATE TABLE products(product_name string ,product_profit double);--返回products中利润率大于sales表中记录的最大利润率的所有的product_name
CREATE MATERIALIZED VIEW product_profit_v AS
WITH max_profit AS (
SELECT max(profit_margin) max FROM sales
) 
SELECT product_name FROM products, max_profit 
WHERE product_profit > max;--sink
CREATE SINK product_profit_v_sink FROM product_profit_v 
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='product_profit_v_sink_t'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);

测试数据

INSERT INTO sales values(1,10);
INSERT INTO sales values(2,20);
INSERT INTO sales values(3,30);INSERT INTO sales values(2,8);
INSERT INTO sales values(1,8);INSERT INTO products values('a',10);
INSERT INTO products values('b',15);

消费

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t product_profit_v_sink_t -C -J

时间过滤器

时间过滤器允许根据特定时间(例如当前时间、特定日期或日期范围)过滤数据

CREATE TABLE t_minutes(id integer ,minute timestamp) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
alter database dev set timezone='PRC'; -- dbname为数据库名称
show timezone;
--select pg_typeof(now());--查看数据类型--用法一:删除并清理过期
--筛选出一周之内的数据
SELECT * FROM t_minutes where minute > NOW() - INTERVAL '1 day';--用法二:延迟表变更
--以minute字段为基准延迟一分钟输出
CREATE MATERIALIZED VIEW timer_t_minutes AS
SELECT * FROM t_minutes where minute + INTERVAL '1 minute' <= now(); --在这里需要注意时区务必要对其--sink
CREATE SINK timer_t_minutes_sink FROM timer_t_minutes 
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='timer_t_minutes_sink_t'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);

测试数据

INSERT INTO t_minutes values(2,'2020-01-01 00:00:00'::TIMESTAMP);
INSERT INTO t_minutes values(2,'2023-01-01 00:00:00'::TIMESTAMP);
INSERT INTO t_minutes values(3,cast(now() as timestamp) - INTERVAL '1 hour'); 
INSERT INTO t_minutes values(4,cast(now() as timestamp) + INTERVAL '1 minute'); 
INSERT INTO t_minutes values(5,cast(now() as timestamp)); 

消费

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t timer_t_minutes_sink_t -C

参考:
Dynamic filters
Temporal filters

这篇关于RisingWave的动态过滤器和时间过滤器的用法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/735653

相关文章

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

C++统计函数执行时间的最佳实践

《C++统计函数执行时间的最佳实践》在软件开发过程中,性能分析是优化程序的重要环节,了解函数的执行时间分布对于识别性能瓶颈至关重要,本文将分享一个C++函数执行时间统计工具,希望对大家有所帮助... 目录前言工具特性核心设计1. 数据结构设计2. 单例模式管理器3. RAII自动计时使用方法基本用法高级用法

Python中logging模块用法示例总结

《Python中logging模块用法示例总结》在Python中logging模块是一个强大的日志记录工具,它允许用户将程序运行期间产生的日志信息输出到控制台或者写入到文件中,:本文主要介绍Pyt... 目录前言一. 基本使用1. 五种日志等级2.  设置报告等级3. 自定义格式4. C语言风格的格式化方法

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

Java中HashMap的用法详细介绍

《Java中HashMap的用法详细介绍》JavaHashMap是一种高效的数据结构,用于存储键值对,它是基于哈希表实现的,提供快速的插入、删除和查找操作,:本文主要介绍Java中HashMap... 目录一.HashMap1.基本概念2.底层数据结构:3.HashCode和equals方法为什么重写Has

Android协程高级用法大全

《Android协程高级用法大全》这篇文章给大家介绍Android协程高级用法大全,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友跟随小编一起学习吧... 目录1️⃣ 协程作用域(CoroutineScope)与生命周期绑定Activity/Fragment 中手

Python异步编程之await与asyncio基本用法详解

《Python异步编程之await与asyncio基本用法详解》在Python中,await和asyncio是异步编程的核心工具,用于高效处理I/O密集型任务(如网络请求、文件读写、数据库操作等),接... 目录一、核心概念二、使用场景三、基本用法1. 定义协程2. 运行协程3. 并发执行多个任务四、关键