benchmark: Add parquet h2o support #16804
Conversation
Updated: parquet data generation errors out for the join dataset, though it works for group by: ./bench.sh data h2o_medium_join_parquet
***************************
DataFusion Benchmark Runner and Data Generator
COMMAND: data
BENCHMARK: h2o_medium_join_parquet
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
Found Python version 3.13, which is suitable.
Using Python command: /opt/homebrew/bin/python3
Installing falsa...
Generating h2o test data in /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o with size=MEDIUM and format=PARQUET
100 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e2_0.parquet
100000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e5_0.parquet
100000000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e8_NA.parquet
An SMALL data schema is the following:
id1: int64 not null
id4: string
v2: double
An output format is PARQUET
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0% -:--:--
╭──────────────────────────────────────────────────────────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────────────────────────────────────────────────────────╮
│ /Users/zhuqi/arrow-datafusion/benchmarks/venv/lib/python3.13/site-packages/falsa/app.py:144 in join │
│ │
│ 141 │ ) │
│ 142 │ │
│ 143 │ for batch in track(join_small.iter_batches(), total=len(join_small.batches)): │
│ ❱ 144 │ │ writer_small.write_batch(batch) │
│ 145 │ writer_small.close() │
│ 146 │ │
│ 147 │ if data_format is Format.DELTA: │
│ │
│ ╭──────────────────────────────────────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │ batch = pyarrow.RecordBatch │ │
│ │ id1: int64 not null │ │
│ │ id4: string not null │ │
│ │ v2: double not null │ │
│ │ ---- │ │
│ │ id1: [12,77,106,10,52,105,29,64,46,51,...,110,82,72,8,1,104,69,5,44,25] │ │
│ │ id4: ["id12","id77","id106","id10","id52","id105","id29","id64","id46","id51",...,"id110","id82","id72","id8","id1","id104","id69","id5","id44","id25"] │ │
│ │ v2: │ │
│ │ [53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,15.539006557813716,23.840799451398684,7.23383214431385,6.366591524991982,20.222312628293857… │ │
│ │ batch_size = 5000000 │ │
│ │ data_filename_big = 'J1_1e8_1e8_NA.parquet' │ │
│ │ data_filename_lhs = 'J1_1e8_NA_0.parquet' │ │
│ │ data_filename_medium = 'J1_1e8_1e5_0.parquet' │ │
│ │ data_filename_small = 'J1_1e8_1e2_0.parquet' │ │
│ │ data_format = <Format.PARQUET: 'PARQUET'> │ │
│ │ generation_seed = 6839596180442651345 │ │
│ │ join_big = <falsa.local_fs.JoinBigGenerator object at 0x105e2fe00> │ │
│ │ join_lhs = <falsa.local_fs.JoinLHSGenerator object at 0x105e2da90> │ │
│ │ join_medium = <falsa.local_fs.JoinMediumGenerator object at 0x105e2fcb0> │ │
│ │ join_small = <falsa.local_fs.JoinSmallGenerator object at 0x105e2fb60> │ │
│ │ k = 10 │ │
│ │ keys_seed = 1026847926404610461 │ │
│ │ n_big = 100000000 │ │
│ │ n_medium = 100000 │ │
│ │ n_small = 100 │ │
│ │ nas = 0 │ │
│ │ output_big = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e8_NA.parquet') │ │
│ │ output_dir = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o') │ │
│ │ output_lhs = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_NA_0.parquet') │ │
│ │ output_medium = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e5_0.parquet') │ │
│ │ output_small = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e2_0.parquet') │ │
│ │ path_prefix = '/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o' │ │
│ │ seed = 42 │ │
│ │ size = <Size.MEDIUM: 'MEDIUM'> │ │
│ │ writer_small = <pyarrow.parquet.core.ParquetWriter object at 0x1067d4050> │ │
│ ╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Users/zhuqi/arrow-datafusion/benchmarks/venv/lib/python3.13/site-packages/pyarrow/parquet/core.py:1089 in write_batch │
│ │
│ 1086 │ │ │ will be used instead. │
│ 1087 │ │ """ │
│ 1088 │ │ table = pa.Table.from_batches([batch], batch.schema) │
│ ❱ 1089 │ │ self.write_table(table, row_group_size) │
│ 1090 │ │
│ 1091 │ def write_table(self, table, row_group_size=None): │
│ 1092 │ │ """ │
│ │
│ ╭──────────────────────────────────────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │ batch = pyarrow.RecordBatch │ │
│ │ id1: int64 not null │ │
│ │ id4: string not null │ │
│ │ v2: double not null │ │
│ │ ---- │ │
│ │ id1: [12,77,106,10,52,105,29,64,46,51,...,110,82,72,8,1,104,69,5,44,25] │ │
│ │ id4: ["id12","id77","id106","id10","id52","id105","id29","id64","id46","id51",...,"id110","id82","id72","id8","id1","id104","id69","id5","id44","id25"] │ │
│ │ v2: │ │
│ │ [53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,15.539006557813716,23.840799451398684,7.23383214431385,6.366591524991982,20.222312628293857,...,6… │ │
│ │ row_group_size = None │ │
│ │ self = <pyarrow.parquet.core.ParquetWriter object at 0x1067d4050> │ │
│ │ table = pyarrow.Table │ │
│ │ id1: int64 not null │ │
│ │ id4: string not null │ │
│ │ v2: double not null │ │
│ │ ---- │ │
│ │ id1: [[12,77,106,10,52,...,104,69,5,44,25]] │ │
│ │ id4: [["id12","id77","id106","id10","id52",...,"id104","id69","id5","id44","id25"]] │ │
│ │ v2: │ │
│ │ [[53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,...,26.7118533955444,73.44416011403574,93.63022604514522,51.816253173876824,78.95727980955964]] │ │
│ ╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Users/zhuqi/arrow-datafusion/benchmarks/venv/lib/python3.13/site-packages/pyarrow/parquet/core.py:1113 in write_table │
│ │
│ 1110 │ │ │ msg = ('Table schema does not match schema used to create file: ' │
│ 1111 │ │ │ │ '\ntable:\n{!s} vs. \nfile:\n{!s}' │
│ 1112 │ │ │ │ .format(table.schema, self.schema)) │
│ ❱ 1113 │ │ │ raise ValueError(msg) │
│ 1114 │ │ │
│ 1115 │ │ self.writer.write_table(table, row_group_size=row_group_size) │
│ 1116 │
│ │
│ ╭──────────────────────────────────────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │ msg = 'Table schema does not match schema used to create file: \ntable:\nid1: int64 not n'+98 │ │
│ │ row_group_size = None │ │
│ │ self = <pyarrow.parquet.core.ParquetWriter object at 0x1067d4050> │ │
│ │ table = pyarrow.Table │ │
│ │ id1: int64 not null │ │
│ │ id4: string not null │ │
│ │ v2: double not null │ │
│ │ ---- │ │
│ │ id1: [[12,77,106,10,52,...,104,69,5,44,25]] │ │
│ │ id4: [["id12","id77","id106","id10","id52",...,"id104","id69","id5","id44","id25"]] │ │
│ │ v2: │ │
│ │ [[53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,...,26.7118533955444,73.44416011403574,93.63022604514522,51.816253173876824,78.95727980955964]] │ │
│ ╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
ValueError: Table schema does not match schema used to create file:
table:
id1: int64 not null
id4: string not null
v2: double not null vs.
file:
id1: int64 not null
id4: string
v2: double
Filed an issue on the falsa side: it fails to generate parquet data for the join set, but works well for group by.
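For context, the ValueError above is pyarrow's ParquetWriter rejecting a batch whose schema differs from the schema the writer was opened with: the file schema (see "file:" in the message) leaves id4 and v2 nullable, while the generated batches declare every column not null. Below is a minimal, self-contained sketch of that failure mode and one generic way around it; the filename and sample values are illustrative, and the actual upstream fix in falsa may have taken a different route.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Schema the writer is opened with: id4 and v2 nullable,
# matching the "file:" schema in the error above.
file_schema = pa.schema([
    pa.field("id1", pa.int64(), nullable=False),
    pa.field("id4", pa.string()),   # nullable
    pa.field("v2", pa.float64()),   # nullable
])
writer = pq.ParquetWriter("J1_demo.parquet", file_schema)  # illustrative filename

# The generator's batches declare every column non-null,
# matching the "table:" schema in the error.
batch_schema = pa.schema([
    pa.field("id1", pa.int64(), nullable=False),
    pa.field("id4", pa.string(), nullable=False),
    pa.field("v2", pa.float64(), nullable=False),
])
batch = pa.record_batch(
    [pa.array([12, 77]), pa.array(["id12", "id77"]), pa.array([53.0, 32.4])],
    schema=batch_schema,
)

try:
    writer.write_batch(batch)  # ValueError: Table schema does not match schema used to create file
except ValueError as err:
    print(err)

# One generic workaround: cast to the writer's schema before writing.
# A nullability-only cast is metadata-level and effectively free.
writer.write_table(pa.Table.from_batches([batch]).cast(file_schema))
writer.close()
```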
Update: it works now; falsa has merged and released the fix: mrpowers-io/falsa#28. ./bench.sh data h2o_small_join_parquet
***************************
DataFusion Benchmark Runner and Data Generator
COMMAND: data
BENCHMARK: h2o_small_join_parquet
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
Found Python version 3.13, which is suitable.
Using Python command: /opt/homebrew/bin/python3
Installing falsa...
Generating h2o test data in /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o with size=SMALL and format=PARQUET
10 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e7_1e1_0.parquet
10000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e7_1e4_0.parquet
10000000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e7_1e7_NA.parquet
An SMALL data schema is the following:
id1: int64 not null
id4: string not null
v2: double not null
An output format is PARQUET
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
An MEDIUM data schema is the following:
id1: int64 not null
id2: int64 not null
id4: string not null
id5: string not null
v2: double not null
An output format is PARQUET
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
An BIG data schema is the following:
id1: int64 not null
id2: int64 not null
id3: int64 not null
id4: string not null
id5: string not null
id6: string not null
v2: double not null
An output format is PARQUET
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:02
An LSH data schema is the following:
id1: int64 not null
id2: int64 not null
id3: int64 not null
id4: string not null
id5: string not null
id6: string not null
v1: double not null
An output format is PARQUET
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
@zhuqi-lucas In case falsa generates too much noise in stdout (and in the runner's output), I can add a command-line argument to suppress it. Something like …
Thank you @SemyonSinchenko, it's fine for benchmark data generation; it's useful to see the detailed info.
Force-pushed from 689d67d to 7bcedd5
Thank you! LGTM. I have also tested it locally.
h2o_small_window: Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
h2o_medium_window: Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
h2o_big_window: Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
Later, we can clean it up with additional size/format options, like:
./bench.sh run h2o_join medium parquet
Good suggestion @2010YOUY01, that's clearer.
benchmarks/bench.sh (outdated)
@@ -775,6 +840,7 @@ data_h2o() {
# Set virtual environment directory
VIRTUAL_ENV="${PWD}/venv"
rm -rf "$VIRTUAL_ENV"
Could you add a comment for this line?
Thank you @2010YOUY01, I removed this line in the latest PR; I think it was leftover test code I had added.
Thank you @2010YOUY01 for the review!
Thanks @zhuqi-lucas! The CSV files were taking much of the benchmark time; this should be a nice improvement.
Looks good to me -- thank you @zhuqi-lucas and @2010YOUY01
I think the CSV tests are quite important as that is what the original benchmark uses (and yes that means it is largely a test of CSV performance)
Thank you @alamb @jonathanc-n for the review.
Which issue does this PR close?
The h2o benchmark currently only supports the CSV format, while the comparison results from other databases use Parquet, so this PR adds Parquet format support to DataFusion's h2o benchmark.
Details:
#16710 (comment)
cc @alamb @Dandandan @2010YOUY01
Rationale for this change
Same as above: the published h2o comparisons with other databases are run against Parquet data, so DataFusion should be able to benchmark that format rather than CSV only.
What changes are included in this PR?
Parquet format support for the h2o benchmark's data generation and run targets, alongside the existing CSV support.
Are these changes tested?
Yes; tested both the group by and join benchmarks locally, and both work now.
Are there any user-facing changes?
Yes, new Parquet format options for the h2o benchmark.