Skip to content

Commit e21ca2e

Browse files
committed
Remove SchemaAdapter
1 parent 899a762 commit e21ca2e

File tree

26 files changed

+36
-2407
lines changed

26 files changed

+36
-2407
lines changed

datafusion-examples/examples/custom_data_source/default_column_values.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,8 @@ const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
6363
/// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
6464
/// 5. Convert string default values to proper types using `ScalarValue::cast_to()` at planning time
6565
///
66-
/// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
67-
/// that get pushed down to file scans. For handling missing columns in projections,
68-
/// other mechanisms in DataFusion are used (like SchemaAdapter).
66+
/// Important: PhysicalExprAdapter handles rewriting both filter predicates and projection
67+
/// expressions for file scans, including handling missing columns.
6968
///
7069
/// The metadata-based approach provides a flexible way to store default values as strings
7170
/// and cast them to the appropriate types at planning time, avoiding runtime overhead.

datafusion-examples/examples/data_io/parquet_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ impl ParquetMetadataIndexBuilder {
511511

512512
// Get the schema of the file. A real system might have to handle the
513513
// case where the schema of the file is not the same as the schema of
514-
// the other files e.g. using SchemaAdapter.
514+
// the other files, e.g. by using a PhysicalExprAdapterFactory.
515515
if self.file_schema.is_none() {
516516
self.file_schema = Some(reader.schema().clone());
517517
}

datafusion/catalog-listing/src/config.rs

Lines changed: 3 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use datafusion_catalog::Session;
2121
use datafusion_common::{config_err, internal_err};
2222
use datafusion_datasource::ListingTableUrl;
2323
use datafusion_datasource::file_compression_type::FileCompressionType;
24-
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
2524
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
2625
use std::str::FromStr;
2726
use std::sync::Arc;
@@ -44,15 +43,12 @@ pub enum SchemaSource {
4443
/// # Schema Evolution Support
4544
///
4645
/// This configuration supports schema evolution through the optional
47-
/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
46+
/// [`PhysicalExprAdapterFactory`]. You might want to override the default factory when you need:
4847
///
4948
/// - **Type coercion requirements**: When you need custom logic for converting between
5049
/// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
5150
/// - **Column mapping**: You need to map columns with a legacy name to a new name
5251
/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may want to fill them in with other values, e.g. `0` or `""`.
53-
///
54-
/// If not specified, a [`datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory`]
55-
/// will be used, which handles basic schema compatibility cases.
5652
#[derive(Debug, Clone, Default)]
5753
pub struct ListingTableConfig {
5854
/// Paths on the `ObjectStore` for creating [`crate::ListingTable`].
@@ -68,8 +64,6 @@ pub struct ListingTableConfig {
6864
pub options: Option<ListingOptions>,
6965
/// Tracks the source of the schema information
7066
pub(crate) schema_source: SchemaSource,
71-
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
72-
pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
7367
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
7468
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
7569
}
@@ -218,8 +212,7 @@ impl ListingTableConfig {
218212
file_schema,
219213
options: _,
220214
schema_source,
221-
schema_adapter_factory,
222-
expr_adapter_factory: physical_expr_adapter_factory,
215+
expr_adapter_factory,
223216
} = self;
224217

225218
let (schema, new_schema_source) = match file_schema {
@@ -241,8 +234,7 @@ impl ListingTableConfig {
241234
file_schema: Some(schema),
242235
options: Some(options),
243236
schema_source: new_schema_source,
244-
schema_adapter_factory,
245-
expr_adapter_factory: physical_expr_adapter_factory,
237+
expr_adapter_factory,
246238
})
247239
}
248240
None => internal_err!("No `ListingOptions` set for inferring schema"),
@@ -282,71 +274,18 @@ impl ListingTableConfig {
282274
file_schema: self.file_schema,
283275
options: Some(options),
284276
schema_source: self.schema_source,
285-
schema_adapter_factory: self.schema_adapter_factory,
286277
expr_adapter_factory: self.expr_adapter_factory,
287278
})
288279
}
289280
None => config_err!("No `ListingOptions` set for inferring schema"),
290281
}
291282
}
292283

