Skip to content

Commit 46afb3b

Browse files
authored
benchmark: Add parquet h2o support (#16804)
* Add parquet h2o support * support all cases to run * polish * clippy * polish code
1 parent afd8235 commit 46afb3b

File tree

2 files changed

+120
-42
lines changed

2 files changed

+120
-42
lines changed

benchmarks/bench.sh

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,24 @@ clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet
100100
clickbench_extended: ClickBench "inspired" queries against a single parquet (DataFusion specific)
101101
102102
# H2O.ai Benchmarks (Group By, Join, Window)
103-
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
104-
h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
105-
h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
106-
h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
107-
h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
108-
h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
109-
h2o_small_window: Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
110-
h2o_medium_window: Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
111-
h2o_big_window: Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
103+
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
104+
h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
105+
h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
106+
h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
107+
h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
108+
h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
109+
h2o_small_window: Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
110+
h2o_medium_window: Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
111+
h2o_big_window: Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
112+
h2o_small_parquet: h2oai benchmark with small dataset (1e7 rows) for groupby, file format is parquet
113+
h2o_medium_parquet: h2oai benchmark with medium dataset (1e8 rows) for groupby, file format is parquet
114+
h2o_big_parquet: h2oai benchmark with large dataset (1e9 rows) for groupby, file format is parquet
115+
h2o_small_join_parquet: h2oai benchmark with small dataset (1e7 rows) for join, file format is parquet
116+
h2o_medium_join_parquet: h2oai benchmark with medium dataset (1e8 rows) for join, file format is parquet
117+
h2o_big_join_parquet: h2oai benchmark with large dataset (1e9 rows) for join, file format is parquet
118+
h2o_small_window_parquet: Extended h2oai benchmark with small dataset (1e7 rows) for window, file format is parquet
119+
h2o_medium_window_parquet: Extended h2oai benchmark with medium dataset (1e8 rows) for window, file format is parquet
120+
h2o_big_window_parquet: Extended h2oai benchmark with large dataset (1e9 rows) for window, file format is parquet
112121
113122
# Join Order Benchmark (IMDB)
114123
imdb: Join Order Benchmark (JOB) using the IMDB dataset converted to parquet
@@ -245,6 +254,34 @@ main() {
245254
h2o_big_window)
246255
data_h2o_join "BIG" "CSV"
247256
;;
257+
h2o_small_parquet)
258+
data_h2o "SMALL" "PARQUET"
259+
;;
260+
h2o_medium_parquet)
261+
data_h2o "MEDIUM" "PARQUET"
262+
;;
263+
h2o_big_parquet)
264+
data_h2o "BIG" "PARQUET"
265+
;;
266+
h2o_small_join_parquet)
267+
data_h2o_join "SMALL" "PARQUET"
268+
;;
269+
h2o_medium_join_parquet)
270+
data_h2o_join "MEDIUM" "PARQUET"
271+
;;
272+
h2o_big_join_parquet)
273+
data_h2o_join "BIG" "PARQUET"
274+
;;
275+
# h2o window benchmark uses the same data as the h2o join
276+
h2o_small_window_parquet)
277+
data_h2o_join "SMALL" "PARQUET"
278+
;;
279+
h2o_medium_window_parquet)
280+
data_h2o_join "MEDIUM" "PARQUET"
281+
;;
282+
h2o_big_window_parquet)
283+
data_h2o_join "BIG" "PARQUET"
284+
;;
248285
external_aggr)
249286
# same data as for tpch
250287
data_tpch "1"
@@ -381,6 +418,34 @@ main() {
381418
h2o_big_window)
382419
run_h2o_window "BIG" "CSV" "window"
383420
;;
421+
h2o_small_parquet)
422+
run_h2o "SMALL" "PARQUET"
423+
;;
424+
h2o_medium_parquet)
425+
run_h2o "MEDIUM" "PARQUET"
426+
;;
427+
h2o_big_parquet)
428+
run_h2o "BIG" "PARQUET"
429+
;;
430+
h2o_small_join_parquet)
431+
run_h2o_join "SMALL" "PARQUET"
432+
;;
433+
h2o_medium_join_parquet)
434+
run_h2o_join "MEDIUM" "PARQUET"
435+
;;
436+
h2o_big_join_parquet)
437+
run_h2o_join "BIG" "PARQUET"
438+
;;
439+
# h2o window benchmark uses the same data as the h2o join
440+
h2o_small_window_parquet)
441+
run_h2o_window "SMALL" "PARQUET"
442+
;;
443+
h2o_medium_window_parquet)
444+
run_h2o_window "MEDIUM" "PARQUET"
445+
;;
446+
h2o_big_window_parquet)
447+
run_h2o_window "BIG" "PARQUET"
448+
;;
384449
external_aggr)
385450
run_external_aggr
386451
;;

