Commit f38862e

Merge branch 'tab/batch-iceberg-3' into tab/refine-track
2 parents: fe7cbd2 + 4f5b6c5

File tree: 220 files changed, +13030 additions, -8054 deletions


Cargo.lock

Lines changed: 307 additions & 271 deletions
(Generated file; diff not rendered.)

Cargo.toml

Lines changed: 10 additions & 8 deletions
@@ -113,7 +113,7 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88
 ] }
 arc-swap = "1"
 arrow-udf-runtime = "0.8.0"
-async-openai = "0.29.0"
+async-openai = "0.30.1"
 auto_enums = { version = "0.8", features = ["futures03", "tokio1"] }
 await-tree = { version = "0.3.2-alpha.2", features = ["serde", "attributes"] }
 aws-config = { version = "1", default-features = false, features = [
@@ -140,7 +140,7 @@ aws-sdk-sqs = { version = "1", default-features = false, features = [
 ] }
 aws-smithy-http = "0.62"
 aws-smithy-runtime = "1.8"
-aws-smithy-runtime-api = "1.8"
+aws-smithy-runtime-api = "1.9"
 aws-smithy-types = { version = "1.3", default-features = false, features = [
     "hyper-0-14-x", # required by aws sdk
 ] }
@@ -149,7 +149,7 @@ axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
 axum-extra = "0.9"
 chrono = { version = "0.4.40", default-features = false }
 clap = { version = "4", features = ["cargo", "derive", "env"] }
-criterion = { version = "0.5", features = ["async_futures"] }
+criterion = { version = "0.7", features = ["async_futures"] }
 # Use a forked version which removes the dependencies on dynamo db to reduce
 # compile time and binary size.
 deltalake = { version = "0.26", features = ["s3", "gcs", "datafusion"] }
@@ -167,20 +167,22 @@ hashbrown0_14 = { package = "hashbrown", version = "0.14", features = [
 ] }
 hytra = "0.1"
 # branch dev_rebase_main_20250325
-iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4", features = [
+iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c15a32b4220cf1b02132129eb34a5ea301215b7f", features = [
     "storage-s3",
     "storage-gcs",
     "storage-azblob",
     "storage-azdls",
 ] }
-iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" }
-iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" }
+iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c15a32b4220cf1b02132129eb34a5ea301215b7f" }
+iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c15a32b4220cf1b02132129eb34a5ea301215b7f" }
+
 indexmap = { version = "2.12.0", features = ["serde"] }
 itertools = "0.14.0"
 jni = { version = "0.21.1", features = ["invocation"] }
 jsonbb = { version = "0.2.2", features = ["float_roundtrip"] }
+jsonwebtoken = { version = "10", features = ["aws_lc_rs"] }
 linkme = { version = "0.3.32", features = ["used_linker"] }
-lru = "0.14"
+lru = "0.16"
 madsim = "0.2.34"
 mixtrics = { version = "0.2", features = ["prometheus"] }
 mysql_async = { version = "0.36", features = ["native-tls-tls", "rust_decimal"] }
@@ -203,7 +205,7 @@ rdkafka = { package = "madsim-rdkafka", version = "0.4.4", features = [
     "cmake-build",
 ] }
 redis = { version = "0.32" }
-regex = "1.11"
+regex = "1.12"
 reqwest = { version = "0.12.2", features = ["json", "stream"] }
 risingwave_backup = { path = "./src/storage/backup" }
 risingwave_batch = { path = "./src/batch" }

README.md

Lines changed: 6 additions & 2 deletions
@@ -64,12 +64,16 @@ To learn about other installation options, such as using a Docker image, see the
 
 RisingWave delivers a unified streaming data platform that combines **ultra-low-latency stream processing** and **Iceberg-native data management**.
 
-### Streaming analytics
+### Low-latency streaming processing and analytics
 RisingWave integrates real-time stream processing and low-latency serving in a single system. It continuously ingests data from streaming and batch sources, performs incremental computations across streams and tables with end-to-end freshness under 100 ms. Materialized views can be served directly within RisingWave with 10–20 ms p99 query latency, or delivered to downstream systems.
 
-### Iceberg-based lakehouse ingestion and management
+### Iceberg lakehouse ingestion, transformation, and management
 RisingWave treats Apache Iceberg™ as a first-class citizen. It directly hosts and manages the Iceberg REST catalog, allowing users to create and operate Iceberg tables through a PostgreSQL-compatible interface. RisingWave supports two write modes: Merge-on-Read (MoR) and Copy-on-Write (CoW), to suit different ingestion and query patterns. It also provides built-in table maintenance capabilities, including compaction, small-file optimization, vacuum, and snapshot cleanup, ensuring efficient and consistent data management without external tools or pipelines.
 
+_Plug: [Nimtable](https://github.com/nimtable/nimtable) is an observability tool developed by RisingWave for easily exploring and managing Iceberg tables._
+
+
+
 ## Key design decisions
 
 RisingWave is designed to be easier to use and more cost-efficient:
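
As a concrete companion to the Iceberg section in the README change above, here is a minimal sketch of streaming a materialized view into an Iceberg table through RisingWave's Iceberg sink connector. The sink and view names (events_iceberg_sink, events_mv) and all option values are placeholders, credentials are omitted, and the exact option names should be checked against the RisingWave documentation rather than taken from this commit.

CREATE SINK events_iceberg_sink FROM events_mv
WITH (
    connector = 'iceberg',                        -- Iceberg sink connector
    type = 'append-only',                         -- or 'upsert', depending on the table
    catalog.type = 'rest',                        -- e.g. the REST catalog RisingWave can host itself
    warehouse.path = 's3://my-bucket/warehouse',  -- placeholder warehouse location
    database.name = 'demo_db',
    table.name = 'events'
);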
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
# Benchmark configuration for GAP_FILL in EOWC (Emit On Window Close) mode
# This benchmark measures the throughput of processing incoming data through GAP_FILL with EOWC
benchmark_name: gap_fill_eowc

# SQL to set up the initial schema and data (run once)
setup_sql: |
  CREATE TABLE sensor_data_eowc (
    ts TIMESTAMP,
    value DOUBLE,
    WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE,
    PRIMARY KEY (ts)
  ) APPEND ONLY;

  CREATE MATERIALIZED VIEW gap_filled_sensors_eowc AS
  SELECT ts, value
  FROM GAP_FILL(sensor_data_eowc, ts, INTERVAL '1' MINUTE)
  EMIT ON WINDOW CLOSE;

# SQL to prepare the data before each run
prepare_sql: |
  -- No preparation needed for each run

# SQL to clean up after each run
conclude_sql: |
  -- APPEND ONLY tables do not support DELETE
  -- We use different time ranges for each run to avoid primary key conflicts

# SQL to clean up everything after all runs are complete
cleanup_sql: |
  DROP MATERIALIZED VIEW IF EXISTS gap_filled_sensors_eowc;
  DROP TABLE IF EXISTS sensor_data_eowc;

# SQL to benchmark - Insert sparse time series data and measure processing time
# This tests the throughput of GAP_FILL with EOWC processing incoming data
# Note: Uses current timestamp to ensure unique time ranges for each run
benchmark_sql: |
  -- Insert data with gaps (every 5 minutes) starting from NOW()
  -- Each run will use a different time range since NOW() advances
  INSERT INTO sensor_data_eowc (ts, value)
  SELECT
    NOW() + (i * INTERVAL '5 minutes'),
    20.0 + (i * 0.5)
  FROM generate_series(0, 100) AS i;

  -- Insert a timestamp far in the future to advance watermark and close all windows
  -- Watermark is defined as ts - INTERVAL '1' MINUTE, so this will close all windows
  INSERT INTO sensor_data_eowc (ts, value)
  VALUES (
    NOW() + INTERVAL '10 hours', -- Far enough to close all windows
    999.0
  );

  -- Wait for watermark to advance and windows to close
  SELECT pg_sleep(3);

  -- Query the results - windows should now be closed by watermark advancement
  SELECT COUNT(*) FROM gap_filled_sensors_eowc;

# Number of times to run the benchmark
runs: 30
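
For orientation on what the EOWC benchmark above measures: GAP_FILL(sensor_data_eowc, ts, INTERVAL '1' MINUTE) densifies the sparse 5-minute inputs into 1-minute buckets, and EMIT ON WINDOW CLOSE holds results back until the watermark (ts - INTERVAL '1' MINUTE) passes a bucket, which is why the far-future row is inserted. A hedged spot-check one could run after a benchmark iteration, assuming GAP_FILL emits one row per 1-minute bucket (how filled rows carry their value, e.g. NULL versus interpolated, is not defined by this commit):

-- Inspect the first gap-filled buckets; with observations every 5 minutes, roughly
-- 4 generated rows are expected between each pair of consecutive inputs.
SELECT ts, value
FROM gap_filled_sensors_eowc
ORDER BY ts
LIMIT 10;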
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
# Benchmark configuration for GAP_FILL in normal streaming mode
benchmark_name: gap_fill_streaming

# SQL to set up the initial schema and data (run once)
setup_sql: |
  CREATE TABLE sensor_data (
    ts TIMESTAMP,
    value DOUBLE,
    PRIMARY KEY (ts)
  );

  CREATE MATERIALIZED VIEW gap_filled_sensors AS
  SELECT ts, value
  FROM GAP_FILL(sensor_data, ts, INTERVAL '1' MINUTE);

# SQL to prepare the data before each run
prepare_sql: |
  -- No preparation needed for each run

# SQL to clean up after each run
conclude_sql: |
  -- Clean up the inserted data after each run
  DELETE FROM sensor_data;

# SQL to clean up everything after all runs are complete
cleanup_sql: |
  DROP MATERIALIZED VIEW IF EXISTS gap_filled_sensors;
  DROP TABLE IF EXISTS sensor_data;

# SQL to benchmark - Insert sparse time series data and measure processing time
# This tests the throughput of GAP_FILL processing incoming data
benchmark_sql: |
  INSERT INTO sensor_data (ts, value)
  SELECT
    '2024-05-21 10:00:00'::TIMESTAMP + (i * INTERVAL '5 minutes'),
    20.0 + (i * 0.5)
  FROM generate_series(0, 100) AS i;

  -- Wait for all data to be processed
  SELECT COUNT(*) FROM gap_filled_sensors;

# Number of times to run the benchmark
runs: 30
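
A rough sanity check for the streaming variant above, under the same assumption that GAP_FILL emits one row per 1-minute bucket over the covered range: the benchmark inserts 101 points (i = 0..100) spaced 5 minutes apart, spanning 500 minutes, so the gap-filled view should hold on the order of 501 rows against 101 raw rows. A hedged verification query, to be run before conclude_sql deletes the data:

SELECT
    (SELECT COUNT(*) FROM sensor_data)        AS raw_rows,    -- expected: 101
    (SELECT COUNT(*) FROM gap_filled_sensors) AS filled_rows; -- expected: about 501; the exact count depends on fill semantics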
Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
statement ok
SET RW_IMPLICIT_FLUSH TO TRUE;

statement ok
drop table if exists car_sales cascade;

statement ok
drop table if exists car_info cascade;

statement ok
drop table if exists car_regions cascade;

statement ok
drop table if exists t cascade;

statement ok
create table car_sales(id int, car_id int, region_id int, price int);

statement ok
create table car_info(id int, name varchar);

statement ok
create table car_regions(id int, region varchar);

# Create table t that is NOT used in the query
statement ok
create table t(a int, b int, c int);

# Test 1: Should fail because 't' is specified in backfill_order but not used in query
statement error Table or source 't' specified in backfill_order is not used in the query
create materialized view m1
with (backfill_order = FIXED(car_regions -> car_sales, t -> car_sales))
as
with price_ranges as (
    select
        car_info.name as name,
        car_sales.price as price,
        round(log10(1 + car_sales.price)::numeric, 1) as price_range
    from car_sales join car_info
    on car_sales.car_id = car_info.id
    join car_regions
    on car_sales.region_id = car_regions.id
)
select
    name,
    price_range,
    count(*) as sales_count,
    sum(price) as sales_volume,
    avg(price) as sales_avg,
    min(price) as sales_min,
    max(price) as sales_max,
    approx_percentile(0.5) WITHIN GROUP (ORDER BY price) as sales_est_median,
    approx_percentile(0.01) WITHIN GROUP (ORDER BY price) as sales_est_bottom_1_percent,
    approx_percentile(0.99) WITHIN GROUP (ORDER BY price) as sales_est_top_1_percent
FROM
    price_ranges
GROUP BY name, price_range;

# Test 2: Should also fail when 't' is on the right side of the arrow
statement error Table or source 't' specified in backfill_order is not used in the query
create materialized view m2
with (backfill_order = FIXED(car_sales -> t))
as
select
    car_info.name as name,
    car_sales.price as price
from car_sales join car_info
on car_sales.car_id = car_info.id
join car_regions
on car_sales.region_id = car_regions.id;

# Test 3: Should succeed when only actual tables from the query are specified
statement ok
create materialized view m3
with (backfill_order = FIXED(car_regions -> car_sales))
as
select
    car_info.name as name,
    car_sales.price as price
from car_sales join car_info
on car_sales.car_id = car_info.id
join car_regions
on car_sales.region_id = car_regions.id;

# Cleanup
statement ok
drop materialized view m3;

statement ok
drop table car_sales;

statement ok
drop table car_info;

statement ok
drop table car_regions;

statement ok
drop table t;
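
The tests above only exercise the validation rule for relations that are missing from the query. For completeness, a hedged sketch of a multi-edge ordering that should pass the same check, since every relation named in backfill_order also appears in the query; the view name m4 and the particular edge order are illustrative and not part of this commit:

create materialized view m4
with (backfill_order = FIXED(car_regions -> car_info, car_info -> car_sales))
as
select car_info.name as name, car_sales.price as price
from car_sales
join car_info on car_sales.car_id = car_info.id
join car_regions on car_sales.region_id = car_regions.id;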

e2e_test/batch/catalog/pg_settings.slt.part

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ user server_version
 user server_version_num
 user sink_decouple
 user sink_rate_limit
+user slow_ddl_notification_secs
 user source_rate_limit
 user standard_conforming_strings
 user statement_timeout
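
The newly listed slow_ddl_notification_secs session variable is exposed through pg_settings like its neighbours, so it can be inspected and changed over the PostgreSQL-compatible interface. A hedged sketch; the unit (seconds) is inferred from the name, and the default value is not shown in this diff:

SHOW slow_ddl_notification_secs;
SET slow_ddl_notification_secs TO 30;  -- assumed integer value in seconds
SELECT name, setting FROM pg_settings WHERE name = 'slow_ddl_notification_secs';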

e2e_test/batch/describe_fragments.slt

Lines changed: 3 additions & 21 deletions
@@ -133,35 +133,17 @@ skipif madsim
 system ok
 psql_validate.py --db $__DATABASE__ --sql "DESCRIBE FRAGMENTS describe_plan_test.idx" \
 --expected 'Fragment % (Actor %)
-StreamMaterialize { columns: [name, age, created_at, tbl.id(hidden)], stream_key: [tbl.id], pk_columns: [name, age, tbl.id], pk_conflict: NoCheck }
+StreamMaterialize { columns: [name, age, created_at, tbl.id(hidden)], stream_key: [name, tbl.id], pk_columns: [name, age, tbl.id], pk_conflict: NoCheck }
 ├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
+├── stream key: [ tbl.name, tbl.id ]
 └── MergeExecutor { output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ], stream key: [ tbl.id ] }
 (empty)
 Fragment % (Actor %)
-StreamTableScan { table: tbl, columns: [name, age, created_at, id] }
-├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
+StreamTableScan { table: tbl, columns: [name, age, created_at, id] } { output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ], stream key: [ tbl.id ] }
 ├── Upstream { output: [ name, age, created_at, id ], stream key: [] }
 └── BatchPlanNode { output: [ name, age, created_at, id ], stream key: [] }'
 
 
-skipif madsim
-system ok
-psql_validate.py --db $__DATABASE__ --sql "DESCRIBE FRAGMENTS describe_plan_test.idx" \
---expected 'Fragment % (Actor %)
-StreamMaterialize { columns: [name, age, created_at, tbl.id(hidden)], stream_key: [tbl.id], pk_columns: [name, age, tbl.id], pk_conflict: NoCheck }
-├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
-└── MergeExecutor { output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ], stream key: [ tbl.id ] }
-(empty)
-Fragment % (Actor %)
-StreamTableScan { table: tbl, columns: [name, age, created_at, id] }
-├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
-├── Upstream { output: [ name, age, created_at, id ], stream key: [] }
-└── BatchPlanNode { output: [ name, age, created_at, id ], stream key: [] }'
-
 
 skipif madsim
 system ok