293-
/// Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
294-
///
295-
/// The schema adapter factory is used to create schema adapters that can
296-
/// handle schema evolution and type conversions when reading files with
297-
/// different schemas than the table schema.
298-
///
299-
/// If not provided, a default schema adapter factory will be used.
300-
///
301-
/// # Example: Custom Schema Adapter for Type Coercion
302-
/// ```rust
303-
/// # use std::sync::Arc;
304-
/// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
305-
/// # use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter};
306-
/// # use datafusion_datasource::ListingTableUrl;
307-
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
308-
/// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
309-
/// #
310-
/// # #[derive(Debug)]
311-
/// # struct MySchemaAdapterFactory;
312-
/// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
313-
/// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
314-
/// # unimplemented!()
315-
/// # }
316-
/// # }
317-
/// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
318-
/// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
319-
/// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
320-
/// let config = ListingTableConfig::new(table_paths)
321-
/// .with_listing_options(listing_options)
322-
/// .with_schema(table_schema)
323-
/// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory));
324-
/// ```
325-
pub fn with_schema_adapter_factory(
326-
self,
327-
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
328-
) -> Self {
329-
Self {
330-
schema_adapter_factory: Some(schema_adapter_factory),
331-
..self
332-
}
333-
}
334-
335-
/// Get the [`SchemaAdapterFactory`] for this configuration
336-
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
337-
self.schema_adapter_factory.as_ref()
338-
}
339-
340284
/// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`]
341285
///
342286
/// The expression adapter factory is used to create physical expression adapters that can
343287
/// handle schema evolution and type conversions when evaluating expressions
344288
/// with different schemas than the table schema.
345-
///
346-
/// If not provided, a default physical expression adapter factory will be used unless a custom
347-
/// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
348-
///
349-
/// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
350289
pub fn with_expr_adapter_factory(
351290
self,
352291
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,

datafusion/catalog-listing/src/table.rs

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use datafusion_datasource::file::FileSource;
2929
use datafusion_datasource::file_groups::FileGroup;
3030
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
3131
use datafusion_datasource::file_sink_config::FileSinkConfig;
32-
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3332
use datafusion_datasource::{
3433
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
3534
};
@@ -191,8 +190,6 @@ pub struct ListingTable {
191190
constraints: Constraints,
192191
/// Column default expressions for columns that are not physically present in the data files
193192
column_defaults: HashMap<String, Expr>,
194-
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
195-
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
196193
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
197194
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
198195
}
@@ -235,7 +232,6 @@ impl ListingTable {
235232
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
236233
constraints: Constraints::default(),
237234
column_defaults: HashMap::new(),
238-
schema_adapter_factory: config.schema_adapter_factory,
239235
expr_adapter_factory: config.expr_adapter_factory,
240236
};
241237

@@ -290,48 +286,8 @@ impl ListingTable {
290286
self.schema_source
291287
}
292288

293-
/// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
294-
///
295-
/// The schema adapter factory is used to create schema adapters that can
296-
/// handle schema evolution and type conversions when reading files with
297-
/// different schemas than the table schema.
298-
///
299-
/// # Example: Adding Schema Evolution Support
300-
/// ```rust
301-
/// # use std::sync::Arc;
302-
/// # use datafusion_catalog_listing::{ListingTable, ListingTableConfig, ListingOptions};
303-
/// # use datafusion_datasource::ListingTableUrl;
304-
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter};
305-
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
306-
/// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
307-
/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap();
308-
/// # let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
309-
/// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
310-
/// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
311-
/// # let table = ListingTable::try_new(config).unwrap();
312-
/// let table_with_evolution = table
313-
/// .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory));
314-
/// ```
315-
/// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory.
316-
pub fn with_schema_adapter_factory(
317-
self,
318-
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
319-
) -> Self {
320-
Self {
321-
schema_adapter_factory: Some(schema_adapter_factory),
322-
..self
323-
}
324-
}
325-
326-
/// Get the [`SchemaAdapterFactory`] for this table
327-
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
328-
self.schema_adapter_factory.as_ref()
329-
}
330-
331-
/// Creates a file source and applies schema adapter factory if available
332-
fn create_file_source_with_schema_adapter(
333-
&self,
334-
) -> datafusion_common::Result<Arc<dyn FileSource>> {
289+
/// Creates a file source for this table
290+
fn create_file_source(&self) -> Arc<dyn FileSource> {
335291
let table_schema = TableSchema::new(
336292
Arc::clone(&self.file_schema),
337293
self.options
@@ -341,13 +297,7 @@ impl ListingTable {
341297
.collect(),
342298
);
343299

344-
let mut source = self.options.format.file_source(table_schema);
345-
// Apply schema adapter to source if available.
346-
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
347-
if let Some(factory) = &self.schema_adapter_factory {
348-
source = source.with_schema_adapter_factory(Arc::clone(factory))?;
349-
}
350-
Ok(source)
300+
self.options.format.file_source(table_schema)
351301
}
352302

353303
/// If file_sort_order is specified, creates the appropriate physical expressions
@@ -490,7 +440,7 @@ impl TableProvider for ListingTable {
490440
)))));
491441
};
492442

493-
let file_source = self.create_file_source_with_schema_adapter()?;
443+
let file_source = self.create_file_source();
494444

495445
// create the execution plan
496446
let plan = self

datafusion/core/src/datasource/listing/table.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1443,11 +1443,10 @@ mod tests {
14431443
}
14441444

