Skip to content

Commit 26106a2

Browse files
BlakeOrth and alamb authored
Auto detect hive column partitioning with ListingTableFactory / CREATE EXTERNAL TABLE (#17232)
* Fix: ListingTableFactory hive column detection
  - Fixes an issue in the ListingTableFactory where hive columns are not detected and incorporated into the table schema when an explicit schema has not been set by the user
  - Fixes an issue where subdirectories that do not follow Hive formatting (e.g. key=value) could be erroneously interpreted as contributing to the table schema
* Adds configuration, tests, and docs
  - Adds a configuration option to enable or disable hive partition schema inference
  - Adds configuration option documentation and unit tests
  - Adds additional sqllogic tests specifically targeting partitioned listing tables
  - Adds user guide docs for migration and external table behavior for both the CLI and DDL guides
* Fix merge problem
* Update slt test
* Make upgrade guide more concise
* Fixes spelling and doc table reference issues

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent c83466c commit 26106a2

File tree

10 files changed

+288
-17
lines changed

10 files changed

+288
-17
lines changed

datafusion/common/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,11 @@ config_namespace! {
455455
/// tables (e.g. `/table/year=2021/month=01/data.parquet`).
456456
pub listing_table_ignore_subdirectory: bool, default = true
457457

458+
/// Should a `ListingTable` created through the `ListingTableFactory` infer table
459+
/// partitions from Hive compliant directories. Defaults to true (partition columns are
460+
/// inferred and will be represented in the table schema).
461+
pub listing_table_factory_infer_partitions: bool, default = true
462+
458463
/// Should DataFusion support recursive CTEs
459464
pub enable_recursive_ctes: bool, default = true
460465

datafusion/core/src/datasource/listing/table.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,9 @@ impl ListingOptions {
802802
.rev()
803803
.skip(1) // get parents only; skip the file itself
804804
.rev()
805+
// Partitions are expected to follow the format "column_name=value", so we
806+
// should ignore any path part that cannot be parsed into the expected format
807+
.filter(|s| s.contains('='))
805808
.map(|s| s.split('=').take(1).collect())
806809
.collect_vec()
807810
})

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 110 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,39 @@ impl TableProviderFactory for ListingTableFactory {
6363
))?
6464
.create(session_state, &cmd.options)?;
6565

