RisingWave分布式SQL流处理数据库调研

2024-02-21 00:36

本文主要是介绍RisingWave分布式SQL流处理数据库调研,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概述

RisingWave是一款分布式SQL流处理数据库,旨在帮助用户降低实时应用的的开发成本。作为专为云上分布式流处理而设计的系统,RisingWave为用户提供了与PostgreSQL类似的使用体验,官方宣称具备比Flink高出10倍的性能(指throughput)以及更低的成本。RisingWave开发只需要关注SQL开发,而不需要像Flink那样去关注

  • RisingWave与Flink不同的是,RisingWave既可以做流处理也可以存储;而Flink只是流处理框架,而不能存储数据,计算后的数据需要存储到外部系统中。官方宣称可以完全替代FlinkSQL。
  • RisingWave与批数据库不同的是,RisingWave可以做流处理,按预定义逻辑实时处理数据,官网宣称可以做到流批一体,批数据库只能处理批数据。

使用场景

RisingWave 的强项是流处理,底层存储为行存,更加适合对已存储的数据高并发点查,而并非全表扫描。RisingWave 的主要使用场景包括了监控、报警、实时动态报表、流式 ETL、机器学习特征工程等。其已经运用到金融交易、制造业、新媒体、物流等领域。
但是,RisingWave 不适合做分析型随机查询。为支持分析型随机查询,用户还需将数据导入到实时分析数据库中进行操作。不少用户将 RisingWave 与 ClickHouse、Apache Doris 等实时分析数据库组合使用:他们使用 RisingWave 做流计算,同时使用实时分析数据库进行分析型随机查询。RisingWave 已经支持到sink ClickHouse、Apache Doris等OLTP中,具体可以参考RisingWave Sink

注意:
RisingWave 不支持读写事务处理,但其支持只读事务。在生产中,使用 RisingWave 的最佳实践是将 RisingWave 放在事务型数据库的下游。RisingWave 通过 CDC 从事务型数据库中读取已经被序列化过的数据。

RisingWave 应用

部署

RisingWave 单机试玩模式

docker run -itd \
-p 4566:4566 \
-p 5691:5691 \
--privileged \
--name=risingwave \
risingwavelabs/risingwave:latest playground

RisingWave 单机 Docker Compose 部署模式(测试推荐这种模式部署,以下测试基于此种模式)

clone the risingwave repository.

git clone https://github.com/risingwavelabs/risingwave.git

进入docker目录

cd docker

启动RisingWave集群

#使用MinIO存储状态后端,standalone模式启动
export RW_IMAGE=risingwavelabs/risingwave:latest
export ENABLE_TELEMETRY=true
docker compose up -d

安装postgresql客户端

由于RisingWave兼容postgresql协议,所以通过postgresql客户端可以直接操作RisingWave
安装postgresql客户端

yum install -y postgresql

使用 psql 连接

psql -h localhost -p 4566 -d dev -U root

启动mysql并开启binlog

  • 启动mysql
# 查看详细默认配置docker run -it --rm mysql:5.7 --verbose --help#启动mysql server
docker run -d \
--name mysql5.7 \
--restart=always \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=123456 \
-v /data/mysql5.7/data:/var/lib/mysql \#数据文件
-v /data/mysql5.7/conf:/etc/mysql/conf.d \#配置文件
-v /data/mysql5.7/log:/var/log \#日志文件
mysql:5.7 \
--character-set-server=utf8mb4 \
--collation-server=utf8mb4_unicode_ci \
--log-bin=/var/lib/mysql/mysql-bin \#开启binlog配置
--server-id=2 #开启binlog配置
  • 链接mysql

docker exec -it mysql5.7 mysql -h127.0.0.1 -P3306 -p’123456’

  • 验证是否开启 binlog

show variables like ‘%log_bin%’;

  • 授权
--授权RisingWave作为slave访问mysql binlog
grant RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT on *.* to 'root'@'%' IDENTIFIED BY '123456';
--grant ALL PRIVILEGES on db01.* to 'root'@'%' IDENTIFIED BY '123456';
flush  privileges;
--取消授权,如有需要
REVOKE  GRANT OPTION on *.* FROM 'root'@'%';
REVOKE  ALL PRIVILEGES on *.* FROM 'root'@'%';
REVOKE  ALL PRIVILEGES on db01.* FROM 'root'@'%';
flush  privileges;
--查看授权
show grants for root@'%';

部署kafka

  • 启动kafka
