Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
/// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
/// 5. Convert string default values to proper types using `ScalarValue::cast_to()` at planning time
///
/// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
/// that get pushed down to file scans. For handling missing columns in projections,
/// other mechanisms in DataFusion are used (like SchemaAdapter).
/// Important: PhysicalExprAdapter handles rewriting both filter predicates and projection
/// expressions for file scans, including handling missing columns.
///
/// The metadata-based approach provides a flexible way to store default values as strings
/// and cast them to the appropriate types at planning time, avoiding runtime overhead.
Expand Down Expand Up @@ -144,8 +143,6 @@ pub async fn default_column_values() -> Result<()> {
);
println!("4. Default values from metadata are cast to proper types at planning time");
println!("5. The DefaultPhysicalExprAdapter handles other schema adaptations");
println!("\nNote: PhysicalExprAdapter is specifically for filter predicates.");
println!("For projection columns, different mechanisms handle missing columns.");

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ impl ParquetMetadataIndexBuilder {

// Get the schema of the file. A real system might have to handle the
// case where the schema of the file is not the same as the schema of
// the other files e.g. using SchemaAdapter.
// the other files e.g. using PhysicalExprAdapterFactory.
if self.file_schema.is_none() {
self.file_schema = Some(reader.schema().clone());
}
Expand Down
86 changes: 23 additions & 63 deletions datafusion/catalog-listing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion_catalog::Session;
use datafusion_common::{config_err, internal_err};
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::file_compression_type::FileCompressionType;
#[expect(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use std::str::FromStr;
Expand All @@ -44,15 +45,12 @@ pub enum SchemaSource {
/// # Schema Evolution Support
///
/// This configuration supports schema evolution through the optional
/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
/// [`PhysicalExprAdapterFactory`]. You might want to override the default factory when you need:
///
/// - **Type coercion requirements**: When you need custom logic for converting between
/// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
/// - **Column mapping**: You need to map columns with a legacy name to a new name
/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
///
/// If not specified, a [`datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory`]
/// will be used, which handles basic schema compatibility cases.
#[derive(Debug, Clone, Default)]
pub struct ListingTableConfig {
/// Paths on the `ObjectStore` for creating [`crate::ListingTable`].
Expand All @@ -68,8 +66,6 @@ pub struct ListingTableConfig {
pub options: Option<ListingOptions>,
/// Tracks the source of the schema information
pub(crate) schema_source: SchemaSource,
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
Expand Down Expand Up @@ -218,8 +214,7 @@ impl ListingTableConfig {
file_schema,
options: _,
schema_source,
schema_adapter_factory,
expr_adapter_factory: physical_expr_adapter_factory,
expr_adapter_factory,
} = self;

let (schema, new_schema_source) = match file_schema {
Expand All @@ -241,8 +236,7 @@ impl ListingTableConfig {
file_schema: Some(schema),
options: Some(options),
schema_source: new_schema_source,
schema_adapter_factory,
expr_adapter_factory: physical_expr_adapter_factory,
expr_adapter_factory,
})
}
None => internal_err!("No `ListingOptions` set for inferring schema"),
Expand Down Expand Up @@ -282,71 +276,18 @@ impl ListingTableConfig {
file_schema: self.file_schema,
options: Some(options),
schema_source: self.schema_source,
schema_adapter_factory: self.schema_adapter_factory,
expr_adapter_factory: self.expr_adapter_factory,
})
}
None => config_err!("No `ListingOptions` set for inferring schema"),
}
}

/// Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
///
/// The schema adapter factory is used to create schema adapters that can
/// handle schema evolution and type conversions when reading files with
/// different schemas than the table schema.
///
/// If not provided, a default schema adapter factory will be used.
///
/// # Example: Custom Schema Adapter for Type Coercion
/// ```rust
/// # use std::sync::Arc;
/// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
/// # use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter};
/// # use datafusion_datasource::ListingTableUrl;
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
/// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
/// #
/// # #[derive(Debug)]
/// # struct MySchemaAdapterFactory;
/// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
/// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
/// # unimplemented!()
/// # }
/// # }
/// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
/// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
/// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
/// let config = ListingTableConfig::new(table_paths)
/// .with_listing_options(listing_options)
/// .with_schema(table_schema)
/// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory));
/// ```
pub fn with_schema_adapter_factory(
self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self
}
}

/// Get the [`SchemaAdapterFactory`] for this configuration
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.as_ref()
}

/// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`]
///
/// The expression adapter factory is used to create physical expression adapters that can
/// handle schema evolution and type conversions when evaluating expressions
/// with different schemas than the table schema.
///
/// If not provided, a default physical expression adapter factory will be used unless a custom
/// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
///
/// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
pub fn with_expr_adapter_factory(
self,
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
Expand All @@ -356,4 +297,23 @@ impl ListingTableConfig {
..self
}
}

/// Deprecated: Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
///
/// `SchemaAdapterFactory` has been removed. Use [`Self::with_expr_adapter_factory`]
/// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
///
/// This method is a no-op and returns `self` unchanged.
#[deprecated(
    since = "52.0.0",
    note = "SchemaAdapterFactory has been removed. Use with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