66+
let mut table_path = ListingTableUrl::parse(&cmd.location)?;
67+
let file_extension = match table_path.is_collection() {
68+
// Setting the extension to be empty instead of allowing the default extension seems
69+
// odd, but was done to ensure existing behavior isn't modified. It seems like this
70+
// could be refactored to either use the default extension or set the fully expected
71+
// extension when compression is included (e.g. ".csv.gz")
72+
true => "",
73+
false => &get_extension(cmd.location.as_str()),
74+
};
75+
let mut options = ListingOptions::new(file_format)
76+
.with_session_config_options(session_state.config())
77+
.with_file_extension(file_extension);
78+
6679
let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
80+
let infer_parts = session_state
81+
.config_options()
82+
.execution
83+
.listing_table_factory_infer_partitions;
84+
let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts {
85+
options
86+
.infer_partitions(session_state, &table_path)
87+
.await?
88+
.into_iter()
89+
} else {
90+
cmd.table_partition_cols.clone().into_iter()
91+
};
92+
6793
(
6894
None,
69-
cmd.table_partition_cols
70-
.iter()
71-
.map(|x| {
95+
part_cols
96+
.map(|p| {
7297
(
73-
x.clone(),
98+
p,
7499
DataType::Dictionary(
75100
Box::new(DataType::UInt16),
76101
Box::new(DataType::Utf8),
@@ -106,19 +131,7 @@ impl TableProviderFactory for ListingTableFactory {
106131
(Some(schema), table_partition_cols)
107132
};
108133

109-
let mut table_path = ListingTableUrl::parse(&cmd.location)?;
110-
let file_extension = match table_path.is_collection() {
111-
// Setting the extension to be empty instead of allowing the default extension seems
112-
// odd, but was done to ensure existing behavior isn't modified. It seems like this
113-
// could be refactored to either use the default extension or set the fully expected
114-
// extension when compression is included (e.g. ".csv.gz")
115-
true => "",
116-
false => &get_extension(cmd.location.as_str()),
117-
};
118-
let options = ListingOptions::new(file_format)
119-
.with_file_extension(file_extension)
120-
.with_session_config_options(session_state.config())
121-
.with_table_partition_cols(table_partition_cols);
134+
options = options.with_table_partition_cols(table_partition_cols);
122135

123136
options
124137
.validate_partitions(session_state, &table_path)
@@ -192,6 +205,7 @@ fn get_extension(path: &str) -> String {
192205

193206
#[cfg(test)]
194207
mod tests {
208+
use datafusion_execution::config::SessionConfig;
195209
use glob::Pattern;
196210
use std::collections::HashMap;
197211
use std::fs;
@@ -419,4 +433,83 @@ mod tests {
419433
let listing_options = listing_table.options();
420434
assert_eq!("", listing_options.file_extension);
421435
}
436+
437+
#[tokio::test]
438+
async fn test_create_with_hive_partitions() {
439+
let dir = tempfile::tempdir().unwrap();
440+
let mut path = PathBuf::from(dir.path());
441+
path.extend(["key1=value1", "key2=value2"]);
442+
fs::create_dir_all(&path).unwrap();
443+
path.push("data.parquet");
444+
fs::File::create_new(&path).unwrap();
445+
446+
let factory = ListingTableFactory::new();
447+
let context = SessionContext::new();
448+
let state = context.state();
449+
let name = TableReference::bare("foo");
450+
451+
let cmd = CreateExternalTable {
452+
name,
453+
location: dir.path().to_str().unwrap().to_string(),
454+
file_type: "parquet".to_string(),
455+
schema: Arc::new(DFSchema::empty()),
456+
table_partition_cols: vec![],
457+
if_not_exists: false,
458+
temporary: false,
459+
definition: None,
460+
order_exprs: vec![],
461+
unbounded: false,
462+
options: HashMap::new(),
463+
constraints: Constraints::default(),
464+
column_defaults: HashMap::new(),
465+
};
466+
let table_provider = factory.create(&state, &cmd).await.unwrap();
467+
let listing_table = table_provider
468+
.as_any()
469+
.downcast_ref::<ListingTable>()
470+
.unwrap();
471+
472+
let listing_options = listing_table.options();
473+
let dtype =
474+
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
475+
let expected_cols = vec![
476+
(String::from("key1"), dtype.clone()),
477+
(String::from("key2"), dtype.clone()),
478+
];
479+
assert_eq!(expected_cols, listing_options.table_partition_cols);
480+
481+
// Ensure partition detection can be disabled via config
482+
let factory = ListingTableFactory::new();
483+
let mut cfg = SessionConfig::new();
484+
cfg.options_mut()
485+
.execution
486+
.listing_table_factory_infer_partitions = false;
487+
let context = SessionContext::new_with_config(cfg);
488+
let state = context.state();
489+
let name = TableReference::bare("foo");
490+
491+
let cmd = CreateExternalTable {
492+
name,
493+
location: dir.path().to_str().unwrap().to_string(),
494+
file_type: "parquet".to_string(),
495+
schema: Arc::new(DFSchema::empty()),
496+
table_partition_cols: vec![],
497+
if_not_exists: false,
498+
temporary: false,
499+
definition: None,
500+
order_exprs: vec![],
501+
unbounded: false,
502+
options: HashMap::new(),
503+
constraints: Constraints::default(),
504+
column_defaults: HashMap::new(),
505+
};
506+
let table_provider = factory.create(&state, &cmd).await.unwrap();
507+
let listing_table = table_provider
508+
.as_any()
509+
.downcast_ref::<ListingTable>()
510+
.unwrap();
511+
512+
let listing_options = listing_table.options();
513+
assert!(listing_options.table_partition_cols.is_empty());
514+
}
422515
}

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true
220220
datafusion.execution.enable_recursive_ctes true
221221
datafusion.execution.enforce_batch_size_in_joins false
222222
datafusion.execution.keep_partition_by_columns false
223+
datafusion.execution.listing_table_factory_infer_partitions true
223224
datafusion.execution.listing_table_ignore_subdirectory true
224225
datafusion.execution.max_buffered_batches_per_output_file 2
225226
datafusion.execution.meta_fetch_concurrency 32
@@ -334,6 +335,7 @@ datafusion.execution.collect_statistics true Should DataFusion collect statistic
334335
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
335336
datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
336337
datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
338+
datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
337339
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
338340
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
339341
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics

datafusion/sqllogictest/test_files/insert_to_external.slt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,34 @@ select * from partitioned_insert_test order by a,b,c
175175
1 20 200
176176
2 20 200
177177

178+
statement count 0
179+
CREATE EXTERNAL TABLE
180+
partitioned_insert_test_readback
181+
STORED AS csv
182+
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/';
183+
184+
query TTT
185+
describe partitioned_insert_test_readback;
186+
----
187+
c Int64 YES
188+
a Dictionary(UInt16, Utf8) NO
189+
b Dictionary(UInt16, Utf8) NO
190+
191+
query ITT
192+
select * from partitioned_insert_test_readback order by a,b,c;
193+
----
194+
1 10 100
195+
1 10 200
196+
1 20 100
197+
2 20 100
198+
1 20 200
199+
2 20 200
200+
201+
query I
202+
select count(*) from partitioned_insert_test_readback where b=100;
203+
----
204+
3
205+
178206
statement ok
179207
CREATE EXTERNAL TABLE
180208
partitioned_insert_test_verify(c bigint)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
query I
19+
copy (values('foo'), ('bar'))
20+
to 'test_files/scratch/listing_table_partitions/single_part/a=1/file1.parquet';
21+
----
22+
2
23+
24+
query I
25+
copy (values('baz'))
26+
to 'test_files/scratch/listing_table_partitions/single_part/a=1/file2.parquet';
27+
----
28+
1
29+
30+
statement count 0
31+
create external table single_part
32+
stored as parquet location 'test_files/scratch/listing_table_partitions/single_part/';
33+
34+
query TT
35+
select * from single_part order by (column1);
36+
----
37+
bar 1
38+
baz 1
39+
foo 1
40+
41+
query I
42+
copy (values('foo'), ('bar')) to 'test_files/scratch/listing_table_partitions/multi_part/a=1/b=100/file1.parquet';
43+
----
44+
2
45+
46+
query I
47+
copy (values('baz')) to 'test_files/scratch/listing_table_partitions/multi_part/a=1/b=200/file1.parquet';
48+
----
49+
1
50+
51+
statement count 0
52+
create external table multi_part
53+
stored as parquet location 'test_files/scratch/listing_table_partitions/multi_part/';
54+
55+
query TTT
56+
select * from multi_part where b=200;
57+
----
58+
baz 1 200
59+
60+
statement count 0
61+
set datafusion.execution.listing_table_factory_infer_partitions = false;
62+
63+
statement count 0
64+
create external table infer_disabled
65+
stored as parquet location 'test_files/scratch/listing_table_partitions/multi_part/';
66+
67+
query T
68+
select * from infer_disabled order by (column1);
69+
----
70+
bar
71+
baz
72+
foo
73+
74+
statement count 0
75+
set datafusion.execution.listing_table_factory_infer_partitions = true;

docs/source/library-user-guide/upgrading.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,17 @@
2424
**Note:** DataFusion `50.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version.
2525
You can see the current [status of the `50.0.0` release here](https://github.com/apache/datafusion/issues/16799)
2626

27+
### ListingTable automatically detects Hive Partitioned tables
28+
29+
DataFusion 50.0.0 automatically infers Hive partitions when using the `ListingTableFactory` and `CREATE EXTERNAL TABLE`. Previously,
30+
when creating a `ListingTable`, datasets that use Hive partitioning (e.g.
31+
`/table_root/column1=value1/column2=value2/data.parquet`) would not have the Hive columns reflected in
32+
the table's schema or data. The previous behavior can be
33+
restored by setting the `datafusion.execution.listing_table_factory_infer_partitions` configuration option to `false`.
34+
See [issue #17049] for more details.
35+
36+
[issue #17049]: https://github.com/apache/datafusion/issues/17049
37+
2738
### `MSRV` updated to 1.86.0
2839

2940
The Minimum Supported Rust Version (MSRV) has been updated to [`1.86.0`].

docs/source/user-guide/cli/datasources.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,30 @@ STORED AS PARQUET
162162
LOCATION 'gs://bucket/my_table/';
163163
```
164164
165+
When specifying a directory path that has a Hive-compliant partition structure, by default, DataFusion CLI will
166+
automatically parse and incorporate the Hive columns and their values into the table's schema and data. Given the
167+
following remote object paths:
168+
169+
```console
170+
gs://bucket/my_table/a=1/b=100/file1.parquet
171+
gs://bucket/my_table/a=2/b=200/file2.parquet
172+
```
173+
174+
`my_table` can be queried and filtered on the Hive columns:
175+
176+
```sql
177+
CREATE EXTERNAL TABLE my_table
178+
STORED AS PARQUET
179+
LOCATION 'gs://bucket/my_table/';
180+
181+
SELECT count(*) FROM my_table WHERE b=200;
182+
+----------+
183+
| count(*) |
184+
+----------+
185+
| 1 |
186+
+----------+
187+
```
188+
165189
# Formats
166190
167191
## Parquet

0 commit comments

Comments
 (0)