From da1a31d43939dfca03b2a45d1757551a8a983fc2 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Wed, 16 Jul 2025 23:37:20 +0800
Subject: [PATCH 1/5] Add parquet h2o support

---
 benchmarks/bench.sh | 84 ++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 75 insertions(+), 9 deletions(-)

diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 7339aba78f20..b350794b6699 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -100,15 +100,24 @@ clickbench_pushdown: ClickBench queries against partitioned (100 files) parqu
 clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
 
 # H2O.ai Benchmarks (Group By, Join, Window)
-h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
-h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
-h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
-h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
-h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
-h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
-h2o_small_window: Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
-h2o_medium_window: Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
-h2o_big_window: Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
+h2o_small:                 h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
+h2o_medium:                h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
+h2o_big:                   h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
+h2o_small_join:            h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
+h2o_medium_join:           h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
+h2o_big_join:              h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
+h2o_small_window:          Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
+h2o_medium_window:         Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
+h2o_big_window:            Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
+h2o_small_parquet:         h2oai benchmark with small dataset (1e7 rows) for groupby, file format is parquet
+h2o_medium_parquet:        h2oai benchmark with medium dataset (1e8 rows) for groupby, file format is parquet
+h2o_big_parquet:           h2oai benchmark with large dataset (1e9 rows) for groupby, file format is parquet
+h2o_small_join_parquet:    h2oai benchmark with small dataset (1e7 rows) for join, file format is parquet
+h2o_medium_join_parquet:   h2oai benchmark with medium dataset (1e8 rows) for join, file format is parquet
+h2o_big_join_parquet:      h2oai benchmark with large dataset (1e9 rows) for join, file format is parquet
+h2o_small_window_parquet:  Extended h2oai benchmark with small dataset (1e7 rows) for window, file format is parquet
+h2o_medium_window_parquet: Extended h2oai benchmark with medium dataset (1e8 rows) for window, file format is parquet
+h2o_big_window_parquet:    Extended h2oai benchmark with large dataset (1e9 rows) for window, file format is parquet
 
 # Join Order Benchmark (IMDB)
 imdb: Join Order Benchmark (JOB) using the IMDB dataset converted to parquet
@@ -245,6 +254,34 @@ main() {
         h2o_big_window)
             data_h2o_join "BIG" "CSV"
             ;;
+        h2o_small_parquet)
+            data_h2o "SMALL" "PARQUET"
+            ;;
+        h2o_medium_parquet)
+            data_h2o "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_parquet)
+            data_h2o "BIG" "PARQUET"
+            ;;
+        h2o_small_join_parquet)
+            data_h2o_join "SMALL" "PARQUET"
+            ;;
+        h2o_medium_join_parquet)
+            data_h2o_join "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_join_parquet)
+            data_h2o_join "BIG" "PARQUET"
+            ;;
+        # h2o window benchmark uses the same data as the h2o join
+        h2o_small_window_parquet)
+            data_h2o_join "SMALL" "PARQUET"
+            ;;
+        h2o_medium_window_parquet)
+            data_h2o_join "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_window_parquet)
+            data_h2o_join "BIG" "PARQUET"
+            ;;
         external_aggr)
             # same data as for tpch
             data_tpch "1"
@@ -381,6 +418,34 @@ main() {
         h2o_big_window)
            run_h2o_window "BIG" "CSV" "window"
            ;;
+        h2o_small_parquet)
+            run_h2o "SMALL" "PARQUET"
+            ;;
+        h2o_medium_parquet)
+            run_h2o "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_parquet)
+            run_h2o "BIG" "PARQUET"
+            ;;
+        h2o_small_join_parquet)
+            run_h2o_join "SMALL" "PARQUET"
+            ;;
+        h2o_medium_join_parquet)
+            run_h2o_join "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_join_parquet)
+            run_h2o_join "BIG" "PARQUET"
+            ;;
+        # h2o window benchmark uses the same data as the h2o join
+        h2o_small_window_parquet)
+            run_h2o_window "SMALL" "PARQUET"
+            ;;
+        h2o_medium_window_parquet)
+            run_h2o_window "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_window_parquet)
+            run_h2o_window "BIG" "PARQUET"
+            ;;
         external_aggr)
             run_external_aggr
             ;;
@@ -775,6 +840,7 @@ data_h2o() {
 
     # Set virtual environment directory
     VIRTUAL_ENV="${PWD}/venv"
+    rm -rf "$VIRTUAL_ENV"
 
     # Create a virtual environment using the detected Python command
     $PYTHON_CMD -m venv "$VIRTUAL_ENV"
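Note: the nine new *_parquet targets plug into bench.sh's existing two-step flow, e.g. "./bench.sh data h2o_small_parquet" to generate the dataset and "./bench.sh run h2o_small_parquet" to execute the queries against it. As the case arms above show, the window variants deliberately reuse data_h2o_join, since the h2o window benchmark runs over the join dataset.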
From cd306fd3373e5670e8cbc9592973b59f9b6b4873 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Thu, 17 Jul 2025 13:50:08 +0800
Subject: [PATCH 2/5] support all cases to run

---
 benchmarks/src/h2o.rs | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs
index 009f1708ef98..c55ace4924f8 100644
--- a/benchmarks/src/h2o.rs
+++ b/benchmarks/src/h2o.rs
@@ -24,7 +24,7 @@ use crate::util::{BenchmarkRun, CommonOpt};
 use datafusion::logical_expr::{ExplainFormat, ExplainOption};
 use datafusion::{error::Result, prelude::SessionContext};
 use datafusion_common::{
-    exec_datafusion_err, instant::Instant, internal_err, DataFusionError,
+    exec_datafusion_err, instant::Instant, internal_err, DataFusionError, TableReference,
 };
 use std::path::{Path, PathBuf};
 use structopt::StructOpt;
@@ -92,20 +92,20 @@ impl RunOpt {
         // Register tables depending on which h2o benchmark is being run
         // (groupby/join/window)
         if self.queries_path.to_str().unwrap().ends_with("groupby.sql") {
-            self.register_data(&ctx).await?;
+            self.register_data("x", self.path.as_os_str().to_str().unwrap(), &ctx)
+                .await?;
         } else if self.queries_path.to_str().unwrap().ends_with("join.sql") {
             let join_paths: Vec<&str> = self.join_paths.split(',').collect();
             let table_name: Vec<&str> = vec!["x", "small", "medium", "large"];
             for (i, path) in join_paths.iter().enumerate() {
-                ctx.register_csv(table_name[i], path, Default::default())
-                    .await?;
+                self.register_data(table_name[i], path, &ctx).await?;
             }
         } else if self.queries_path.to_str().unwrap().ends_with("window.sql") {
             // Only register the 'large' table in h2o-join dataset
             let h2o_join_large_path =
                 self.join_paths.split(',').nth(3).unwrap();
-            ctx.register_csv("large", h2o_join_large_path, Default::default())
+            self.register_data("large", h2o_join_large_path, &ctx)
                 .await?;
         } else {
             return internal_err!("Invalid query file path");
         }
@@ -147,34 +147,41 @@ impl RunOpt {
         Ok(())
     }
 
-    async fn register_data(&self, ctx: &SessionContext) -> Result<()> {
+    async fn register_data(
+        &self,
+        table_ref: impl Into<TableReference>,
+        table_path: impl AsRef<str>,
+        ctx: &SessionContext,
+    ) -> Result<()> {
         let csv_options = Default::default();
         let parquet_options = Default::default();
-        let path = self.path.as_os_str().to_str().unwrap();
+
+        let table_path_str = table_path.as_ref();
 
         if self.path.extension().map(|s| s == "csv").unwrap_or(false) {
-            ctx.register_csv("x", path, csv_options)
+            ctx.register_csv(table_ref, table_path_str, csv_options)
                 .await
                 .map_err(|e| {
                     DataFusionError::Context(
-                        format!("Registering 'table' as {path}"),
+                        format!("Registering 'table' as {}", table_path_str),
                         Box::new(e),
                     )
                 })
                 .expect("error registering csv");
+            return Ok(());
         }
 
         if self
             .path
             .extension()
             .map(|s| s == "parquet")
             .unwrap_or(false)
         {
-            ctx.register_parquet("x", path, parquet_options)
+            ctx.register_parquet(table_ref, table_path_str, parquet_options)
                 .await
                 .map_err(|e| {
                     DataFusionError::Context(
-                        format!("Registering 'table' as {path}"),
+                        format!("Registering 'table' as {}", table_path_str),
                         Box::new(e),
                     )
                 })
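To make the generalized helper concrete: after this patch every h2o table goes through one code path keyed by a (table name, file path) pair instead of a hard-coded "x" CSV registration. A minimal self-contained sketch of that call pattern (not part of the patch; the file paths are placeholders, and CsvReadOptions::new() stands in for the Default::default() options used above):

    use datafusion::error::Result;
    use datafusion::prelude::{CsvReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Same table names as the join branch above; paths are illustrative.
        let tables = [
            ("x", "data/h2o/join_x.csv"),
            ("small", "data/h2o/join_small.csv"),
            ("medium", "data/h2o/join_medium.csv"),
            ("large", "data/h2o/join_large.csv"),
        ];
        for (name, path) in tables {
            // register_csv accepts impl Into<TableReference> for the name and
            // impl AsRef<str> for the path, so plain &str works for both.
            ctx.register_csv(name, path, CsvReadOptions::new()).await?;
        }
        Ok(())
    }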
From 7bcedd50fb52ff6c0d11d9cafdc9b9ad9de39775 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Thu, 17 Jul 2025 13:55:53 +0800
Subject: [PATCH 3/5] polish

---
 benchmarks/src/h2o.rs | 61 ++++++++++++++++++++++++-------------------
 1 file changed, 34 insertions(+), 27 deletions(-)

diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs
index c55ace4924f8..0bbf0ac19a01 100644
--- a/benchmarks/src/h2o.rs
+++ b/benchmarks/src/h2o.rs
@@ -158,35 +158,42 @@ impl RunOpt {
 
         let table_path_str = table_path.as_ref();
 
-        if self.path.extension().map(|s| s == "csv").unwrap_or(false) {
-            ctx.register_csv(table_ref, table_path_str, csv_options)
-                .await
-                .map_err(|e| {
-                    DataFusionError::Context(
-                        format!("Registering 'table' as {}", table_path_str),
-                        Box::new(e),
-                    )
-                })
-                .expect("error registering csv");
-            return Ok(());
-        }
-
-        if self
-            .path
+        let extension = Path::new(table_path_str)
             .extension()
-            .map(|s| s == "parquet")
-            .unwrap_or(false)
-        {
-            ctx.register_parquet(table_ref, table_path_str, parquet_options)
-                .await
-                .map_err(|e| {
-                    DataFusionError::Context(
-                        format!("Registering 'table' as {}", table_path_str),
-                        Box::new(e),
-                    )
-                })
-                .expect("error registering parquet");
+            .and_then(|s| s.to_str())
+            .unwrap_or("");
+
+        match extension {
+            "csv" => {
+                ctx.register_csv(table_ref, table_path_str, csv_options)
+                    .await
+                    .map_err(|e| {
+                        DataFusionError::Context(
+                            format!("Registering 'table' as {}", table_path_str).into(),
+                            Box::new(e),
+                        )
+                    })
+                    .expect("error registering csv");
+            }
+            "parquet" => {
+                ctx.register_parquet(table_ref, table_path_str, parquet_options)
+                    .await
+                    .map_err(|e| {
+                        DataFusionError::Context(
+                            format!("Registering 'table' as {}", table_path_str).into(),
+                            Box::new(e),
+                        )
+                    })
+                    .expect("error registering parquet");
+            }
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "Unsupported file extension: {}",
+                    extension
+                )));
+            }
         }
+
         Ok(())
     }
 }
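The core move in this patch is replacing the two boolean extension checks with a single match on the extension string. A standalone, std-only sketch of that dispatch (same csv/parquet/error policy as the match above; the function name is made up for illustration):

    use std::path::Path;

    fn detect_format(table_path: &str) -> Result<&'static str, String> {
        // Mirror the patch: take the extension, fall back to "" when absent.
        let extension = Path::new(table_path)
            .extension()
            .and_then(|s| s.to_str())
            .unwrap_or("");
        match extension {
            "csv" => Ok("csv"),
            "parquet" => Ok("parquet"),
            other => Err(format!("Unsupported file extension: {other}")),
        }
    }

    fn main() {
        assert_eq!(detect_format("x.parquet"), Ok("parquet"));
        assert_eq!(detect_format("x.csv"), Ok("csv"));
        assert!(detect_format("x.orc").is_err()); // unknown formats are rejected
        assert!(detect_format("no_extension").is_err()); // "" is rejected too
    }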
 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs
index 0bbf0ac19a01..9d4deaf38728 100644
--- a/benchmarks/src/h2o.rs
+++ b/benchmarks/src/h2o.rs
@@ -169,7 +169,7 @@ impl RunOpt {
                     .await
                     .map_err(|e| {
                         DataFusionError::Context(
-                            format!("Registering 'table' as {}", table_path_str).into(),
+                            format!("Registering 'table' as {table_path_str}"),
                             Box::new(e),
                         )
                     })
@@ -180,7 +180,7 @@ impl RunOpt {
                     .await
                     .map_err(|e| {
                         DataFusionError::Context(
-                            format!("Registering 'table' as {}", table_path_str).into(),
+                            format!("Registering 'table' as {table_path_str}"),
                             Box::new(e),
                         )
                     })
@@ -188,8 +188,7 @@ impl RunOpt {
             }
             _ => {
                 return Err(DataFusionError::Plan(format!(
-                    "Unsupported file extension: {}",
-                    extension
+                    "Unsupported file extension: {extension}",
                 )));
             }
         }

From 5930315c5cf000261da31f8bfdeafd91125e0589 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Thu, 17 Jul 2025 16:25:10 +0800
Subject: [PATCH 5/5] polish code

---
 benchmarks/bench.sh | 1 -
 1 file changed, 1 deletion(-)

diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index b350794b6699..8952e456398d 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -840,7 +840,6 @@ data_h2o() {
 
     # Set virtual environment directory
     VIRTUAL_ENV="${PWD}/venv"
-    rm -rf "$VIRTUAL_ENV"
 
     # Create a virtual environment using the detected Python command
    $PYTHON_CMD -m venv "$VIRTUAL_ENV"
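For review convenience, this is the net shape of register_data once patches 2 through 4 are applied (assembled from the hunks above, with two explanatory comments added here; no further change is implied):

    async fn register_data(
        &self,
        table_ref: impl Into<TableReference>,
        table_path: impl AsRef<str>,
        ctx: &SessionContext,
    ) -> Result<()> {
        let csv_options = Default::default();
        let parquet_options = Default::default();

        let table_path_str = table_path.as_ref();

        // Dispatch on the extension of the path being registered, not on
        // self.path, so join/window tables with their own paths work too.
        let extension = Path::new(table_path_str)
            .extension()
            .and_then(|s| s.to_str())
            .unwrap_or("");

        match extension {
            "csv" => {
                ctx.register_csv(table_ref, table_path_str, csv_options)
                    .await
                    .map_err(|e| {
                        DataFusionError::Context(
                            format!("Registering 'table' as {table_path_str}"),
                            Box::new(e),
                        )
                    })
                    .expect("error registering csv");
            }
            "parquet" => {
                ctx.register_parquet(table_ref, table_path_str, parquet_options)
                    .await
                    .map_err(|e| {
                        DataFusionError::Context(
                            format!("Registering 'table' as {table_path_str}"),
                            Box::new(e),
                        )
                    })
                    .expect("error registering parquet");
            }
            // Anything other than csv or parquet is a plan-time error.
            _ => {
                return Err(DataFusionError::Plan(format!(
                    "Unsupported file extension: {extension}",
                )));
            }
        }

        Ok(())
    }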