// The signature still names the deprecated `SchemaAdapterFactory` trait, so
// acknowledge (and require) the resulting `deprecated` lint here rather than
// letting it propagate as a warning.
#[expect(deprecated)]
pub fn with_schema_adapter_factory(
    self,
    // Intentionally unused: the factory is discarded to keep this deprecated
    // entry point source-compatible while it no longer has any effect.
    _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
    // No-op - just return self unchanged
    self
}
}
74 changes: 30 additions & 44 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
#[expect(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
Expand Down Expand Up @@ -191,8 +192,6 @@ pub struct ListingTable {
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
column_defaults: HashMap<String, Expr>,
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
Expand Down Expand Up @@ -235,7 +234,6 @@ impl ListingTable {
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
constraints: Constraints::default(),
column_defaults: HashMap::new(),
schema_adapter_factory: config.schema_adapter_factory,
expr_adapter_factory: config.expr_adapter_factory,
};

Expand Down Expand Up @@ -290,48 +288,42 @@ impl ListingTable {
self.schema_source
}

/// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
/// Deprecated: Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
///
/// The schema adapter factory is used to create schema adapters that can
/// handle schema evolution and type conversions when reading files with
/// different schemas than the table schema.
/// `SchemaAdapterFactory` has been removed. Use [`ListingTableConfig::with_expr_adapter_factory`]
/// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
///
/// # Example: Adding Schema Evolution Support
/// ```rust
/// # use std::sync::Arc;
/// # use datafusion_catalog_listing::{ListingTable, ListingTableConfig, ListingOptions};
/// # use datafusion_datasource::ListingTableUrl;
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter};
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
/// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap();
/// # let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
/// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
/// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
/// # let table = ListingTable::try_new(config).unwrap();
/// let table_with_evolution = table
/// .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory));
/// ```
/// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory.
/// This method is a no-op and returns `self` unchanged.
#[deprecated(
since = "52.0.0",
note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
#[expect(deprecated)]
pub fn with_schema_adapter_factory(
self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
_schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self
}
// No-op - just return self unchanged
self
}

/// Get the [`SchemaAdapterFactory`] for this table
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.as_ref()
/// Deprecated: Returns the [`SchemaAdapterFactory`] used by this [`ListingTable`].
///
/// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
/// See `upgrading.md` for more details.
///
/// Always returns `None`.
#[deprecated(
since = "52.0.0",
note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
#[expect(deprecated)]
pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
None
}

/// Creates a file source and applies schema adapter factory if available
fn create_file_source_with_schema_adapter(
&self,
) -> datafusion_common::Result<Arc<dyn FileSource>> {
/// Creates a file source for this table
fn create_file_source(&self) -> Arc<dyn FileSource> {
let table_schema = TableSchema::new(
Arc::clone(&self.file_schema),
self.options
Expand All @@ -341,13 +333,7 @@ impl ListingTable {
.collect(),
);

let mut source = self.options.format.file_source(table_schema);
// Apply schema adapter to source if available.
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
if let Some(factory) = &self.schema_adapter_factory {
source = source.with_schema_adapter_factory(Arc::clone(factory))?;
}
Ok(source)
self.options.format.file_source(table_schema)
}

/// If file_sort_order is specified, creates the appropriate physical expressions
Expand Down Expand Up @@ -490,7 +476,7 @@ impl TableProvider for ListingTable {
)))));
};

let file_source = self.create_file_source_with_schema_adapter()?;
let file_source = self.create_file_source();

// create the execution plan
let plan = self
Expand Down
20 changes: 6 additions & 14 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1453,11 +1453,10 @@ mod tests {
}

#[tokio::test]
async fn test_statistics_mapping_with_default_factory() -> Result<()> {
async fn test_basic_table_scan() -> Result<()> {
let ctx = SessionContext::new();

// Create a table without providing a custom schema adapter factory
// This should fall back to using DefaultSchemaAdapterFactory
// Test basic table creation and scanning
let path = "table/file.json";
register_test_store(&ctx, &[(path, 10)]);

Expand All @@ -1469,25 +1468,18 @@ mod tests {
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(Arc::new(schema));
// Note: NOT calling .with_schema_adapter_factory() to test default behavior

let table = ListingTable::try_new(config)?;

// Verify that no custom schema adapter factory is set
assert!(table.schema_adapter_factory().is_none());

// The scan should work correctly with the default schema adapter
// The scan should work correctly
let scan_result = table.scan(&ctx.state(), None, &[], None).await;
assert!(
scan_result.is_ok(),
"Scan should succeed with default schema adapter"
);
assert!(scan_result.is_ok(), "Scan should succeed");

// Verify that the default adapter handles basic schema compatibility
// Verify file listing works
let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert!(
!result.file_groups.is_empty(),
"Should list files successfully with default adapter"
"Should list files successfully"
);

Ok(())
Expand Down
Loading