14451445
#[tokio::test]
1446-
async fn test_statistics_mapping_with_default_factory() -> Result<()> {
1446+
async fn test_basic_table_scan() -> Result<()> {
14471447
let ctx = SessionContext::new();
14481448

1449-
// Create a table without providing a custom schema adapter factory
1450-
// This should fall back to using DefaultSchemaAdapterFactory
1449+
// Test basic table creation and scanning
14511450
let path = "table/file.json";
14521451
register_test_store(&ctx, &[(path, 10)]);
14531452

@@ -1459,25 +1458,18 @@ mod tests {
14591458
let config = ListingTableConfig::new(table_path)
14601459
.with_listing_options(opt)
14611460
.with_schema(Arc::new(schema));
1462-
// Note: NOT calling .with_schema_adapter_factory() to test default behavior
14631461

14641462
let table = ListingTable::try_new(config)?;
14651463

1466-
// Verify that no custom schema adapter factory is set
1467-
assert!(table.schema_adapter_factory().is_none());
1468-
1469-
// The scan should work correctly with the default schema adapter
1464+
// The scan should work correctly
14701465
let scan_result = table.scan(&ctx.state(), None, &[], None).await;
1471-
assert!(
1472-
scan_result.is_ok(),
1473-
"Scan should succeed with default schema adapter"
1474-
);
1466+
assert!(scan_result.is_ok(), "Scan should succeed");
14751467

1476-
// Verify that the default adapter handles basic schema compatibility
1468+
// Verify file listing works
14771469
let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
14781470
assert!(
14791471
!result.file_groups.is_empty(),
1480-
"Should list files successfully with default adapter"
1472+
"Should list files successfully"
14811473
);
14821474

14831475
Ok(())

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ pub use datafusion_catalog::default_table_source;
4242
pub use datafusion_catalog::memory;
4343
pub use datafusion_catalog::stream;
4444
pub use datafusion_catalog::view;
45-
pub use datafusion_datasource::schema_adapter;
4645
pub use datafusion_datasource::sink;
4746
pub use datafusion_datasource::source;
4847
pub use datafusion_datasource::table_schema;
@@ -60,15 +59,12 @@ mod tests {
6059
record_batch::RecordBatch,
6160
};
6261
use datafusion_common::{
63-
record_batch,
6462
test_util::batches_to_sort_string,
6563
tree_node::{Transformed, TransformedResult, TreeNode},
6664
Result, ScalarValue,
6765
};
6866
use datafusion_datasource::{
69-
file_scan_config::FileScanConfigBuilder,
70-
schema_adapter::DefaultSchemaAdapterFactory, source::DataSourceExec,
71-
PartitionedFile,
67+
file_scan_config::FileScanConfigBuilder, source::DataSourceExec, PartitionedFile,
7268
};
7369
use datafusion_datasource_parquet::source::ParquetSource;
7470
use datafusion_physical_expr::expressions::{Column, Literal};
@@ -151,58 +147,6 @@ mod tests {
151147
"###);
152148
}
153149

154-
#[test]
155-
fn default_schema_adapter() {
156-
let table_schema = Schema::new(vec![
157-
Field::new("a", DataType::Int32, true),
158-
Field::new("b", DataType::Utf8, true),
159-
]);
160-
161-
// file has a subset of the table schema fields and different type
162-
let file_schema = Schema::new(vec![
163-
Field::new("c", DataType::Float64, true), // not in table schema
164-
Field::new("b", DataType::Float64, true),
165-
]);
166-
167-
let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
168-
let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
169-
assert_eq!(indices, vec![1]);
170-
171-
let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
172-
173-
let mapped_batch = mapper.map_batch(file_batch).unwrap();
174-
175-
// the mapped batch has the correct schema and the "b" column has been cast to Utf8
176-
let expected_batch = record_batch!(
177-
("a", Int32, vec![None, None]), // missing column filled with nulls
178-
("b", Utf8, vec!["1.0", "2.0"]) // b was cast to string and order was changed
179-
)
180-
.unwrap();
181-
assert_eq!(mapped_batch, expected_batch);
182-
}
183-
184-
#[test]
185-
fn default_schema_adapter_non_nullable_columns() {
186-
let table_schema = Schema::new(vec![
187-
Field::new("a", DataType::Int32, false), // "a"" is declared non nullable
188-
Field::new("b", DataType::Utf8, true),
189-
]);
190-
let file_schema = Schema::new(vec![
191-
// since file doesn't have "a" it will be filled with nulls
192-
Field::new("b", DataType::Float64, true),
193-
]);
194-
195-
let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
196-
let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
197-
assert_eq!(indices, vec![0]);
198-
199-
let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
200-
201-
// Mapping fails because it tries to fill in a non-nullable column with nulls
202-
let err = mapper.map_batch(file_batch).unwrap_err().to_string();
203-
assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
204-
}
205-
206150
#[derive(Debug)]
207151
struct TestPhysicalExprAdapterFactory;
208152

0 commit comments

Comments
 (0)