
Commit 689d67d

Merge branch 'main' into support_parquet_for_h2o
2 parents: da1a31d + 63dd4e2

23 files changed: +1396, -222 lines


datafusion-examples/examples/default_column_values.rs

Lines changed: 1 addition & 1 deletion

@@ -263,7 +263,7 @@ impl TableProvider for DefaultValueTableProvider {
             .with_projection(projection.cloned())
             .with_limit(limit)
             .with_file_group(file_group)
-            .with_expr_adapter(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _);
+            .with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));

         Ok(Arc::new(DataSourceExec::new(Arc::new(
             file_scan_config.build(),
datafusion-examples/examples/json_shredding.rs

Lines changed: 1 addition & 1 deletion

@@ -273,7 +273,7 @@ impl TableProvider for ExampleTableProvider {
             .with_limit(limit)
             .with_file_group(file_group)
             // if the rewriter needs a reference to the table schema you can bind self.schema() here
-            .with_expr_adapter(Arc::new(ShreddedJsonRewriterFactory) as _);
+            .with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));

         Ok(Arc::new(DataSourceExec::new(Arc::new(
             file_scan_config.build(),
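Both example diffs reflect the same API change: `FileScanConfigBuilder::with_expr_adapter` now takes an `Option<Arc<dyn PhysicalExprAdapterFactory>>` rather than the factory itself, so callers wrap the factory in `Some(...)` or pass `None` to keep the default adapter. A minimal sketch of the new call shape; the helper name and import paths here are assumptions, not part of this commit:

use std::sync::Arc;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;

// Sketch only: attach a caller-supplied factory to a scan being built.
fn attach_expr_adapter(
    builder: FileScanConfigBuilder,
    factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> FileScanConfigBuilder {
    // The setter now takes Option<...>; passing None keeps the default behavior.
    builder.with_expr_adapter(Some(factory))
}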

datafusion/core/src/datasource/listing/table.rs

Lines changed: 32 additions & 2 deletions

@@ -48,6 +48,7 @@ use datafusion_execution::{
 use datafusion_expr::{
     dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
 };
+use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
 use futures::{future, stream, Stream, StreamExt, TryStreamExt};
@@ -99,6 +100,8 @@ pub struct ListingTableConfig {
     schema_source: SchemaSource,
     /// Optional [`SchemaAdapterFactory`] for creating schema adapters
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
+    physical_expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
 }

 impl ListingTableConfig {
@@ -281,6 +284,7 @@ impl ListingTableConfig {
             options: Some(listing_options),
             schema_source: self.schema_source,
             schema_adapter_factory: self.schema_adapter_factory,
+            physical_expr_adapter_factory: self.physical_expr_adapter_factory,
         })
     }

@@ -300,6 +304,7 @@
             options: _,
             schema_source,
             schema_adapter_factory,
+            physical_expr_adapter_factory,
         } = self;

         let (schema, new_schema_source) = match file_schema {
@@ -322,6 +327,7 @@
                     options: Some(options),
                     schema_source: new_schema_source,
                     schema_adapter_factory,
+                    physical_expr_adapter_factory,
                 })
             }
             None => internal_err!("No `ListingOptions` set for inferring schema"),
@@ -364,6 +370,7 @@
                     options: Some(options),
                     schema_source: self.schema_source,
                     schema_adapter_factory: self.schema_adapter_factory,
+                    physical_expr_adapter_factory: self.physical_expr_adapter_factory,
                 })
             }
             None => config_err!("No `ListingOptions` set for inferring schema"),
@@ -415,6 +422,26 @@
     pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
         self.schema_adapter_factory.as_ref()
     }
+
+    /// Set the [`PhysicalExprAdapterFactory`] for the [`ListingTable`]
+    ///
+    /// The expression adapter factory is used to create physical expression adapters that can
+    /// handle schema evolution and type conversions when evaluating expressions
+    /// with different schemas than the table schema.
+    ///
+    /// If not provided, a default physical expression adapter factory will be used unless a custom
+    /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
+    ///
+    /// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
+    pub fn with_physical_expr_adapter_factory(
+        self,
+        physical_expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
+    ) -> Self {
+        Self {
+            physical_expr_adapter_factory: Some(physical_expr_adapter_factory),
+            ..self
+        }
+    }
 }

 /// Options for creating a [`ListingTable`]
@@ -911,6 +938,8 @@ pub struct ListingTable {
     column_defaults: HashMap<String, Expr>,
     /// Optional [`SchemaAdapterFactory`] for creating schema adapters
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
+    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
 }

 impl ListingTable {
@@ -952,6 +981,7 @@
             constraints: Constraints::default(),
             column_defaults: HashMap::new(),
             schema_adapter_factory: config.schema_adapter_factory,
+            expr_adapter_factory: config.physical_expr_adapter_factory,
         };

         Ok(table)
@@ -1196,6 +1226,7 @@
                 .with_limit(limit)
                 .with_output_ordering(output_ordering)
                 .with_table_partition_cols(table_partition_cols)
+                .with_expr_adapter(self.expr_adapter_factory.clone())
                 .build(),
         )
         .await
@@ -1995,7 +2026,6 @@
     #[tokio::test]
     async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
-        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
         config_map.insert(
             "datafusion.execution.soft_max_rows_per_output_file".into(),
             "10".into(),
@@ -2060,7 +2090,7 @@
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
-        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
+        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
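For `ListingTable`, the factory is threaded through `ListingTableConfig` via the new `with_physical_expr_adapter_factory` method shown above. A hedged sketch of opting in; import paths are assumptions, and the listing options and schema still have to be configured before `try_new` as usual:

use std::sync::Arc;
use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;

// Sketch under the API added in this commit; ListingOptions setup and
// schema inference are elided.
fn table_with_expr_adapter(
    factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> datafusion::error::Result<Arc<ListingTable>> {
    let url = ListingTableUrl::parse("file:///data/my_table/")?;
    let config = ListingTableConfig::new(url)
        // ... .with_listing_options(...) and a schema would normally go here ...
        .with_physical_expr_adapter_factory(factory);
    Ok(Arc::new(ListingTable::try_new(config)?))
}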
datafusion/core/tests/execution/datasource_split.rs (new file)

Lines changed: 123 additions & 0 deletions

@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+    array::{ArrayRef, Int32Array},
+    datatypes::{DataType, Field, Schema},
+    record_batch::RecordBatch,
+};
+use datafusion_datasource::memory::MemorySourceConfig;
+use datafusion_execution::TaskContext;
+use datafusion_physical_plan::{common::collect, ExecutionPlan};
+use std::sync::Arc;
+
+/// Helper function to create a memory source with the given batch size and collect all batches
+async fn create_and_collect_batches(
+    batch_size: usize,
+) -> datafusion_common::Result<Vec<RecordBatch>> {
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+    let array = Int32Array::from_iter_values(0..batch_size as i32);
+    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array) as ArrayRef])?;
+    let exec = MemorySourceConfig::try_new_exec(&[vec![batch]], schema, None)?;
+    let ctx = Arc::new(TaskContext::default());
+    let stream = exec.execute(0, ctx)?;
+    collect(stream).await
+}
+
+/// Helper function to create a memory source with multiple batches and collect all results
+async fn create_and_collect_multiple_batches(
+    input_batches: Vec<RecordBatch>,
+) -> datafusion_common::Result<Vec<RecordBatch>> {
+    let schema = input_batches[0].schema();
+    let exec = MemorySourceConfig::try_new_exec(&[input_batches], schema, None)?;
+    let ctx = Arc::new(TaskContext::default());
+    let stream = exec.execute(0, ctx)?;
+    collect(stream).await
+}
+
+#[tokio::test]
+async fn datasource_splits_large_batches() -> datafusion_common::Result<()> {
+    let batch_size = 20000;
+    let batches = create_and_collect_batches(batch_size).await?;
+
+    assert!(batches.len() > 1);
+    let max = batches.iter().map(|b| b.num_rows()).max().unwrap();
+    assert!(
+        max <= datafusion_execution::config::SessionConfig::new()
+            .options()
+            .execution
+            .batch_size
+    );
+    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total, batch_size);
+    Ok(())
+}
+
+#[tokio::test]
+async fn datasource_exact_batch_size_no_split() -> datafusion_common::Result<()> {
+    let session_config = datafusion_execution::config::SessionConfig::new();
+    let configured_batch_size = session_config.options().execution.batch_size;
+
+    let batches = create_and_collect_batches(configured_batch_size).await?;
+
+    // Should not split when exactly equal to batch_size
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0].num_rows(), configured_batch_size);
+    Ok(())
+}
+
+#[tokio::test]
+async fn datasource_small_batch_no_split() -> datafusion_common::Result<()> {
+    // Test with batch smaller than the batch size (8192)
+    let small_batch_size = 512; // Less than 8192
+
+    let batches = create_and_collect_batches(small_batch_size).await?;
+
+    // Should not split small batches below the batch size
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0].num_rows(), small_batch_size);
+    Ok(())
+}
+
+#[tokio::test]
+async fn datasource_empty_batch_clean_termination() -> datafusion_common::Result<()> {
+    let batches = create_and_collect_batches(0).await?;
+
+    // Empty batch should result in one empty batch
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0].num_rows(), 0);
+    Ok(())
+}
+
+#[tokio::test]
+async fn datasource_multiple_empty_batches() -> datafusion_common::Result<()> {
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+    let empty_array = Int32Array::from_iter_values(std::iter::empty::<i32>());
+    let empty_batch =
+        RecordBatch::try_new(schema.clone(), vec![Arc::new(empty_array) as ArrayRef])?;
+
+    // Create multiple empty batches
+    let input_batches = vec![empty_batch.clone(), empty_batch.clone(), empty_batch];
+    let batches = create_and_collect_multiple_batches(input_batches).await?;
+
+    // Should preserve empty batches without issues
+    assert_eq!(batches.len(), 3);
+    for batch in &batches {
+        assert_eq!(batch.num_rows(), 0);
+    }
+    Ok(())
+}
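The threshold these tests assert against is the session-level `datafusion.execution.batch_size` option (8192 by default), which the tests read back via `options().execution.batch_size`. A small sketch of adjusting it through the standard `SessionConfig` API, not something introduced by this commit:

use datafusion_execution::config::SessionConfig;

// Lower the execution batch size; sources that honor the setting will then
// split any batch larger than 1024 rows into smaller chunks.
fn small_batch_config() -> SessionConfig {
    SessionConfig::new().with_batch_size(1024)
}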

datafusion/core/tests/execution/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -16,4 +16,5 @@
 // under the License.

 mod coop;
+mod datasource_split;
 mod logical_plan;

datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs

Lines changed: 64 additions & 0 deletions

@@ -148,6 +148,70 @@ async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
     Ok(())
 }

+#[cfg(feature = "parquet")]
+#[tokio::test]
+async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter() -> Result<()> {
+    // Create a temporary directory for our test file
+    let tmp_dir = TempDir::new()?;
+    let file_path = tmp_dir.path().join("test.parquet");
+    let file_path_str = file_path.to_str().unwrap();
+
+    // Create test data
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
+        ],
+    )?;
+
+    // Write test parquet file
+    let file = std::fs::File::create(file_path_str)?;
+    let props = WriterProperties::builder().build();
+    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
+    writer.write(&batch)?;
+    writer.close()?;
+
+    // Create a session context
+    let ctx = SessionContext::new();
+
+    // Create a ParquetSource with the adapter factory
+    let source = ParquetSource::default()
+        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));
+
+    // Create a scan config
+    let config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
+        schema.clone(),
+    )
+    .with_source(source)
+    .build();
+
+    // Create a data source executor
+    let exec = DataSourceExec::from_data_source(config);
+
+    // Collect results
+    let task_ctx = ctx.task_ctx();
+    let stream = exec.execute(0, task_ctx)?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+
+    // Verify the schema has uppercase column names
+    let result_schema = batches[0].schema();
+    assert_eq!(result_schema.field(0).name(), "ID");
+    assert_eq!(result_schema.field(1).name(), "NAME");
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
     // This test verifies that the same schema adapter factory can be reused
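Combined with the builder change earlier in this commit, a scan can carry both a `SchemaAdapterFactory` (on the source) and a `PhysicalExprAdapterFactory` (on the scan config). A hedged fragment reusing the test's names, where `MyExprAdapterFactory` is a hypothetical placeholder and `store_url`/`file_schema` are elided inputs:

// Sketch: schema adapter on the source, expression adapter on the config.
let source = ParquetSource::default()
    .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));
let config = FileScanConfigBuilder::new(store_url, file_schema)
    .with_source(source)
    .with_expr_adapter(Some(Arc::new(MyExprAdapterFactory) as _))
    .build();
let exec = DataSourceExec::from_data_source(config);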

datafusion/core/tests/parquet/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -50,6 +50,7 @@ mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
 mod schema;
+mod schema_adapter;
 mod schema_coercion;
 mod utils;