本文主要是介绍RisingWave的动态过滤器和时间过滤器的用法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
动态过滤器
动态过滤器能够实时过滤数据流,并允许定义传入数据必须满足的条件才能进行处理。
动态过滤器demo
CREATE TABLE sales(id int ,profit_margin double ,PRIMARY KEY (id)
);CREATE TABLE products(product_name string ,product_profit double);--返回products中利润率大于sales表中记录的最大利润率的所有的product_name
CREATE MATERIALIZED VIEW product_profit_v AS
WITH max_profit AS (
SELECT max(profit_margin) max FROM sales
)
SELECT product_name FROM products, max_profit
WHERE product_profit > max;--sink
CREATE SINK product_profit_v_sink FROM product_profit_v
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='product_profit_v_sink_t'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);
测试数据
INSERT INTO sales values(1,10);
INSERT INTO sales values(2,20);
INSERT INTO sales values(3,30);INSERT INTO sales values(2,8);
INSERT INTO sales values(1,8);INSERT INTO products values('a',10);
INSERT INTO products values('b',15);
消费
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t product_profit_v_sink_t -C -J
时间过滤器
时间过滤器允许根据特定时间(例如当前时间、特定日期或日期范围)过滤数据
CREATE TABLE t_minutes(id integer ,minute timestamp) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
alter database dev set timezone='PRC'; -- dbname为数据库名称
show timezone;
--select pg_typeof(now());--查看数据类型--用法一:删除并清理过期
--筛选出一周之内的数据
SELECT * FROM t_minutes where minute > NOW() - INTERVAL '1 day';--用法二:延迟表变更
--以minute字段为基准延迟一分钟输出
CREATE MATERIALIZED VIEW timer_t_minutes AS
SELECT * FROM t_minutes where minute + INTERVAL '1 minute' <= now(); --在这里需要注意时区务必要对其--sink
CREATE SINK timer_t_minutes_sink FROM timer_t_minutes
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='timer_t_minutes_sink_t'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);
测试数据
INSERT INTO t_minutes values(2,'2020-01-01 00:00:00'::TIMESTAMP);
INSERT INTO t_minutes values(2,'2023-01-01 00:00:00'::TIMESTAMP);
INSERT INTO t_minutes values(3,cast(now() as timestamp) - INTERVAL '1 hour');
INSERT INTO t_minutes values(4,cast(now() as timestamp) + INTERVAL '1 minute');
INSERT INTO t_minutes values(5,cast(now() as timestamp));
消费
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t timer_t_minutes_sink_t -C
参考:
Dynamic filters
Temporal filters
这篇关于RisingWave的动态过滤器和时间过滤器的用法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!