benchmarks/src/h2o.rs

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::util::{BenchmarkRun, CommonOpt};
2424
use datafusion::logical_expr::{ExplainFormat, ExplainOption};
2525
use datafusion::{error::Result, prelude::SessionContext};
2626
use datafusion_common::{
27-
exec_datafusion_err, instant::Instant, internal_err, DataFusionError,
27+
exec_datafusion_err, instant::Instant, internal_err, DataFusionError, TableReference,
2828
};
2929
use std::path::{Path, PathBuf};
3030
use structopt::StructOpt;
@@ -92,18 +92,18 @@ impl RunOpt {
9292
// Register tables depending on which h2o benchmark is being run
9393
// (groupby/join/window)
9494
if self.queries_path.to_str().unwrap().ends_with("groupby.sql") {
95-
self.register_data(&ctx).await?;
95+
self.register_data("x", self.path.as_os_str().to_str().unwrap(), &ctx)
96+
.await?;
9697
} else if self.queries_path.to_str().unwrap().ends_with("join.sql") {
9798
let join_paths: Vec<&str> = self.join_paths.split(',').collect();
9899
let table_name: Vec<&str> = vec!["x", "small", "medium", "large"];
99100
for (i, path) in join_paths.iter().enumerate() {
100-
ctx.register_csv(table_name[i], path, Default::default())
101-
.await?;
101+
self.register_data(table_name[i], path, &ctx).await?;
102102
}
103103
} else if self.queries_path.to_str().unwrap().ends_with("window.sql") {
104104
// Only register the 'large' table in h2o-join dataset
105105
let h2o_join_large_path = self.join_paths.split(',').nth(3).unwrap();
106-
ctx.register_csv("large", h2o_join_large_path, Default::default())
106+
self.register_data("large", h2o_join_large_path, &ctx)
107107
.await?;
108108
} else {
109109
return internal_err!("Invalid query file path");
@@ -147,39 +147,52 @@ impl RunOpt {
147147
Ok(())
148148
}
149149

150-
async fn register_data(&self, ctx: &SessionContext) -> Result<()> {
150+
async fn register_data(
151+
&self,
152+
table_ref: impl Into<TableReference>,
153+
table_path: impl AsRef<str>,
154+
ctx: &SessionContext,
155+
) -> Result<()> {
151156
let csv_options = Default::default();
152157
let parquet_options = Default::default();
153-
let path = self.path.as_os_str().to_str().unwrap();
154-
155-
if self.path.extension().map(|s| s == "csv").unwrap_or(false) {
156-
ctx.register_csv("x", path, csv_options)
157-
.await
158-
.map_err(|e| {
159-
DataFusionError::Context(
160-
format!("Registering 'table' as {path}"),
161-
Box::new(e),
162-
)
163-
})
164-
.expect("error registering csv");
165-
}
166158

167-
if self
168-
.path
159+
let table_path_str = table_path.as_ref();
160+
161+
let extension = Path::new(table_path_str)
169162
.extension()
170-
.map(|s| s == "parquet")
171-
.unwrap_or(false)
172-
{
173-
ctx.register_parquet("x", path, parquet_options)
174-
.await
175-
.map_err(|e| {
176-
DataFusionError::Context(
177-
format!("Registering 'table' as {path}"),
178-
Box::new(e),
179-
)
180-
})
181-
.expect("error registering parquet");
163+
.and_then(|s| s.to_str())
164+
.unwrap_or("");
165+
166+
match extension {
167+
"csv" => {
168+
ctx.register_csv(table_ref, table_path_str, csv_options)
169+
.await
170+
.map_err(|e| {
171+
DataFusionError::Context(
172+
format!("Registering 'table' as {table_path_str}"),
173+
Box::new(e),
174+
)
175+
})
176+
.expect("error registering csv");
177+
}
178+
"parquet" => {
179+
ctx.register_parquet(table_ref, table_path_str, parquet_options)
180+
.await
181+
.map_err(|e| {
182+
DataFusionError::Context(
183+
format!("Registering 'table' as {table_path_str}"),
184+
Box::new(e),
185+
)
186+
})
187+
.expect("error registering parquet");
188+
}
189+
_ => {
190+
return Err(DataFusionError::Plan(format!(
191+
"Unsupported file extension: {extension}",
192+
)));
193+
}
182194
}
195+
183196
Ok(())
184197
}
185198
}

0 commit comments

Comments
 (0)