# step-1
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper:latest
# step-2
# 启动Kafka,将以下的俩个192.168.1.100换为本身的IP地址bash
docker run  -d \
--name kafka \
--restart=always \
-p 8092:8092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:8092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:8092 \
-t wurstmeister/kafka
  • 与kafka交互
#list
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --list
#create topic
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --create --replication-factor 1 --partitions 1 --topic test2
#producer
docker run -it --rm wurstmeister/kafka kafka-console-producer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
#consumer
docker run -it --rm wurstmeister/kafka kafka-console-consumer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
  • 或通过kcat与kafka交互
docker pull edenhill/kcat:1.7.1
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C

RisingWave 使用demo

  1. 数据导出sink demo
-- create table
CREATE TABLE t1 (v1 int, v2 int) 
WITH (connector = 'datagen',fields.v1.kind = 'sequence',fields.v1.start = '1',fields.v2.kind = 'random',fields.v2.min = '-10',fields.v2.max = '10',fields.v2.seed = '1',datagen.rows.per.second = '10') ROW FORMAT JSON;
-- create sink
CREATE SINK test_sink_1
FROM t1 
WITH (properties.bootstrap.server = '192.168.1.100:8092',topic = 'test_sink_topic',connector = 'kafka',primary_key = 'v1'
)
FORMAT UPSERT ENCODE JSON;

查看kafka sink 结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J

  1. 连接器 source
--source 连接器
CREATE SOURCE IF NOT EXISTS source_1 (v1 integer,v2 integer,
)
WITH (connector='kafka',topic='test_sink_topic',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
-- table连接器
CREATE TABLE IF NOT EXISTS table_1 (v1 integer,v2 integer,
)
WITH (connector='kafka',topic='test_sink_topic',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
  1. Change Data Capture (CDC) 直连 MySQL CDC
    --mysql ddl:create database db01;use db01;CREATE TABLE orders (order_id int(11) NOT NULL AUTO_INCREMENT,price decimal(11),PRIMARY KEY (order_id));-- risingwave ddlCREATE TABLE orders (order_id int,price decimal,PRIMARY KEY (order_id)) WITH (connector = 'mysql-cdc',hostname = '192.168.1.100',port = '3306',username = 'root',password = '123456',database.name = 'db01',table.name = 'orders',);--mysql dmlinsert into orders(price) values(12),(10),(23);insert into orders(price) values(12),(10);update orders set price=100  where order_id=1;delete from orders where order_id=3;-- risingwave验证数据select * from orders ;
  1. 直接导出物化视图/表数据 (CREATE SINK FROM)
CREATE TABLE t11 (v1 int, v2 int) 
WITH (connector = 'datagen',fields.v1.kind = 'sequence',fields.v1.start = '1',fields.v2.kind = 'random',fields.v2.min = '-10',fields.v2.max = '10',fields.v2.seed = '1',datagen.rows.per.second = '10') ROW FORMAT JSON;create materialized view mv_t11 as select count(*) from t11;CREATE SINK sink1 FROM mv_t11 
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink1'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);

check结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J

  1. 导出 Query 的数据(CREATE SINK AS)
CREATE TABLE t11 (v1 int, v2 int) 
WITH (connector = 'datagen',fields.v1.kind = 'sequence',fields.v1.start = '1',fields.v2.kind = 'random',fields.v2.min = '-10',fields.v2.max = '10',fields.v2.seed = '1',datagen.rows.per.second = '10') ROW FORMAT JSON;CREATE SINK sink2 AS 
SELECT avg(v1) as avg_v1, avg(v2) as avg_v2 
FROM t1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink2'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);

check结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J

总结

RisingWave 提供与 PostgreSQL 兼容的标准SQL接口。用户可以像使用 PostgreSQL 一样处理数据流。屏蔽了实时处理底层需要遇到的一些技术细节(状态存储,数据一致性,分布式集群扩展等),供应用方快速的开发实时数据流,进行流式ETL。具有以下特性:同步的实时性(可以保证实时的新鲜度,doris等OLAP引擎采用异步实时)、强一致性(doris等OLAP引擎仅提供最终一致性)、高可用、高并发、流处理语义、资源隔离。可以应用在一些数据看版,监控,实时指标等场景。

相关文章

github 仓库
官方文档
中文文档
创始人知乎主页
Slack

这篇关于RisingWave分布式SQL流处理数据库调研的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/730052

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

[MySQL表的增删改查-进阶]

🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 💻💻💻数据库约束 🔭🔭🔭约束类型 not null: 指示某列不能存储 NULL 值unique: 保证某列的每行必须有唯一的值default: 规定没有给列赋值时的默认值.primary key: