benchmark: Add parquet h2o support #16804

Merged
6 commits, Jul 18, 2025
benchmarks/bench.sh: 83 changes (74 additions & 9 deletions)
@@ -100,15 +100,24 @@ clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)

# H2O.ai Benchmarks (Group By, Join, Window)
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
Contributor:

Later, we can clean it up with additional size/format options, like:
./bench.sh run h2o_join medium parquet

Contributor (Author):

Good suggestion @2010YOUY01, it's clearer.

h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
h2o_small_window: Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
h2o_medium_window: Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
h2o_big_window: Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
h2o_small_parquet: h2oai benchmark with small dataset (1e7 rows) for groupby, file format is parquet
h2o_medium_parquet: h2oai benchmark with medium dataset (1e8 rows) for groupby, file format is parquet
h2o_big_parquet: h2oai benchmark with large dataset (1e9 rows) for groupby, file format is parquet
h2o_small_join_parquet: h2oai benchmark with small dataset (1e7 rows) for join, file format is parquet
h2o_medium_join_parquet: h2oai benchmark with medium dataset (1e8 rows) for join, file format is parquet
h2o_big_join_parquet: h2oai benchmark with large dataset (1e9 rows) for join, file format is parquet
h2o_small_window_parquet: Extended h2oai benchmark with small dataset (1e7 rows) for window, file format is parquet
h2o_medium_window_parquet: Extended h2oai benchmark with medium dataset (1e8 rows) for window, file format is parquet
h2o_big_window_parquet: Extended h2oai benchmark with large dataset (1e9 rows) for window, file format is parquet

# Join Order Benchmark (IMDB)
imdb: Join Order Benchmark (JOB) using the IMDB dataset converted to parquet
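
The new *_parquet entries above follow the same workflow as the existing benchmark names. Assuming the script's usual ./bench.sh data|run <benchmark> interface, an illustrative invocation would be:
./bench.sh data h2o_medium_parquet
./bench.sh run h2o_medium_parquet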
@@ -245,6 +254,34 @@ main() {
h2o_big_window)
data_h2o_join "BIG" "CSV"
;;
h2o_small_parquet)
data_h2o "SMALL" "PARQUET"
;;
h2o_medium_parquet)
data_h2o "MEDIUM" "PARQUET"
;;
h2o_big_parquet)
data_h2o "BIG" "PARQUET"
;;
h2o_small_join_parquet)
data_h2o_join "SMALL" "PARQUET"
;;
h2o_medium_join_parquet)
data_h2o_join "MEDIUM" "PARQUET"
;;
h2o_big_join_parquet)
data_h2o_join "BIG" "PARQUET"
;;
# h2o window benchmark uses the same data as the h2o join
h2o_small_window_parquet)
data_h2o_join "SMALL" "PARQUET"
;;
h2o_medium_window_parquet)
data_h2o_join "MEDIUM" "PARQUET"
;;
h2o_big_window_parquet)
data_h2o_join "BIG" "PARQUET"
;;
external_aggr)
# same data as for tpch
data_tpch "1"
@@ -381,6 +418,34 @@ main() {
h2o_big_window)
run_h2o_window "BIG" "CSV" "window"
;;
h2o_small_parquet)
run_h2o "SMALL" "PARQUET"
;;
h2o_medium_parquet)
run_h2o "MEDIUM" "PARQUET"
;;
h2o_big_parquet)
run_h2o "BIG" "PARQUET"
;;
h2o_small_join_parquet)
run_h2o_join "SMALL" "PARQUET"
;;
h2o_medium_join_parquet)
run_h2o_join "MEDIUM" "PARQUET"
;;
h2o_big_join_parquet)
run_h2o_join "BIG" "PARQUET"
;;
# h2o window benchmark uses the same data as the h2o join
h2o_small_window_parquet)
run_h2o_window "SMALL" "PARQUET"
;;
h2o_medium_window_parquet)
run_h2o_window "MEDIUM" "PARQUET"
;;
h2o_big_window_parquet)
run_h2o_window "BIG" "PARQUET"
;;
external_aggr)
run_external_aggr
;;
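
As a rough sketch of the size/format cleanup suggested in review (./bench.sh run h2o_join medium parquet), the size and format could become positional arguments instead of being baked into the benchmark name. Everything below is hypothetical and not part of this PR; only the upper-cased "SIZE" "FORMAT" argument style of the data_h2o/run_h2o helpers is taken from the diff above.

#!/usr/bin/env bash
# Hypothetical dispatch for: ./bench.sh run h2o_join medium parquet
set -euo pipefail

ACTION="${1:?usage: $0 run <benchmark> [size] [format]}"
BENCHMARK="${2:?missing benchmark name}"
SIZE="${3:-small}"      # small | medium | big
FORMAT="${4:-csv}"      # csv | parquet

# The existing helpers are called with upper-case arguments,
# e.g. run_h2o_join "MEDIUM" "PARQUET", so upper-case here too.
SIZE_UC=$(echo "$SIZE" | tr '[:lower:]' '[:upper:]')
FORMAT_UC=$(echo "$FORMAT" | tr '[:lower:]' '[:upper:]')

case "$ACTION/$BENCHMARK" in
    run/h2o)        echo "would call: run_h2o $SIZE_UC $FORMAT_UC" ;;
    run/h2o_join)   echo "would call: run_h2o_join $SIZE_UC $FORMAT_UC" ;;
    run/h2o_window) echo "would call: run_h2o_window $SIZE_UC $FORMAT_UC" ;;
    *)              echo "unknown action/benchmark: $ACTION $BENCHMARK" >&2; exit 1 ;;
esac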
benchmarks/src/h2o.rs: 79 changes (46 additions & 33 deletions)
@@ -24,7 +24,7 @@ use crate::util::{BenchmarkRun, CommonOpt};
use datafusion::logical_expr::{ExplainFormat, ExplainOption};
use datafusion::{error::Result, prelude::SessionContext};
use datafusion_common::{
exec_datafusion_err, instant::Instant, internal_err, DataFusionError,
exec_datafusion_err, instant::Instant, internal_err, DataFusionError, TableReference,
};
use std::path::{Path, PathBuf};
use structopt::StructOpt;
@@ -92,18 +92,18 @@ impl RunOpt {
// Register tables depending on which h2o benchmark is being run
// (groupby/join/window)
if self.queries_path.to_str().unwrap().ends_with("groupby.sql") {
self.register_data(&ctx).await?;
self.register_data("x", self.path.as_os_str().to_str().unwrap(), &ctx)
.await?;
} else if self.queries_path.to_str().unwrap().ends_with("join.sql") {
let join_paths: Vec<&str> = self.join_paths.split(',').collect();
let table_name: Vec<&str> = vec!["x", "small", "medium", "large"];
for (i, path) in join_paths.iter().enumerate() {
ctx.register_csv(table_name[i], path, Default::default())
.await?;
self.register_data(table_name[i], path, &ctx).await?;
}
} else if self.queries_path.to_str().unwrap().ends_with("window.sql") {
// Only register the 'large' table in h2o-join dataset
let h2o_join_large_path = self.join_paths.split(',').nth(3).unwrap();
ctx.register_csv("large", h2o_join_large_path, Default::default())
self.register_data("large", h2o_join_large_path, &ctx)
.await?;
} else {
return internal_err!("Invalid query file path");
@@ -147,39 +147,52 @@ impl RunOpt {
Ok(())
}

async fn register_data(&self, ctx: &SessionContext) -> Result<()> {
async fn register_data(
&self,
table_ref: impl Into<TableReference>,
table_path: impl AsRef<str>,
ctx: &SessionContext,
) -> Result<()> {
let csv_options = Default::default();
let parquet_options = Default::default();
let path = self.path.as_os_str().to_str().unwrap();

if self.path.extension().map(|s| s == "csv").unwrap_or(false) {
ctx.register_csv("x", path, csv_options)
.await
.map_err(|e| {
DataFusionError::Context(
format!("Registering 'table' as {path}"),
Box::new(e),
)
})
.expect("error registering csv");
}

if self
.path
let table_path_str = table_path.as_ref();

let extension = Path::new(table_path_str)
.extension()
.map(|s| s == "parquet")
.unwrap_or(false)
{
ctx.register_parquet("x", path, parquet_options)
.await
.map_err(|e| {
DataFusionError::Context(
format!("Registering 'table' as {path}"),
Box::new(e),
)
})
.expect("error registering parquet");
.and_then(|s| s.to_str())
.unwrap_or("");

match extension {
"csv" => {
ctx.register_csv(table_ref, table_path_str, csv_options)
.await
.map_err(|e| {
DataFusionError::Context(
format!("Registering 'table' as {table_path_str}"),
Box::new(e),
)
})
.expect("error registering csv");
}
"parquet" => {
ctx.register_parquet(table_ref, table_path_str, parquet_options)
.await
.map_err(|e| {
DataFusionError::Context(
format!("Registering 'table' as {table_path_str}"),
Box::new(e),
)
})
.expect("error registering parquet");
}
_ => {
return Err(DataFusionError::Plan(format!(
"Unsupported file extension: {extension}",
)));
}
}

Ok(())
}
}
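
The heart of the h2o.rs change is the extension-based dispatch in register_data. Below is a minimal standalone sketch of the same pattern against DataFusion's public register_csv/register_parquet API; the table name, dataset path, and query are illustrative placeholders, not the benchmark's own values.

use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use std::path::Path;

async fn register_by_extension(ctx: &SessionContext, table: &str, path: &str) -> Result<()> {
    // Choose the registration call from the file extension, mirroring register_data above.
    match Path::new(path).extension().and_then(|s| s.to_str()).unwrap_or("") {
        "csv" => ctx.register_csv(table, path, CsvReadOptions::new()).await,
        "parquet" => {
            ctx.register_parquet(table, path, ParquetReadOptions::default())
                .await
        }
        other => Err(DataFusionError::Plan(format!(
            "Unsupported file extension: {other}"
        ))),
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical dataset path; the benchmark derives its paths from --path / --join-paths.
    register_by_extension(&ctx, "x", "data/h2o/G1_1e7_1e7_100_0.parquet").await?;
    ctx.sql("SELECT count(*) FROM x").await?.show().await?;
    Ok(())
}

Keeping the dispatch in one helper is what lets the groupby, join, and window entry points register csv and parquet datasets identically; only the path passed in changes.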