From 4dc79c10d2952d39801dcef1c44eabec05bc6e34 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 15:53:33 -0600 Subject: [PATCH 1/7] Remove SchemaAdapter --- .../default_column_values.rs | 5 +- .../examples/data_io/parquet_index.rs | 2 +- datafusion/catalog-listing/src/config.rs | 67 +- datafusion/catalog-listing/src/table.rs | 58 +- .../core/src/datasource/listing/table.rs | 20 +- datafusion/core/src/datasource/mod.rs | 57 +- .../core/src/datasource/physical_plan/mod.rs | 135 --- .../src/datasource/physical_plan/parquet.rs | 10 +- datafusion/core/tests/core_integration.rs | 3 - .../{schema_adapter.rs => expr_adapter.rs} | 7 +- datafusion/core/tests/parquet/mod.rs | 2 +- .../filter_pushdown/util.rs | 28 +- datafusion/core/tests/schema_adapter/mod.rs | 18 - .../schema_adapter_integration_tests.rs | 752 ------------ datafusion/datasource-arrow/src/source.rs | 118 +- datafusion/datasource-avro/src/source.rs | 17 - datafusion/datasource-csv/src/source.rs | 17 - datafusion/datasource-json/src/source.rs | 17 - datafusion/datasource-parquet/src/source.rs | 1 - datafusion/datasource/src/file.rs | 47 +- datafusion/datasource/src/file_scan_config.rs | 6 +- datafusion/datasource/src/mod.rs | 1 - datafusion/datasource/src/schema_adapter.rs | 1047 ----------------- datafusion/datasource/src/test_util.rs | 18 - .../src/schema_rewriter.rs | 4 +- docs/source/library-user-guide/upgrading.md | 18 + 26 files changed, 53 insertions(+), 2422 deletions(-) rename datafusion/core/tests/parquet/{schema_adapter.rs => expr_adapter.rs} (97%) delete mode 100644 datafusion/core/tests/schema_adapter/mod.rs delete mode 100644 datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs delete mode 100644 datafusion/datasource/src/schema_adapter.rs diff --git a/datafusion-examples/examples/custom_data_source/default_column_values.rs b/datafusion-examples/examples/custom_data_source/default_column_values.rs index 227cea5e7c084..5089396ec341e 100644 --- a/datafusion-examples/examples/custom_data_source/default_column_values.rs +++ b/datafusion-examples/examples/custom_data_source/default_column_values.rs @@ -63,9 +63,8 @@ const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value"; /// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation /// 5. Convert string default values to proper types using `ScalarValue::cast_to()` at planning time /// -/// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates -/// that get pushed down to file scans. For handling missing columns in projections, -/// other mechanisms in DataFusion are used (like SchemaAdapter). +/// Important: PhysicalExprAdapter handles rewriting both filter predicates and projection +/// expressions for file scans, including handling missing columns. /// /// The metadata-based approach provides a flexible way to store default values as strings /// and cast them to the appropriate types at planning time, avoiding runtime overhead. diff --git a/datafusion-examples/examples/data_io/parquet_index.rs b/datafusion-examples/examples/data_io/parquet_index.rs index 2afc6c1047d5e..e11a303f442a4 100644 --- a/datafusion-examples/examples/data_io/parquet_index.rs +++ b/datafusion-examples/examples/data_io/parquet_index.rs @@ -511,7 +511,7 @@ impl ParquetMetadataIndexBuilder { // Get the schema of the file. 
A real system might have to handle the // case where the schema of the file is not the same as the schema of - // the other files e.g. using SchemaAdapter. + // the other files e.g. using PhysicalExprAdapterFactory. if self.file_schema.is_none() { self.file_schema = Some(reader.schema().clone()); } diff --git a/datafusion/catalog-listing/src/config.rs b/datafusion/catalog-listing/src/config.rs index e3cd01a191924..baf80db64353c 100644 --- a/datafusion/catalog-listing/src/config.rs +++ b/datafusion/catalog-listing/src/config.rs @@ -21,7 +21,6 @@ use datafusion_catalog::Session; use datafusion_common::{config_err, internal_err}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource::file_compression_type::FileCompressionType; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use std::str::FromStr; use std::sync::Arc; @@ -44,15 +43,12 @@ pub enum SchemaSource { /// # Schema Evolution Support /// /// This configuration supports schema evolution through the optional -/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need: +/// [`PhysicalExprAdapterFactory`]. You might want to override the default factory when you need: /// /// - **Type coercion requirements**: When you need custom logic for converting between /// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8) /// - **Column mapping**: You need to map columns with a legacy name to a new name /// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`. -/// -/// If not specified, a [`datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory`] -/// will be used, which handles basic schema compatibility cases. #[derive(Debug, Clone, Default)] pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating [`crate::ListingTable`]. @@ -68,8 +64,6 @@ pub struct ListingTableConfig { pub options: Option, /// Tracks the source of the schema information pub(crate) schema_source: SchemaSource, - /// Optional [`SchemaAdapterFactory`] for creating schema adapters - pub(crate) schema_adapter_factory: Option>, /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters pub(crate) expr_adapter_factory: Option>, } @@ -218,8 +212,7 @@ impl ListingTableConfig { file_schema, options: _, schema_source, - schema_adapter_factory, - expr_adapter_factory: physical_expr_adapter_factory, + expr_adapter_factory, } = self; let (schema, new_schema_source) = match file_schema { @@ -241,8 +234,7 @@ impl ListingTableConfig { file_schema: Some(schema), options: Some(options), schema_source: new_schema_source, - schema_adapter_factory, - expr_adapter_factory: physical_expr_adapter_factory, + expr_adapter_factory, }) } None => internal_err!("No `ListingOptions` set for inferring schema"), @@ -282,7 +274,6 @@ impl ListingTableConfig { file_schema: self.file_schema, options: Some(options), schema_source: self.schema_source, - schema_adapter_factory: self.schema_adapter_factory, expr_adapter_factory: self.expr_adapter_factory, }) } @@ -290,63 +281,11 @@ impl ListingTableConfig { } } - /// Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`] - /// - /// The schema adapter factory is used to create schema adapters that can - /// handle schema evolution and type conversions when reading files with - /// different schemas than the table schema. 
- /// - /// If not provided, a default schema adapter factory will be used. - /// - /// # Example: Custom Schema Adapter for Type Coercion - /// ```rust - /// # use std::sync::Arc; - /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions}; - /// # use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; - /// # use datafusion_datasource::ListingTableUrl; - /// # use datafusion_datasource_parquet::file_format::ParquetFormat; - /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; - /// # - /// # #[derive(Debug)] - /// # struct MySchemaAdapterFactory; - /// # impl SchemaAdapterFactory for MySchemaAdapterFactory { - /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { - /// # unimplemented!() - /// # } - /// # } - /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); - /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); - /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); - /// let config = ListingTableConfig::new(table_paths) - /// .with_listing_options(listing_options) - /// .with_schema(table_schema) - /// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory)); - /// ``` - pub fn with_schema_adapter_factory( - self, - schema_adapter_factory: Arc, - ) -> Self { - Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self - } - } - - /// Get the [`SchemaAdapterFactory`] for this configuration - pub fn schema_adapter_factory(&self) -> Option<&Arc> { - self.schema_adapter_factory.as_ref() - } - /// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`] /// /// The expression adapter factory is used to create physical expression adapters that can /// handle schema evolution and type conversions when evaluating expressions /// with different schemas than the table schema. - /// - /// If not provided, a default physical expression adapter factory will be used unless a custom - /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used. - /// - /// See for details on this transition. 
pub fn with_expr_adapter_factory( self, expr_adapter_factory: Arc, diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 1dbfaf381a4b5..14d6747b7e120 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -29,7 +29,6 @@ use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::FileSinkConfig; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics, }; @@ -191,8 +190,6 @@ pub struct ListingTable { constraints: Constraints, /// Column default expressions for columns that are not physically present in the data files column_defaults: HashMap, - /// Optional [`SchemaAdapterFactory`] for creating schema adapters - schema_adapter_factory: Option>, /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters expr_adapter_factory: Option>, } @@ -235,7 +232,6 @@ impl ListingTable { collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), constraints: Constraints::default(), column_defaults: HashMap::new(), - schema_adapter_factory: config.schema_adapter_factory, expr_adapter_factory: config.expr_adapter_factory, }; @@ -290,48 +286,8 @@ impl ListingTable { self.schema_source } - /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`] - /// - /// The schema adapter factory is used to create schema adapters that can - /// handle schema evolution and type conversions when reading files with - /// different schemas than the table schema. - /// - /// # Example: Adding Schema Evolution Support - /// ```rust - /// # use std::sync::Arc; - /// # use datafusion_catalog_listing::{ListingTable, ListingTableConfig, ListingOptions}; - /// # use datafusion_datasource::ListingTableUrl; - /// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter}; - /// # use datafusion_datasource_parquet::file_format::ParquetFormat; - /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; - /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); - /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default())); - /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); - /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema); - /// # let table = ListingTable::try_new(config).unwrap(); - /// let table_with_evolution = table - /// .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory)); - /// ``` - /// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory. 
- pub fn with_schema_adapter_factory( - self, - schema_adapter_factory: Arc, - ) -> Self { - Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self - } - } - - /// Get the [`SchemaAdapterFactory`] for this table - pub fn schema_adapter_factory(&self) -> Option<&Arc> { - self.schema_adapter_factory.as_ref() - } - - /// Creates a file source and applies schema adapter factory if available - fn create_file_source_with_schema_adapter( - &self, - ) -> datafusion_common::Result> { + /// Creates a file source for this table + fn create_file_source(&self) -> Arc { let table_schema = TableSchema::new( Arc::clone(&self.file_schema), self.options @@ -341,13 +297,7 @@ impl ListingTable { .collect(), ); - let mut source = self.options.format.file_source(table_schema); - // Apply schema adapter to source if available. - // The source will use this SchemaAdapter to adapt data batches as they flow up the plan. - if let Some(factory) = &self.schema_adapter_factory { - source = source.with_schema_adapter_factory(Arc::clone(factory))?; - } - Ok(source) + self.options.format.file_source(table_schema) } /// If file_sort_order is specified, creates the appropriate physical expressions @@ -490,7 +440,7 @@ impl TableProvider for ListingTable { ))))); }; - let file_source = self.create_file_source_with_schema_adapter()?; + let file_source = self.create_file_source(); // create the execution plan let plan = self diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 7f957108cf787..bf9a154316b98 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1453,11 +1453,10 @@ mod tests { } #[tokio::test] - async fn test_statistics_mapping_with_default_factory() -> Result<()> { + async fn test_basic_table_scan() -> Result<()> { let ctx = SessionContext::new(); - // Create a table without providing a custom schema adapter factory - // This should fall back to using DefaultSchemaAdapterFactory + // Test basic table creation and scanning let path = "table/file.json"; register_test_store(&ctx, &[(path, 10)]); @@ -1469,25 +1468,18 @@ mod tests { let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(Arc::new(schema)); - // Note: NOT calling .with_schema_adapter_factory() to test default behavior let table = ListingTable::try_new(config)?; - // Verify that no custom schema adapter factory is set - assert!(table.schema_adapter_factory().is_none()); - - // The scan should work correctly with the default schema adapter + // The scan should work correctly let scan_result = table.scan(&ctx.state(), None, &[], None).await; - assert!( - scan_result.is_ok(), - "Scan should succeed with default schema adapter" - ); + assert!(scan_result.is_ok(), "Scan should succeed"); - // Verify that the default adapter handles basic schema compatibility + // Verify file listing works let result = table.list_files_for_scan(&ctx.state(), &[], None).await?; assert!( !result.file_groups.is_empty(), - "Should list files successfully with default adapter" + "Should list files successfully" ); Ok(()) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 7d37ef8cf24aa..b40d7092f0f87 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -42,7 +42,6 @@ pub use datafusion_catalog::default_table_source; pub use datafusion_catalog::memory; pub use datafusion_catalog::stream; pub use 
datafusion_catalog::view; -pub use datafusion_datasource::schema_adapter; pub use datafusion_datasource::sink; pub use datafusion_datasource::source; pub use datafusion_datasource::table_schema; @@ -60,13 +59,13 @@ mod tests { record_batch::RecordBatch, }; use datafusion_common::{ - Result, ScalarValue, record_batch, + Result, ScalarValue, test_util::batches_to_sort_string, tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_datasource::{ PartitionedFile, file_scan_config::FileScanConfigBuilder, - schema_adapter::DefaultSchemaAdapterFactory, source::DataSourceExec, + source::DataSourceExec, }; use datafusion_datasource_parquet::source::ParquetSource; use datafusion_physical_expr::expressions::{Column, Literal}; @@ -149,58 +148,6 @@ mod tests { "###); } - #[test] - fn default_schema_adapter() { - let table_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - ]); - - // file has a subset of the table schema fields and different type - let file_schema = Schema::new(vec![ - Field::new("c", DataType::Float64, true), // not in table schema - Field::new("b", DataType::Float64, true), - ]); - - let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema)); - let (mapper, indices) = adapter.map_schema(&file_schema).unwrap(); - assert_eq!(indices, vec![1]); - - let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap(); - - let mapped_batch = mapper.map_batch(file_batch).unwrap(); - - // the mapped batch has the correct schema and the "b" column has been cast to Utf8 - let expected_batch = record_batch!( - ("a", Int32, vec![None, None]), // missing column filled with nulls - ("b", Utf8, vec!["1.0", "2.0"]) // b was cast to string and order was changed - ) - .unwrap(); - assert_eq!(mapped_batch, expected_batch); - } - - #[test] - fn default_schema_adapter_non_nullable_columns() { - let table_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), // "a"" is declared non nullable - Field::new("b", DataType::Utf8, true), - ]); - let file_schema = Schema::new(vec![ - // since file doesn't have "a" it will be filled with nulls - Field::new("b", DataType::Float64, true), - ]); - - let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema)); - let (mapper, indices) = adapter.map_schema(&file_schema).unwrap(); - assert_eq!(indices, vec![0]); - - let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap(); - - // Mapping fails because it tries to fill in a non-nullable column with nulls - let err = mapper.map_batch(file_batch).unwrap_err().to_string(); - assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}"); - } - #[derive(Debug)] struct TestPhysicalExprAdapterFactory; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index c57b08545b752..04c8ea129d05c 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -51,138 +51,3 @@ pub use datafusion_datasource::file_sink_config::*; pub use datafusion_datasource::file_stream::{ FileOpenFuture, FileOpener, FileStream, OnError, }; - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::array::{ - BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch, - StringArray, UInt64Array, - cast::AsArray, - types::{Float32Type, Float64Type, UInt32Type}, - }; - use 
arrow::datatypes::{DataType, Field, Schema}; - use arrow_schema::SchemaRef; - - use crate::datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapterFactory, - }; - - #[test] - fn schema_mapping_map_batch() { - let table_schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::UInt32, true), - Field::new("c3", DataType::Float64, true), - ])); - - let adapter = DefaultSchemaAdapterFactory - .create(table_schema.clone(), table_schema.clone()); - - let file_schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::UInt64, true), - Field::new("c3", DataType::Float32, true), - ]); - - let (mapping, _) = adapter.map_schema(&file_schema).expect("map schema failed"); - - let c1 = StringArray::from(vec!["hello", "world"]); - let c2 = UInt64Array::from(vec![9_u64, 5_u64]); - let c3 = Float32Array::from(vec![2.0_f32, 7.0_f32]); - let batch = RecordBatch::try_new( - Arc::new(file_schema), - vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)], - ) - .unwrap(); - - let mapped_batch = mapping.map_batch(batch).unwrap(); - - assert_eq!(mapped_batch.schema(), table_schema); - assert_eq!(mapped_batch.num_columns(), 3); - assert_eq!(mapped_batch.num_rows(), 2); - - let c1 = mapped_batch.column(0).as_string::(); - let c2 = mapped_batch.column(1).as_primitive::(); - let c3 = mapped_batch.column(2).as_primitive::(); - - assert_eq!(c1.value(0), "hello"); - assert_eq!(c1.value(1), "world"); - assert_eq!(c2.value(0), 9_u32); - assert_eq!(c2.value(1), 5_u32); - assert_eq!(c3.value(0), 2.0_f64); - assert_eq!(c3.value(1), 7.0_f64); - } - - #[test] - fn schema_adapter_map_schema_with_projection() { - let table_schema = Arc::new(Schema::new(vec![ - Field::new("c0", DataType::Utf8, true), - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::Float64, true), - Field::new("c3", DataType::Int32, true), - Field::new("c4", DataType::Float32, true), - ])); - - let file_schema = Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("c1", DataType::Boolean, true), - Field::new("c2", DataType::Float32, true), - Field::new("c3", DataType::Binary, true), - Field::new("c4", DataType::Int64, true), - ]); - - let indices = vec![1, 2, 4]; - let schema = SchemaRef::from(table_schema.project(&indices).unwrap()); - let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone()); - let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); - - let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); - let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]); - let c2 = Float32Array::from(vec![Some(2.0_f32), Some(7.0_f32), Some(3.0_f32)]); - let c3 = BinaryArray::from_opt_vec(vec![ - Some(b"hallo"), - Some(b"danke"), - Some(b"super"), - ]); - let c4 = Int64Array::from(vec![1, 2, 3]); - let batch = RecordBatch::try_new( - Arc::new(file_schema), - vec![ - Arc::new(id), - Arc::new(c1), - Arc::new(c2), - Arc::new(c3), - Arc::new(c4), - ], - ) - .unwrap(); - let rows_num = batch.num_rows(); - let projected = batch.project(&projection).unwrap(); - let mapped_batch = mapping.map_batch(projected).unwrap(); - - assert_eq!( - mapped_batch.schema(), - Arc::new(table_schema.project(&indices).unwrap()) - ); - assert_eq!(mapped_batch.num_columns(), indices.len()); - assert_eq!(mapped_batch.num_rows(), rows_num); - - let c1 = mapped_batch.column(0).as_string::(); - let c2 = mapped_batch.column(1).as_primitive::(); - let c4 = mapped_batch.column(2).as_primitive::(); - - 
assert_eq!(c1.value(0), "true"); - assert_eq!(c1.value(1), "false"); - assert_eq!(c1.value(2), "true"); - - assert_eq!(c2.value(0), 2.0_f64); - assert_eq!(c2.value(1), 7.0_f64); - assert_eq!(c2.value(2), 3.0_f64); - - assert_eq!(c4.value(0), 1.0_f32); - assert_eq!(c4.value(1), 2.0_f32); - assert_eq!(c4.value(2), 3.0_f32); - } -} diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 4613561c666e9..051942846a1df 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -306,7 +306,7 @@ mod tests { let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); - // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // Since c2 is missing from the file and we didn't supply a custom `PhysicalExprAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit(1_i32)); @@ -364,7 +364,7 @@ mod tests { let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); - // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // Since c2 is missing from the file and we didn't supply a custom `PhysicalExprAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit("abc")); @@ -426,7 +426,7 @@ mod tests { let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); - // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // Since c2 is missing from the file and we didn't supply a custom `PhysicalExprAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit("abc")); @@ -488,7 +488,7 @@ mod tests { let batch = RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap(); - // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // Since c2 is missing from the file and we didn't supply a custom `PhysicalExprAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit("abc")); @@ -1324,7 +1324,7 @@ mod tests { async fn parquet_exec_with_int96_from_spark() -> Result<()> { // arrow-rs relies on the chrono library to convert between timestamps and strings, so // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64 - // anyway, so this should be a zero-copy non-modifying cast at the SchemaAdapter. + // anyway, so this should be a zero-copy non-modifying cast. 
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); let testdata = datafusion_common::test_util::parquet_test_data(); diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs index cc4dfcf72059f..bdbe72245323d 100644 --- a/datafusion/core/tests/core_integration.rs +++ b/datafusion/core/tests/core_integration.rs @@ -48,9 +48,6 @@ mod optimizer; /// Run all tests that are found in the `physical_optimizer` directory mod physical_optimizer; -/// Run all tests that are found in the `schema_adapter` directory -mod schema_adapter; - /// Run all tests that are found in the `serde` directory mod serde; diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/expr_adapter.rs similarity index 97% rename from datafusion/core/tests/parquet/schema_adapter.rs rename to datafusion/core/tests/parquet/expr_adapter.rs index 7e2240cf6b794..cc7d3ada45c9b 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/expr_adapter.rs @@ -221,11 +221,8 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() { /// Test demonstrating how to implement a custom PhysicalExprAdapterFactory /// that fills missing columns with non-null default values. /// -/// This is the recommended migration path for users who previously used -/// SchemaAdapterFactory to fill missing columns with default values. -/// Instead of transforming batches after reading (SchemaAdapter::map_batch), -/// the PhysicalExprAdapterFactory rewrites expressions to use literals for -/// missing columns, achieving the same result more efficiently. +/// PhysicalExprAdapterFactory rewrites expressions to use literals for +/// missing columns, handling schema evolution efficiently at planning time. 
#[tokio::test] async fn test_physical_expr_adapter_with_non_null_defaults() { // File only has c1 column diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 0a0478dd03c21..35b5918d9e8bf 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -46,13 +46,13 @@ use tempfile::NamedTempFile; mod custom_reader; #[cfg(feature = "parquet_encryption")] mod encryption; +mod expr_adapter; mod external_access_plan; mod file_statistics; mod filter_pushdown; mod page_pruning; mod row_group_pruning; mod schema; -mod schema_adapter; mod schema_coercion; mod utils; diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 93e928b51a18a..1afdc4823f0a4 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -22,8 +22,7 @@ use datafusion_common::{Result, config::ConfigOptions, internal_err}; use datafusion_datasource::{ PartitionedFile, file::FileSource, file_scan_config::FileScanConfig, file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, - file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, - schema_adapter::SchemaAdapterFactory, source::DataSourceExec, + file_stream::FileOpener, source::DataSourceExec, }; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; @@ -51,7 +50,6 @@ use std::{ pub struct TestOpener { batches: Vec, batch_size: Option, - schema: SchemaRef, projection: Option>, predicate: Option>, } @@ -73,8 +71,6 @@ impl FileOpener for TestOpener { batches = new_batches.into_iter().collect(); } - let factory = DefaultSchemaAdapterFactory::from_schema(Arc::clone(&self.schema)); - let (mapper, projection) = factory.map_schema(&batches[0].schema()).unwrap(); let mut new_batches = Vec::new(); for batch in batches { let batch = if let Some(predicate) = &self.predicate { @@ -82,9 +78,6 @@ impl FileOpener for TestOpener { } else { batch }; - - let batch = batch.project(&projection).unwrap(); - let batch = mapper.map_batch(batch).unwrap(); new_batches.push(batch); } batches = new_batches; @@ -109,10 +102,8 @@ pub struct TestSource { predicate: Option>, batch_size: Option, batches: Vec, - schema: SchemaRef, metrics: ExecutionPlanMetricsSet, projection: Option>, - schema_adapter_factory: Option>, table_schema: datafusion_datasource::TableSchema, } @@ -121,14 +112,12 @@ impl TestSource { let table_schema = datafusion_datasource::TableSchema::new(Arc::clone(&schema), vec![]); Self { - schema, support, metrics: ExecutionPlanMetricsSet::new(), batches, predicate: None, batch_size: None, projection: None, - schema_adapter_factory: None, table_schema, } } @@ -144,7 +133,6 @@ impl FileSource for TestSource { Ok(Arc::new(TestOpener { batches: self.batches.clone(), batch_size: self.batch_size, - schema: Arc::clone(&self.schema), projection: self.projection.clone(), predicate: self.predicate.clone(), })) @@ -222,20 +210,6 @@ impl FileSource for TestSource { } } - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc, - ) -> Result> { - Ok(Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - })) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } - fn table_schema(&self) -> &datafusion_datasource::TableSchema { &self.table_schema } diff --git 
a/datafusion/core/tests/schema_adapter/mod.rs b/datafusion/core/tests/schema_adapter/mod.rs deleted file mode 100644 index 2f81a43f4736e..0000000000000 --- a/datafusion/core/tests/schema_adapter/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod schema_adapter_integration_tests; diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs deleted file mode 100644 index 01242ff41fb93..0000000000000 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ /dev/null @@ -1,752 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::sync::Arc; - -use arrow::array::RecordBatch; - -use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use bytes::{BufMut, BytesMut}; -use datafusion::common::Result; -use datafusion::config::{ConfigOptions, TableParquetOptions}; -use datafusion::datasource::listing::PartitionedFile; -#[cfg(feature = "parquet")] -use datafusion::datasource::physical_plan::ParquetSource; -use datafusion::datasource::physical_plan::{ - ArrowSource, CsvSource, FileSource, JsonSource, -}; -use datafusion::logical_expr::{col, lit}; -use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::SessionContext; -use datafusion_common::ColumnStatistics; -use datafusion_common::config::CsvOptions; -use datafusion_common::record_batch; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_datasource::file_scan_config::FileScanConfigBuilder; -use datafusion_datasource::schema_adapter::{ - SchemaAdapter, SchemaAdapterFactory, SchemaMapper, -}; - -use datafusion::assert_batches_eq; -use datafusion_datasource::TableSchema; -use datafusion_datasource::source::DataSourceExec; -use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_expr::Expr; -use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::planner::logical2physical; -use datafusion_physical_expr::projection::ProjectionExprs; -use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use object_store::{ObjectStore, memory::InMemory, path::Path}; -use parquet::arrow::ArrowWriter; - -async fn write_parquet(batch: RecordBatch, store: Arc, path: &str) { - write_batches_to_parquet(&[batch], store, path).await; -} - -/// Write RecordBatches to a Parquet file with each batch in its own row group. 
-async fn write_batches_to_parquet( - batches: &[RecordBatch], - store: Arc, - path: &str, -) -> usize { - let mut out = BytesMut::new().writer(); - { - let mut writer = - ArrowWriter::try_new(&mut out, batches[0].schema(), None).unwrap(); - for batch in batches { - writer.write(batch).unwrap(); - writer.flush().unwrap(); - } - writer.finish().unwrap(); - } - let data = out.into_inner().freeze(); - let file_size = data.len(); - store.put(&Path::from(path), data.into()).await.unwrap(); - file_size -} - -/// A schema adapter factory that transforms column names to uppercase -#[derive(Debug, PartialEq)] -struct UppercaseAdapterFactory {} - -impl SchemaAdapterFactory for UppercaseAdapterFactory { - fn create( - &self, - projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(UppercaseAdapter { - table_schema: projected_table_schema, - }) - } -} - -/// Schema adapter that transforms column names to uppercase -#[derive(Debug)] -struct UppercaseAdapter { - table_schema: SchemaRef, -} - -impl SchemaAdapter for UppercaseAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.table_schema.field(index); - let uppercase_name = field.name().to_uppercase(); - file_schema - .fields() - .iter() - .position(|f| f.name().to_uppercase() == uppercase_name) - } - - fn map_schema( - &self, - file_schema: &Schema, - ) -> Result<(Arc, Vec)> { - let mut projection = Vec::new(); - - // Map each field in the table schema to the corresponding field in the file schema - for table_field in self.table_schema.fields() { - let uppercase_name = table_field.name().to_uppercase(); - if let Some(pos) = file_schema - .fields() - .iter() - .position(|f| f.name().to_uppercase() == uppercase_name) - { - projection.push(pos); - } - } - - let mapper = UppercaseSchemaMapper { - output_schema: self.output_schema(), - projection: projection.clone(), - }; - - Ok((Arc::new(mapper), projection)) - } -} - -impl UppercaseAdapter { - fn output_schema(&self) -> SchemaRef { - let fields: Vec = self - .table_schema - .fields() - .iter() - .map(|f| { - Field::new( - f.name().to_uppercase().as_str(), - f.data_type().clone(), - f.is_nullable(), - ) - }) - .collect(); - - Arc::new(Schema::new(fields)) - } -} - -#[derive(Debug)] -struct UppercaseSchemaMapper { - output_schema: SchemaRef, - projection: Vec, -} - -impl SchemaMapper for UppercaseSchemaMapper { - fn map_batch(&self, batch: RecordBatch) -> Result { - let columns = self - .projection - .iter() - .map(|&i| batch.column(i).clone()) - .collect::>(); - Ok(RecordBatch::try_new(self.output_schema.clone(), columns)?) 
- } - - fn map_column_statistics( - &self, - stats: &[ColumnStatistics], - ) -> Result> { - Ok(self - .projection - .iter() - .map(|&i| stats.get(i).cloned().unwrap_or_default()) - .collect()) - } -} - -/// A physical expression adapter factory that maps uppercase column names to lowercase -#[derive(Debug)] -struct UppercasePhysicalExprAdapterFactory; - -impl PhysicalExprAdapterFactory for UppercasePhysicalExprAdapterFactory { - fn create( - &self, - _logical_file_schema: SchemaRef, - physical_file_schema: SchemaRef, - ) -> Arc { - Arc::new(UppercasePhysicalExprAdapter { - physical_file_schema, - }) - } -} - -#[derive(Debug)] -struct UppercasePhysicalExprAdapter { - physical_file_schema: SchemaRef, -} - -impl PhysicalExprAdapter for UppercasePhysicalExprAdapter { - fn rewrite(&self, expr: Arc) -> Result> { - expr.transform(|e| { - if let Some(column) = e.as_any().downcast_ref::() { - // Map uppercase column name (from logical schema) to lowercase (in physical file) - let lowercase_name = column.name().to_lowercase(); - if let Ok(idx) = self.physical_file_schema.index_of(&lowercase_name) { - return Ok(Transformed::yes( - Arc::new(Column::new(&lowercase_name, idx)) - as Arc, - )); - } - } - Ok(Transformed::no(e)) - }) - .data() - } -} - -#[derive(Clone)] -struct ParquetTestCase { - table_schema: TableSchema, - batches: Vec, - predicate: Option, - projection: Option, - push_down_filters: bool, -} - -impl ParquetTestCase { - fn new(table_schema: TableSchema, batches: Vec) -> Self { - Self { - table_schema, - batches, - predicate: None, - projection: None, - push_down_filters: true, - } - } - - fn push_down_filters(mut self, pushdown_filters: bool) -> Self { - self.push_down_filters = pushdown_filters; - self - } - - fn with_predicate(mut self, predicate: Expr) -> Self { - self.predicate = Some(predicate); - self - } - - fn with_projection(mut self, projection: ProjectionExprs) -> Self { - self.projection = Some(projection); - self - } - - async fn execute(self) -> Result> { - let store = Arc::new(InMemory::new()) as Arc; - let store_url = ObjectStoreUrl::parse("memory://").unwrap(); - let path = "test.parquet"; - let file_size = - write_batches_to_parquet(&self.batches, store.clone(), path).await; - - let ctx = SessionContext::new(); - ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); - - let mut table_options = TableParquetOptions::default(); - // controlled via ConfigOptions flag; ParquetSources ORs them so if either is true then pushdown is enabled - table_options.global.pushdown_filters = false; - let mut file_source = Arc::new( - ParquetSource::new(self.table_schema.table_schema().clone()) - .with_table_parquet_options(table_options), - ) as Arc; - - if let Some(projection) = self.projection { - file_source = file_source.try_pushdown_projection(&projection)?.unwrap(); - } - - if let Some(predicate) = &self.predicate { - let filter_expr = - logical2physical(predicate, self.table_schema.table_schema()); - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = self.push_down_filters; - let result = file_source.try_pushdown_filters(vec![filter_expr], &config)?; - file_source = result.updated_node.unwrap(); - } - - let config = FileScanConfigBuilder::new(store_url.clone(), file_source) - .with_file(PartitionedFile::new(path, file_size as u64)) // size 0 for test - .with_expr_adapter(None) - .build(); - - let exec = DataSourceExec::from_data_source(config); - let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx)?; - 
datafusion::physical_plan::common::collect(stream).await - } -} - -/// Test reading and filtering a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c) -#[tokio::test] -#[cfg(feature = "parquet")] -async fn test_parquet_flipped_projection() -> Result<()> { - // Create test data with columns (a, b, c) - the file schema - let batch1 = record_batch!( - ("a", Int32, vec![1, 2]), - ("b", Utf8, vec!["x", "y"]), - ("c", Float64, vec![1.1, 2.2]) - )?; - let batch2 = record_batch!( - ("a", Int32, vec![3]), - ("b", Utf8, vec!["z"]), - ("c", Float64, vec![3.3]) - )?; - - // Create a table schema with flipped column order (c, b, a) - let table_schema = Arc::new(Schema::new(vec![ - Field::new("c", DataType::Float64, false), - Field::new("b", DataType::Utf8, true), - Field::new("a", DataType::Int32, false), - ])); - let table_schema = TableSchema::from_file_schema(table_schema); - - let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, batch2]); - - // Test reading with flipped schema - let batches = test_case.clone().execute().await?; - #[rustfmt::skip] - let expected = [ - "+-----+---+---+", - "| c | b | a |", - "+-----+---+---+", - "| 1.1 | x | 1 |", - "| 2.2 | y | 2 |", - "| 3.3 | z | 3 |", - "+-----+---+---+", - ]; - assert_batches_eq!(expected, &batches); - - // Test with a projection that selects (b, a) - let projection = ProjectionExprs::from_indices(&[1, 2], table_schema.table_schema()); - let batches = test_case - .clone() - .with_projection(projection.clone()) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+---+---+", - "| b | a |", - "+---+---+", - "| x | 1 |", - "| y | 2 |", - "| z | 3 |", - "+---+---+", - ]; - assert_batches_eq!(expected, &batches); - - // Test with a filter on b, a - // a = 1 or b != 'foo' and a = 3 -> matches [{a=1,b=x},{b=z,a=3}] - let filter = col("a") - .eq(lit(1)) - .or(col("b").not_eq(lit("foo")).and(col("a").eq(lit(3)))); - let batches = test_case - .clone() - .with_projection(projection.clone()) - .with_predicate(filter.clone()) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+---+---+", - "| b | a |", - "+---+---+", - "| x | 1 |", - "| z | 3 |", - "+---+---+", - ]; - assert_batches_eq!(expected, &batches); - - // Test with only statistics-based filter pushdown (no row-level filtering) - // Since we have 2 row groups and the filter matches rows in both, stats pruning alone won't filter any - let batches = test_case - .clone() - .with_projection(projection) - .with_predicate(filter) - .push_down_filters(false) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+---+---+", - "| b | a |", - "+---+---+", - "| x | 1 |", - "| y | 2 |", - "| z | 3 |", - "+---+---+", - ]; - assert_batches_eq!(expected, &batches); - - // Test with a filter that can prune via statistics: a > 10 (no rows match) - let filter = col("a").gt(lit(10)); - let batches = test_case - .clone() - .with_predicate(filter) - .push_down_filters(false) - .execute() - .await?; - // Stats show a has max=3, so a > 10 prunes all row groups - assert_eq!(batches.len(), 0); - - // With a filter that matches only the first row group: a < 3 - let filter = col("a").lt(lit(3)); - let batches = test_case - .clone() - .with_predicate(filter) - .push_down_filters(false) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+-----+---+---+", - "| c | b | a |", - "+-----+---+---+", - "| 1.1 | x | 1 |", - "| 2.2 | y | 2 |", - "+-----+---+---+", - ]; - assert_batches_eq!(expected, &batches); - - Ok(()) -} 
- -/// Test reading a Parquet file that is missing a column specified in the table schema, which should get filled in with nulls by default. -/// We test with the file having columns (a, c) and the table schema having (a, b, c) -#[tokio::test] -#[cfg(feature = "parquet")] -async fn test_parquet_missing_column() -> Result<()> { - // Create test data with columns (a, c) as 2 batches - // | a | c | - // |---|-----| - // | 1 | 1.1 | - // | 2 | 2.2 | - // | ~ | ~~~ | - // | 3 | 3.3 | - let batch1 = record_batch!(("a", Int32, vec![1, 2]), ("c", Float64, vec![1.1, 2.2]))?; - let batch2 = record_batch!(("a", Int32, vec![3]), ("c", Float64, vec![3.3]))?; - - // Create a table schema with an extra column 'b' (a, b, c) - let logical_file_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, false), - ])); - let table_schema = TableSchema::from_file_schema(logical_file_schema.clone()); - - let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, batch2]); - - let batches = test_case.clone().execute().await?; - #[rustfmt::skip] - let expected = [ - "+---+---+-----+", - "| a | b | c |", - "+---+---+-----+", - "| 1 | | 1.1 |", - "| 2 | | 2.2 |", - "| 3 | | 3.3 |", - "+---+---+-----+", - ]; - assert_batches_eq!(expected, &batches); - - // And with a projection applied that selects (`c, `a`, `b`) - let projection = - ProjectionExprs::from_indices(&[2, 0, 1], table_schema.table_schema()); - let batches = test_case - .clone() - .with_projection(projection) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+-----+---+---+", - "| c | a | b |", - "+-----+---+---+", - "| 1.1 | 1 | |", - "| 2.2 | 2 | |", - "| 3.3 | 3 | |", - "+-----+---+---+", - ]; - assert_batches_eq!(expected, &batches); - - // And with a filter on a, b - // a = 1 or b is null and a = 3 - let filter = col("a") - .eq(lit(1)) - .or(col("b").is_null().and(col("a").eq(lit(3)))); - let batches = test_case - .clone() - .with_predicate(filter.clone()) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+---+---+-----+", - "| a | b | c |", - "+---+---+-----+", - "| 1 | | 1.1 |", - "| 3 | | 3.3 |", - "+---+---+-----+", - ]; - assert_batches_eq!(expected, &batches); - // With only statistics-based filter pushdown - let batches = test_case - .clone() - .with_predicate(filter) - .push_down_filters(false) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+---+---+-----+", - "| a | b | c |", - "+---+---+-----+", - "| 1 | | 1.1 |", - "| 2 | | 2.2 |", - "| 3 | | 3.3 |", - "+---+---+-----+", - ]; - assert_batches_eq!(expected, &batches); - - // Filter `b is not null or a = 24` doesn't match any rows - let filter = col("b").is_not_null().or(col("a").eq(lit(24))); - let batches = test_case - .clone() - .with_predicate(filter.clone()) - .execute() - .await?; - // There should be zero batches - assert_eq!(batches.len(), 0); - // With only statistics-based filter pushdown - let batches = test_case - .clone() - .with_predicate(filter) - .push_down_filters(false) - .execute() - .await?; - // There should be zero batches - assert_eq!(batches.len(), 0); - // Check another filter: `b = 'foo' and a = 24` should also prune data with only statistics-based pushdown - let filter = col("b").eq(lit("foo")).and(col("a").eq(lit(24))); - let batches = test_case - .clone() - .with_predicate(filter) - .push_down_filters(false) - .execute() - .await?; - // There should be zero batches - assert_eq!(batches.len(), 0); - // On 
the other hand `b is null and a = 2` should prune only the second row group with stats only pruning - let filter = col("b").is_null().and(col("a").eq(lit(2))); - let batches = test_case - .clone() - .with_predicate(filter) - .push_down_filters(false) - .execute() - .await?; - #[rustfmt::skip] - let expected = [ - "+---+---+-----+", - "| a | b | c |", - "+---+---+-----+", - "| 1 | | 1.1 |", - "| 2 | | 2.2 |", - "+---+---+-----+", - ]; - assert_batches_eq!(expected, &batches); - - Ok(()) -} - -#[tokio::test] -#[cfg(feature = "parquet")] -async fn test_parquet_integration_with_physical_expr_adapter() -> Result<()> { - // Create test data - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, true), - ])), - vec![ - Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), - Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])), - ], - )?; - - let store = Arc::new(InMemory::new()) as Arc; - let store_url = ObjectStoreUrl::parse("memory://").unwrap(); - let path = "test.parquet"; - write_parquet(batch.clone(), store.clone(), path).await; - - // Get the actual file size from the object store - let object_meta = store.head(&Path::from(path)).await?; - let file_size = object_meta.size; - - // Create a session context and register the object store - let ctx = SessionContext::new(); - ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); - - // Create a table schema with uppercase column names - let table_schema = Arc::new(Schema::new(vec![ - Field::new("ID", DataType::Int32, false), - Field::new("NAME", DataType::Utf8, true), - ])); - - // Create a ParquetSource with the table schema (uppercase columns) - let file_source = Arc::new(ParquetSource::new(table_schema.clone())); - - // Use PhysicalExprAdapterFactory to map uppercase column names to lowercase - let config = FileScanConfigBuilder::new(store_url, file_source) - .with_file(PartitionedFile::new(path, file_size)) - .with_expr_adapter(Some(Arc::new(UppercasePhysicalExprAdapterFactory))) - .build(); - - // Create a data source executor - let exec = DataSourceExec::from_data_source(config); - - // Collect results - let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx)?; - let batches = datafusion::physical_plan::common::collect(stream).await?; - - // There should be one batch - assert_eq!(batches.len(), 1); - - // Verify the schema has the uppercase column names - let result_schema = batches[0].schema(); - assert_eq!(result_schema.field(0).name(), "ID"); - assert_eq!(result_schema.field(1).name(), "NAME"); - - // Verify the data was correctly read from the lowercase file columns - // This confirms the PhysicalExprAdapter successfully mapped uppercase -> lowercase - let id_array = batches[0] - .column(0) - .as_any() - .downcast_ref::() - .expect("Expected Int32Array for ID column"); - assert_eq!(id_array.values(), &[1, 2, 3]); - - let name_array = batches[0] - .column(1) - .as_any() - .downcast_ref::() - .expect("Expected StringArray for NAME column"); - assert_eq!(name_array.value(0), "a"); - assert_eq!(name_array.value(1), "b"); - assert_eq!(name_array.value(2), "c"); - - Ok(()) -} - -#[tokio::test] -async fn test_multi_source_schema_adapter_reuse() -> Result<()> { - // This test verifies that the same schema adapter factory can be reused - // across different file source types. This is important for ensuring that: - // 1. The schema adapter factory interface works uniformly across all source types - // 2. 
The factory can be shared and cloned efficiently using Arc - // 3. Various data source implementations correctly implement the schema adapter factory pattern - - // Create a test factory - let factory = Arc::new(UppercaseAdapterFactory {}); - - // Test ArrowFileSource - { - let schema = - Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - let table_schema = TableSchema::new(schema, vec![]); - let source = ArrowSource::new_file_source(table_schema); - let source_with_adapter = source - .clone() - .with_schema_adapter_factory(factory.clone()) - .unwrap(); - - let base_source: Arc = source.into(); - assert!(base_source.schema_adapter_factory().is_none()); - assert!(source_with_adapter.schema_adapter_factory().is_some()); - - let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap(); - assert_eq!( - format!("{:?}", retrieved_factory.as_ref()), - format!("{:?}", factory.as_ref()) - ); - } - - // Test CsvSource - { - let schema = - Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - let options = CsvOptions { - has_header: Some(true), - delimiter: b',', - quote: b'"', - ..Default::default() - }; - let source = CsvSource::new(schema).with_csv_options(options); - let source_with_adapter = source - .clone() - .with_schema_adapter_factory(factory.clone()) - .unwrap(); - - let base_source: Arc = source.into(); - assert!(base_source.schema_adapter_factory().is_none()); - assert!(source_with_adapter.schema_adapter_factory().is_some()); - - let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap(); - assert_eq!( - format!("{:?}", retrieved_factory.as_ref()), - format!("{:?}", factory.as_ref()) - ); - } - - // Test JsonSource - { - let schema = - Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - let table_schema = TableSchema::new(schema, vec![]); - let source = JsonSource::new(table_schema); - let source_with_adapter = source - .clone() - .with_schema_adapter_factory(factory.clone()) - .unwrap(); - - let base_source: Arc = source.into(); - assert!(base_source.schema_adapter_factory().is_none()); - assert!(source_with_adapter.schema_adapter_factory().is_some()); - - let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap(); - assert_eq!( - format!("{:?}", retrieved_factory.as_ref()), - format!("{:?}", factory.as_ref()) - ); - } - - Ok(()) -} diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index 892ab01b23c16..16d5d497c979f 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -34,13 +34,9 @@ use std::sync::Arc; use std::{any::Any, io::Cursor}; -use datafusion_datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapterFactory, -}; use datafusion_datasource::{TableSchema, as_file_source}; use arrow::buffer::Buffer; -use arrow::datatypes::SchemaRef; use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader}; use datafusion_common::error::Result; use datafusion_common::exec_datafusion_err; @@ -54,7 +50,7 @@ use datafusion_physical_plan::projection::ProjectionExprs; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use itertools::Itertools; use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; @@ -71,8 +67,6 @@ enum ArrowFormat { pub(crate) struct ArrowStreamFileOpener { object_store: Arc, projection: Option>, - 
projected_schema: Option, - schema_adapter_factory: Option>, } impl FileOpener for ArrowStreamFileOpener { @@ -84,8 +78,6 @@ impl FileOpener for ArrowStreamFileOpener { } let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); - let projected_schema = self.projected_schema.clone(); - let schema_adapter_factory = self.schema_adapter_factory.clone(); Ok(Box::pin(async move { let r = object_store @@ -111,26 +103,7 @@ impl FileOpener for ArrowStreamFileOpener { } }; - // If we have a schema adapter factory and projected schema, use them to normalize the schema - if let (Some(factory), Some(proj_schema)) = - (schema_adapter_factory, projected_schema) - { - Ok(stream - .and_then(move |batch| { - let factory = Arc::clone(&factory); - let proj_schema = Arc::clone(&proj_schema); - async move { - let schema_adapter = - factory.create_with_projected_schema(proj_schema); - let (schema_mapper, _) = - schema_adapter.map_schema(batch.schema().as_ref())?; - schema_mapper.map_batch(batch) - } - }) - .boxed()) - } else { - Ok(stream) - } + Ok(stream) })) } } @@ -139,16 +112,12 @@ impl FileOpener for ArrowStreamFileOpener { pub(crate) struct ArrowFileOpener { object_store: Arc, projection: Option>, - projected_schema: Option, - schema_adapter_factory: Option>, } impl FileOpener for ArrowFileOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); - let projected_schema = self.projected_schema.clone(); - let schema_adapter_factory = self.schema_adapter_factory.clone(); Ok(Box::pin(async move { let range = partitioned_file.range.clone(); @@ -176,26 +145,7 @@ impl FileOpener for ArrowFileOpener { } }; - // Apply schema adaptation if available - if let (Some(factory), Some(proj_schema)) = - (schema_adapter_factory, projected_schema) - { - Ok(stream - .and_then(move |batch| { - let factory = Arc::clone(&factory); - let proj_schema = Arc::clone(&proj_schema); - async move { - let schema_adapter = - factory.create_with_projected_schema(proj_schema); - let (schema_mapper, _) = schema_adapter - .map_schema(batch.schema().as_ref())?; - schema_mapper.map_batch(batch) - } - }) - .boxed()) - } else { - Ok(stream) - } + Ok(stream) } Some(range) => { // range is not none, the file maybe split into multiple parts to scan in parallel @@ -297,27 +247,7 @@ impl FileOpener for ArrowFileOpener { .map(|r| r.map_err(Into::into)) .boxed(); - // Apply schema adaptation if available - if let (Some(factory), Some(proj_schema)) = - (schema_adapter_factory, projected_schema) - { - Ok(stream - .and_then(move |batch| { - let factory = Arc::clone(&factory); - let proj_schema = Arc::clone(&proj_schema); - async move { - let schema_adapter = - factory.create_with_projected_schema(proj_schema); - let (schema_mapper, projection) = schema_adapter - .map_schema(batch.schema().as_ref())?; - let batch = batch.project(&projection)?; - schema_mapper.map_batch(batch) - } - }) - .boxed()) - } else { - Ok(stream) - } + Ok(stream) } } })) @@ -329,7 +259,6 @@ impl FileOpener for ArrowFileOpener { pub struct ArrowSource { format: ArrowFormat, metrics: ExecutionPlanMetricsSet, - schema_adapter_factory: Option>, projection: SplitProjection, table_schema: TableSchema, } @@ -341,7 +270,6 @@ impl ArrowSource { Self { format: ArrowFormat::File, metrics: ExecutionPlanMetricsSet::new(), - schema_adapter_factory: None, projection: SplitProjection::unprojected(&table_schema), table_schema, } @@ -353,7 +281,6 @@ impl 
ArrowSource { Self { format: ArrowFormat::Stream, metrics: ExecutionPlanMetricsSet::new(), - schema_adapter_factory: None, projection: SplitProjection::unprojected(&table_schema), table_schema, } @@ -368,32 +295,15 @@ impl FileSource for ArrowSource { _partition: usize, ) -> Result> { let split_projection = self.projection.clone(); - // For schema adaptation, we only use the file schema (not partition columns) - let projected_file_schema = SchemaRef::from( - self.table_schema - .file_schema() - .project(&split_projection.file_indices)?, - ); - - // Use provided schema adapter factory, or default to DefaultSchemaAdapterFactory - // This ensures schema normalization (removing metadata differences) happens during execution - let schema_adapter_factory = self - .schema_adapter_factory - .clone() - .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); let opener: Arc = match self.format { ArrowFormat::File => Arc::new(ArrowFileOpener { object_store, projection: Some(split_projection.file_indices.clone()), - projected_schema: Some(Arc::clone(&projected_file_schema)), - schema_adapter_factory: Some(schema_adapter_factory), }), ArrowFormat::Stream => Arc::new(ArrowStreamFileOpener { object_store, projection: Some(split_projection.file_indices.clone()), - projected_schema: Some(projected_file_schema), - schema_adapter_factory: Some(schema_adapter_factory), }), }; ProjectionOpener::try_new( @@ -422,20 +332,6 @@ impl FileSource for ArrowSource { } } - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc, - ) -> Result> { - Ok(Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - })) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } - fn repartitioned( &self, target_partitions: usize, @@ -529,8 +425,6 @@ impl ArrowOpener { inner: Arc::new(ArrowFileOpener { object_store, projection, - projected_schema: None, - schema_adapter_factory: None, }), } } @@ -543,8 +437,6 @@ impl ArrowOpener { inner: Arc::new(ArrowStreamFileOpener { object_store, projection, - projected_schema: None, - schema_adapter_factory: None, }), } } @@ -740,8 +632,6 @@ mod tests { let opener = ArrowStreamFileOpener { object_store, projection: Some(vec![0]), // just the first column - projected_schema: None, - schema_adapter_factory: None, }; let mut stream = opener.open(partitioned_file)?.await?; diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 33d6cf5272678..1c466be266f17 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -28,7 +28,6 @@ use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::projection::ProjectionExprs; @@ -42,7 +41,6 @@ pub struct AvroSource { batch_size: Option, projection: SplitProjection, metrics: ExecutionPlanMetricsSet, - schema_adapter_factory: Option>, } impl AvroSource { @@ -54,7 +52,6 @@ impl AvroSource { table_schema, batch_size: None, metrics: ExecutionPlanMetricsSet::new(), - schema_adapter_factory: None, } } @@ -142,20 +139,6 @@ impl FileSource for AvroSource { ) -> Result> { Ok(None) } - - fn 
with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc, - ) -> Result> { - Ok(Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - })) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } } mod private { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index b318d89189d6b..33cf4d995ff7b 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -18,7 +18,6 @@ //! Execution plan for reading CSV files use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_plan::projection::ProjectionExprs; use std::any::Any; use std::fmt; @@ -92,7 +91,6 @@ pub struct CsvSource { table_schema: TableSchema, projection: SplitProjection, metrics: ExecutionPlanMetricsSet, - schema_adapter_factory: Option>, } impl CsvSource { @@ -105,7 +103,6 @@ impl CsvSource { table_schema, batch_size: None, metrics: ExecutionPlanMetricsSet::new(), - schema_adapter_factory: None, } } @@ -305,20 +302,6 @@ impl FileSource for CsvSource { DisplayFormatType::TreeRender => Ok(()), } } - - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc, - ) -> Result> { - Ok(Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - })) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } } impl FileOpener for CsvOpener { diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 21ffa7f2f9e9e..5797054f11b9c 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -30,7 +30,6 @@ use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range, }; @@ -80,7 +79,6 @@ pub struct JsonSource { table_schema: datafusion_datasource::TableSchema, batch_size: Option, metrics: ExecutionPlanMetricsSet, - schema_adapter_factory: Option>, projection: SplitProjection, } @@ -93,7 +91,6 @@ impl JsonSource { table_schema, batch_size: None, metrics: ExecutionPlanMetricsSet::new(), - schema_adapter_factory: None, } } } @@ -172,20 +169,6 @@ impl FileSource for JsonSource { fn file_type(&self) -> &str { "json" } - - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc, - ) -> Result> { - Ok(Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - })) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } } impl FileOpener for JsonOpener { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 4956f83effcc7..7d7662a4a9cb8 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -264,7 +264,6 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// filled with nulls, but this can be customized via [`PhysicalExprAdapterFactory`]. 
/// /// [`RecordBatch`]: arrow::record_batch::RecordBatch -/// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter /// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData /// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory #[derive(Clone, Debug)] diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 2c69987f91342..9e8f6dc1a67c2 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -25,9 +25,8 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; -use crate::schema_adapter::SchemaAdapterFactory; +use datafusion_common::Result; use datafusion_common::config::ConfigOptions; -use datafusion_common::{Result, not_impl_err}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_plan::DisplayFormatType; @@ -174,48 +173,4 @@ pub trait FileSource: Send + Sync { ) -> Result>> { Ok(None) } - - /// Set optional schema adapter factory. - /// - /// [`SchemaAdapterFactory`] allows user to specify how fields from the - /// file get mapped to that of the table schema. If you implement this - /// method, you should also implement [`schema_adapter_factory`]. - /// - /// The default implementation returns a not implemented error. - /// - /// [`schema_adapter_factory`]: Self::schema_adapter_factory - fn with_schema_adapter_factory( - &self, - _factory: Arc, - ) -> Result> { - not_impl_err!( - "FileSource {} does not support schema adapter factory", - self.file_type() - ) - } - - /// Returns the current schema adapter factory if set - /// - /// Default implementation returns `None`. - fn schema_adapter_factory(&self) -> Option> { - None - } - - /// Set the file ordering information - /// - /// This allows the file source to know how the files are sorted, - /// enabling it to make informed decisions about sort pushdown. - /// - /// # Default Implementation - /// - /// Returns `not_impl_err!`. FileSource implementations that support - /// sort optimization should override this method. 
- fn with_file_ordering_info( - &self, - _ordering: Option, - ) -> Result> { - // Default: clone self without modification - // ParquetSource will override this - not_impl_err!("with_file_ordering_info not implemented for this FileSource") - } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index a9a00c227c2cc..f5c684736f7b8 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -78,7 +78,6 @@ use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync:: /// # use datafusion_physical_expr::projection::ProjectionExprs; /// # use datafusion_physical_plan::ExecutionPlan; /// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; -/// # use datafusion_datasource::schema_adapter::SchemaAdapterFactory; /// # let file_schema = Arc::new(Schema::new(vec![ /// # Field::new("c1", DataType::Int32, false), /// # Field::new("c2", DataType::Int32, false), @@ -89,7 +88,6 @@ use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync:: /// #[derive(Clone)] /// # struct ParquetSource { /// # table_schema: TableSchema, -/// # schema_adapter_factory: Option> /// # }; /// # impl FileSource for ParquetSource { /// # fn create_file_opener(&self, _: Arc, _: &FileScanConfig, _: usize) -> Result> { unimplemented!() } @@ -98,13 +96,11 @@ use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync:: /// # fn with_batch_size(&self, _: usize) -> Arc { unimplemented!() } /// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() } /// # fn file_type(&self) -> &str { "parquet" } -/// # fn with_schema_adapter_factory(&self, factory: Arc) -> Result> { Ok(Arc::new(Self {table_schema: self.table_schema.clone(), schema_adapter_factory: Some(factory)} )) } -/// # fn schema_adapter_factory(&self) -> Option> { self.schema_adapter_factory.clone() } /// # // Note that this implementation drops the projection on the floor, it is not complete! /// # fn try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result>> { Ok(Some(Arc::new(self.clone()) as Arc)) } /// # } /// # impl ParquetSource { -/// # fn new(table_schema: impl Into) -> Self { Self {table_schema: table_schema.into(), schema_adapter_factory: None} } +/// # fn new(table_schema: impl Into) -> Self { Self {table_schema: table_schema.into()} } /// # } /// // create FileScan config for reading parquet files from file:// /// let object_store_url = ObjectStoreUrl::local_filesystem(); diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 347e783c278d0..223d00ea074d0 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -40,7 +40,6 @@ pub mod file_sink_config; pub mod file_stream; pub mod memory; pub mod projection; -pub mod schema_adapter; pub mod sink; pub mod source; mod statistics; diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs deleted file mode 100644 index 0ee4ffd6485fd..0000000000000 --- a/datafusion/datasource/src/schema_adapter.rs +++ /dev/null @@ -1,1047 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema. -//! -//! Adapter provides a method of translating the RecordBatches that come out of the -//! physical format into how they should be used by DataFusion. For instance, a schema -//! can be stored external to a parquet file that maps parquet logical types to arrow types. -use arrow::{ - array::{ArrayRef, RecordBatch, RecordBatchOptions, new_null_array}, - compute::can_cast_types, - datatypes::{DataType, Field, Schema, SchemaRef}, -}; -use datafusion_common::{ - ColumnStatistics, - format::DEFAULT_CAST_OPTIONS, - nested_struct::{cast_column, validate_struct_compatibility}, - plan_err, -}; -use std::{fmt::Debug, sync::Arc}; -/// Function used by [`SchemaMapping`] to adapt a column from the file schema to -/// the table schema. -pub type CastColumnFn = dyn Fn( - &ArrayRef, - &Field, - &arrow::compute::CastOptions, - ) -> datafusion_common::Result - + Send - + Sync; - -/// Factory for creating [`SchemaAdapter`] -/// -/// This interface provides a way to implement custom schema adaptation logic -/// for DataSourceExec (for example, to fill missing columns with default value -/// other than null). -/// -/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for -/// more details and examples. -pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static { - /// Create a [`SchemaAdapter`] - /// - /// Arguments: - /// - /// * `projected_table_schema`: The schema for the table, projected to - /// include only the fields being output (projected) by the this mapping. - /// - /// * `table_schema`: The entire table schema for the table - fn create( - &self, - projected_table_schema: SchemaRef, - table_schema: SchemaRef, - ) -> Box; - - /// Create a [`SchemaAdapter`] using only the projected table schema. - /// - /// This is a convenience method for cases where the table schema and the - /// projected table schema are the same. - fn create_with_projected_schema( - &self, - projected_table_schema: SchemaRef, - ) -> Box { - self.create(Arc::clone(&projected_table_schema), projected_table_schema) - } -} - -/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table -/// schema, which may have a schema obtained from merging multiple file-level -/// schemas. -/// -/// This is useful for implementing schema evolution in partitioned datasets. -/// -/// See [`DefaultSchemaAdapterFactory`] for more details and examples. -pub trait SchemaAdapter: Send + Sync { - /// Map a column index in the table schema to a column index in a particular - /// file schema - /// - /// This is used while reading a file to push down projections by mapping - /// projected column indexes from the table schema to the file schema - /// - /// Panics if index is not in range for the table schema - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option; - - /// Creates a mapping for casting columns from the file schema to the table - /// schema. - /// - /// This is used after reading a record batch. The returned [`SchemaMapper`]: - /// - /// 1. 
Maps columns to the expected columns indexes - /// 2. Handles missing values (e.g. fills nulls or a default value) for - /// columns in the in the table schema not in the file schema - /// 2. Handles different types: if the column in the file schema has a - /// different type than `table_schema`, the mapper will resolve this - /// difference (e.g. by casting to the appropriate type) - /// - /// Returns: - /// * a [`SchemaMapper`] - /// * an ordered list of columns to project from the file - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)>; -} - -/// Maps, columns from a specific file schema to the table schema. -/// -/// See [`DefaultSchemaAdapterFactory`] for more details and examples. -pub trait SchemaMapper: Debug + Send + Sync { - /// Adapts a `RecordBatch` to match the `table_schema` - fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; - - /// Adapts file-level column `Statistics` to match the `table_schema` - fn map_column_statistics( - &self, - file_col_statistics: &[ColumnStatistics], - ) -> datafusion_common::Result>; -} - -/// Default [`SchemaAdapterFactory`] for mapping schemas. -/// -/// This can be used to adapt file-level record batches to a table schema and -/// implement schema evolution. -/// -/// Given an input file schema and a table schema, this factory returns -/// [`SchemaAdapter`] that return [`SchemaMapper`]s that: -/// -/// 1. Reorder columns -/// 2. Cast columns to the correct type -/// 3. Fill missing columns with nulls -/// -/// # Errors: -/// -/// * If a column in the table schema is non-nullable but is not present in the -/// file schema (i.e. it is missing), the returned mapper tries to fill it with -/// nulls resulting in a schema error. -/// -/// # Illustration of Schema Mapping -/// -/// ```text -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ -/// ┌───────┐ ┌───────┐ │ ┌───────┐ ┌───────┐ ┌───────┐ │ -/// ││ 1.0 │ │ "foo" │ ││ NULL │ │ "foo" │ │ "1.0" │ -/// ├───────┤ ├───────┤ │ Schema mapping ├───────┤ ├───────┤ ├───────┤ │ -/// ││ 2.0 │ │ "bar" │ ││ NULL │ │ "bar" │ │ "2.0" │ -/// └───────┘ └───────┘ │────────────────▶ └───────┘ └───────┘ └───────┘ │ -/// │ │ -/// column "c" column "b"│ column "a" column "b" column "c"│ -/// │ Float64 Utf8 │ Int32 Utf8 Utf8 -/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ -/// Input Record Batch Output Record Batch -/// -/// Schema { Schema { -/// "c": Float64, "a": Int32, -/// "b": Utf8, "b": Utf8, -/// } "c": Utf8, -/// } -/// ``` -/// -/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s -/// -/// Note `SchemaMapping` also supports mapping partial batches, which is used as -/// part of predicate pushdown. 
-/// -/// ``` -/// # use std::sync::Arc; -/// # use arrow::datatypes::{DataType, Field, Schema}; -/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory}; -/// # use datafusion_common::record_batch; -/// // Table has fields "a", "b" and "c" -/// let table_schema = Schema::new(vec![ -/// Field::new("a", DataType::Int32, true), -/// Field::new("b", DataType::Utf8, true), -/// Field::new("c", DataType::Utf8, true), -/// ]); -/// -/// // create an adapter to map the table schema to the file schema -/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema)); -/// -/// // The file schema has fields "c" and "b" but "b" is stored as an 'Float64' -/// // instead of 'Utf8' -/// let file_schema = Schema::new(vec![ -/// Field::new("c", DataType::Utf8, true), -/// Field::new("b", DataType::Float64, true), -/// ]); -/// -/// // Get a mapping from the file schema to the table schema -/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap(); -/// -/// let file_batch = record_batch!( -/// ("c", Utf8, vec!["foo", "bar"]), -/// ("b", Float64, vec![1.0, 2.0]) -/// ).unwrap(); -/// -/// let mapped_batch = mapper.map_batch(file_batch).unwrap(); -/// -/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8 -/// let expected_batch = record_batch!( -/// ("a", Int32, vec![None, None]), // missing column filled with nulls -/// ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed -/// ("c", Utf8, vec!["foo", "bar"]) -/// ).unwrap(); -/// assert_eq!(mapped_batch, expected_batch); -/// ``` -#[derive(Clone, Debug, Default)] -pub struct DefaultSchemaAdapterFactory; - -impl DefaultSchemaAdapterFactory { - /// Create a new factory for mapping batches from a file schema to a table - /// schema. - /// - /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with - /// the same schema for both the projected table schema and the table - /// schema. - pub fn from_schema(table_schema: SchemaRef) -> Box { - Self.create(Arc::clone(&table_schema), table_schema) - } -} - -impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { - fn create( - &self, - projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(DefaultSchemaAdapter { - projected_table_schema, - }) - } -} - -/// This SchemaAdapter requires both the table schema and the projected table -/// schema. 
See [`SchemaMapping`] for more details -#[derive(Clone, Debug)] -pub(crate) struct DefaultSchemaAdapter { - /// The schema for the table, projected to include only the fields being output (projected) by the - /// associated ParquetSource - projected_table_schema: SchemaRef, -} - -/// Checks if a file field can be cast to a table field -/// -/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible -pub(crate) fn can_cast_field( - file_field: &Field, - table_field: &Field, -) -> datafusion_common::Result { - match (file_field.data_type(), table_field.data_type()) { - (DataType::Struct(source_fields), DataType::Struct(target_fields)) => { - // validate_struct_compatibility returns Result<()>; on success we can cast structs - validate_struct_compatibility(source_fields, target_fields)?; - Ok(true) - } - _ => { - if can_cast_types(file_field.data_type(), table_field.data_type()) { - Ok(true) - } else { - plan_err!( - "Cannot cast file schema field {} of type {} to table schema field of type {}", - file_field.name(), - file_field.data_type(), - table_field.data_type() - ) - } - } - } -} - -impl SchemaAdapter for DefaultSchemaAdapter { - /// Map a column index in the table schema to a column index in a particular - /// file schema - /// - /// Panics if index is not in range for the table schema - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.projected_table_schema.field(index); - Some(file_schema.fields.find(field.name())?.0) - } - - /// Creates a `SchemaMapping` for casting or mapping the columns from the - /// file schema to the table schema. - /// - /// If the provided `file_schema` contains columns of a different type to - /// the expected `table_schema`, the method will attempt to cast the array - /// data from the file schema to the table schema where possible. - /// - /// Returns a [`SchemaMapping`] that can be applied to the output batch - /// along with an ordered list of columns to project from the file - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)> { - let (field_mappings, projection) = create_field_mapping( - file_schema, - &self.projected_table_schema, - can_cast_field, - )?; - - Ok(( - Arc::new(SchemaMapping::new( - Arc::clone(&self.projected_table_schema), - field_mappings, - Arc::new( - |array: &ArrayRef, - field: &Field, - opts: &arrow::compute::CastOptions| { - cast_column(array, field, opts) - }, - ), - )), - projection, - )) - } -} - -/// Helper function that creates field mappings between file schema and table schema -/// -/// Maps columns from the file schema to their corresponding positions in the table schema, -/// applying type compatibility checking via the provided predicate function. -/// -/// Returns field mappings (for column reordering) and a projection (for field selection). -pub(crate) fn create_field_mapping( - file_schema: &Schema, - projected_table_schema: &SchemaRef, - can_map_field: F, -) -> datafusion_common::Result<(Vec>, Vec)> -where - F: Fn(&Field, &Field) -> datafusion_common::Result, -{ - let mut projection = Vec::with_capacity(file_schema.fields().len()); - let mut field_mappings = vec![None; projected_table_schema.fields().len()]; - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if let Some((table_idx, table_field)) = - projected_table_schema.fields().find(file_field.name()) - && can_map_field(file_field, table_field)? 
- { - field_mappings[table_idx] = Some(projection.len()); - projection.push(file_idx); - } - } - - Ok((field_mappings, projection)) -} - -/// The SchemaMapping struct holds a mapping from the file schema to the table -/// schema and any necessary type conversions. -/// -/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which -/// has the projected schema, since that's the schema which is supposed to come -/// out of the execution of this query. Thus `map_batch` uses -/// `projected_table_schema` as it can only operate on the projected fields. -/// -/// [`map_batch`]: Self::map_batch -pub struct SchemaMapping { - /// The schema of the table. This is the expected schema after conversion - /// and it should match the schema of the query result. - projected_table_schema: SchemaRef, - /// Mapping from field index in `projected_table_schema` to index in - /// projected file_schema. - /// - /// They are Options instead of just plain `usize`s because the table could - /// have fields that don't exist in the file. - field_mappings: Vec>, - /// Function used to adapt a column from the file schema to the table schema - /// when it exists in both schemas - cast_column: Arc, -} - -impl Debug for SchemaMapping { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SchemaMapping") - .field("projected_table_schema", &self.projected_table_schema) - .field("field_mappings", &self.field_mappings) - .field("cast_column", &"") - .finish() - } -} - -impl SchemaMapping { - /// Creates a new SchemaMapping instance - /// - /// Initializes the field mappings needed to transform file data to the projected table schema - pub fn new( - projected_table_schema: SchemaRef, - field_mappings: Vec>, - cast_column: Arc, - ) -> Self { - Self { - projected_table_schema, - field_mappings, - cast_column, - } - } -} - -impl SchemaMapper for SchemaMapping { - /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and - /// conversions. - /// The produced RecordBatch has a schema that contains only the projected columns. - fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { - let (_old_schema, batch_cols, batch_rows) = batch.into_parts(); - - let cols = self - .projected_table_schema - // go through each field in the projected schema - .fields() - .iter() - // and zip it with the index that maps fields from the projected table schema to the - // projected file schema in `batch` - .zip(&self.field_mappings) - // and for each one... - .map(|(field, file_idx)| { - file_idx.map_or_else( - // If this field only exists in the table, and not in the file, then we know - // that it's null, so just return that. 
- || Ok(new_null_array(field.data_type(), batch_rows)), - // However, if it does exist in both, use the cast_column function - // to perform any necessary conversions - |batch_idx| { - (self.cast_column)( - &batch_cols[batch_idx], - field, - &DEFAULT_CAST_OPTIONS, - ) - }, - ) - }) - .collect::, _>>()?; - - // Necessary to handle empty batches - let options = RecordBatchOptions::new().with_row_count(Some(batch_rows)); - - let schema = Arc::clone(&self.projected_table_schema); - let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; - Ok(record_batch) - } - - /// Adapts file-level column `Statistics` to match the `table_schema` - fn map_column_statistics( - &self, - file_col_statistics: &[ColumnStatistics], - ) -> datafusion_common::Result> { - let mut table_col_statistics = vec![]; - - // Map the statistics for each field in the file schema to the corresponding field in the - // table schema, if a field is not present in the file schema, we need to fill it with `ColumnStatistics::new_unknown` - for (_, file_col_idx) in self - .projected_table_schema - .fields() - .iter() - .zip(&self.field_mappings) - { - if let Some(file_col_idx) = file_col_idx { - table_col_statistics.push( - file_col_statistics - .get(*file_col_idx) - .cloned() - .unwrap_or_default(), - ); - } else { - table_col_statistics.push(ColumnStatistics::new_unknown()); - } - } - - Ok(table_col_statistics) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::{ - array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray}, - compute::cast, - datatypes::{DataType, Field, TimeUnit}, - record_batch::RecordBatch, - }; - use datafusion_common::{Result, ScalarValue, Statistics, stats::Precision}; - - #[test] - fn test_schema_mapping_map_statistics_basic() { - // Create table schema (a, b, c) - let table_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ])); - - // Create file schema (b, a) - different order, missing c - let file_schema = Schema::new(vec![ - Field::new("b", DataType::Utf8, true), - Field::new("a", DataType::Int32, true), - ]); - - // Create SchemaAdapter - let adapter = DefaultSchemaAdapter { - projected_table_schema: Arc::clone(&table_schema), - }; - - // Get mapper and projection - let (mapper, projection) = adapter.map_schema(&file_schema).unwrap(); - - // Should project columns 0,1 from file - assert_eq!(projection, vec![0, 1]); - - // Create file statistics - let mut file_stats = Statistics::default(); - - // Statistics for column b (index 0 in file) - let b_stats = ColumnStatistics { - null_count: Precision::Exact(5), - ..Default::default() - }; - - // Statistics for column a (index 1 in file) - let a_stats = ColumnStatistics { - null_count: Precision::Exact(10), - ..Default::default() - }; - - file_stats.column_statistics = vec![b_stats, a_stats]; - - // Map statistics - let table_col_stats = mapper - .map_column_statistics(&file_stats.column_statistics) - .unwrap(); - - // Verify stats - assert_eq!(table_col_stats.len(), 3); - assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1 - assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0 - assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown) - } - - #[test] - fn test_schema_mapping_map_statistics_empty() { - // Create schemas - let table_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, 
true), - Field::new("b", DataType::Utf8, true), - ])); - let file_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - ]); - - let adapter = DefaultSchemaAdapter { - projected_table_schema: Arc::clone(&table_schema), - }; - let (mapper, _) = adapter.map_schema(&file_schema).unwrap(); - - // Empty file statistics - let file_stats = Statistics::default(); - let table_col_stats = mapper - .map_column_statistics(&file_stats.column_statistics) - .unwrap(); - - // All stats should be unknown - assert_eq!(table_col_stats.len(), 2); - assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),); - assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),); - } - - #[test] - fn test_can_cast_field() { - // Same type should work - let from_field = Field::new("col", DataType::Int32, true); - let to_field = Field::new("col", DataType::Int32, true); - assert!(can_cast_field(&from_field, &to_field).unwrap()); - - // Casting Int32 to Float64 is allowed - let from_field = Field::new("col", DataType::Int32, true); - let to_field = Field::new("col", DataType::Float64, true); - assert!(can_cast_field(&from_field, &to_field).unwrap()); - - // Casting Float64 to Utf8 should work (converts to string) - let from_field = Field::new("col", DataType::Float64, true); - let to_field = Field::new("col", DataType::Utf8, true); - assert!(can_cast_field(&from_field, &to_field).unwrap()); - - // Binary to Utf8 is not supported - this is an example of a cast that should fail - // Note: We use Binary instead of Utf8->Int32 because Arrow actually supports that cast - let from_field = Field::new("col", DataType::Binary, true); - let to_field = Field::new("col", DataType::Decimal128(10, 2), true); - let result = can_cast_field(&from_field, &to_field); - assert!(result.is_err()); - let error_msg = result.unwrap_err().to_string(); - assert!(error_msg.contains("Cannot cast file schema field col")); - } - - #[test] - fn test_create_field_mapping() { - // Define the table schema - let table_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ])); - - // Define file schema: different order, missing column c, and b has different type - let file_schema = Schema::new(vec![ - Field::new("b", DataType::Float64, true), // Different type but castable to Utf8 - Field::new("a", DataType::Int32, true), // Same type - Field::new("d", DataType::Boolean, true), // Not in table schema - ]); - - // Custom can_map_field function that allows all mappings for testing - let allow_all = |_: &Field, _: &Field| Ok(true); - - // Test field mapping - let (field_mappings, projection) = - create_field_mapping(&file_schema, &table_schema, allow_all).unwrap(); - - // Expected: - // - field_mappings[0] (a) maps to projection[1] - // - field_mappings[1] (b) maps to projection[0] - // - field_mappings[2] (c) is None (not in file) - assert_eq!(field_mappings, vec![Some(1), Some(0), None]); - assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a - - // Test with a failing mapper - let fails_all = |_: &Field, _: &Field| Ok(false); - let (field_mappings, projection) = - create_field_mapping(&file_schema, &table_schema, fails_all).unwrap(); - - // Should have no mappings or projections if all cast checks fail - assert_eq!(field_mappings, vec![None, None, None]); - assert_eq!(projection, Vec::::new()); - - // Test with error-producing mapper - let error_mapper = |_: &Field, _: 
&Field| plan_err!("Test error"); - let result = create_field_mapping(&file_schema, &table_schema, error_mapper); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("Test error")); - } - - #[test] - fn test_schema_mapping_new() { - // Define the projected table schema - let projected_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - ])); - - // Define field mappings from table to file - let field_mappings = vec![Some(1), Some(0)]; - - // Create SchemaMapping manually - let mapping = SchemaMapping::new( - Arc::clone(&projected_schema), - field_mappings.clone(), - Arc::new( - |array: &ArrayRef, field: &Field, opts: &arrow::compute::CastOptions| { - cast_column(array, field, opts) - }, - ), - ); - - // Check that fields were set correctly - assert_eq!(*mapping.projected_table_schema, *projected_schema); - assert_eq!(mapping.field_mappings, field_mappings); - - // Test with a batch to ensure it works properly - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![ - Field::new("b_file", DataType::Utf8, true), - Field::new("a_file", DataType::Int32, true), - ])), - vec![ - Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])), - Arc::new(arrow::array::Int32Array::from(vec![1, 2])), - ], - ) - .unwrap(); - - // Test that map_batch works with our manually created mapping - let mapped_batch = mapping.map_batch(batch).unwrap(); - - // Verify the mapped batch has the correct schema and data - assert_eq!(*mapped_batch.schema(), *projected_schema); - assert_eq!(mapped_batch.num_columns(), 2); - assert_eq!(mapped_batch.column(0).len(), 2); // a column - assert_eq!(mapped_batch.column(1).len(), 2); // b column - } - - #[test] - fn test_map_schema_error_path() { - // Define the table schema - let table_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules - ])); - - // Define file schema with incompatible type for column c - let file_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Float64, true), // Different but castable - Field::new("c", DataType::Binary, true), // Not castable to Decimal128 - ]); - - // Create DefaultSchemaAdapter - let adapter = DefaultSchemaAdapter { - projected_table_schema: Arc::clone(&table_schema), - }; - - // map_schema should error due to incompatible types - let result = adapter.map_schema(&file_schema); - assert!(result.is_err()); - let error_msg = result.unwrap_err().to_string(); - assert!(error_msg.contains("Cannot cast file schema field c")); - } - - #[test] - fn test_map_schema_happy_path() { - // Define the table schema - let table_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Decimal128(10, 2), true), - ])); - - // Create DefaultSchemaAdapter - let adapter = DefaultSchemaAdapter { - projected_table_schema: Arc::clone(&table_schema), - }; - - // Define compatible file schema (missing column c) - let compatible_file_schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), // Can be cast to Int32 - Field::new("b", DataType::Float64, true), // Can be cast to Utf8 - ]); - - // Test successful schema mapping - let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap(); - - // Verify field_mappings and projection 
created correctly - assert_eq!(projection, vec![0, 1]); // Projecting a and b - - // Verify the SchemaMapping works with actual data - let file_batch = RecordBatch::try_new( - Arc::new(compatible_file_schema.clone()), - vec![ - Arc::new(arrow::array::Int64Array::from(vec![100, 200])), - Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])), - ], - ) - .unwrap(); - - let mapped_batch = mapper.map_batch(file_batch).unwrap(); - - // Verify correct schema mapping - assert_eq!(*mapped_batch.schema(), *table_schema); - assert_eq!(mapped_batch.num_columns(), 3); // a, b, c - - // Column c should be null since it wasn't in the file schema - let c_array = mapped_batch.column(2); - assert_eq!(c_array.len(), 2); - assert_eq!(c_array.null_count(), 2); - } - - #[test] - fn test_adapt_struct_with_added_nested_fields() -> Result<()> { - let (file_schema, table_schema) = create_test_schemas_with_nested_fields(); - let batch = create_test_batch_with_struct_data(&file_schema)?; - - let adapter = DefaultSchemaAdapter { - projected_table_schema: Arc::clone(&table_schema), - }; - let (mapper, _) = adapter.map_schema(file_schema.as_ref())?; - let mapped_batch = mapper.map_batch(batch)?; - - verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?; - Ok(()) - } - - #[test] - fn test_map_column_statistics_struct() -> Result<()> { - let (file_schema, table_schema) = create_test_schemas_with_nested_fields(); - - let adapter = DefaultSchemaAdapter { - projected_table_schema: Arc::clone(&table_schema), - }; - let (mapper, _) = adapter.map_schema(file_schema.as_ref())?; - - let file_stats = vec![ - create_test_column_statistics( - 0, - 100, - Some(ScalarValue::Int32(Some(1))), - Some(ScalarValue::Int32(Some(100))), - Some(ScalarValue::Int32(Some(5100))), - ), - create_test_column_statistics(10, 50, None, None, None), - ]; - - let table_stats = mapper.map_column_statistics(&file_stats)?; - assert_eq!(table_stats.len(), 1); - verify_column_statistics( - &table_stats[0], - Some(0), - Some(100), - Some(ScalarValue::Int32(Some(1))), - Some(ScalarValue::Int32(Some(100))), - Some(ScalarValue::Int32(Some(5100))), - ); - let missing_stats = mapper.map_column_statistics(&[])?; - assert_eq!(missing_stats.len(), 1); - assert_eq!(missing_stats[0], ColumnStatistics::new_unknown()); - Ok(()) - } - - fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) { - let file_schema = Arc::new(Schema::new(vec![Field::new( - "info", - DataType::Struct( - vec![ - Field::new("location", DataType::Utf8, true), - Field::new( - "timestamp_utc", - DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), - true, - ), - ] - .into(), - ), - true, - )])); - - let table_schema = Arc::new(Schema::new(vec![Field::new( - "info", - DataType::Struct( - vec![ - Field::new("location", DataType::Utf8, true), - Field::new( - "timestamp_utc", - DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), - true, - ), - Field::new( - "reason", - DataType::Struct( - vec![ - Field::new("_level", DataType::Float64, true), - Field::new( - "details", - DataType::Struct( - vec![ - Field::new("rurl", DataType::Utf8, true), - Field::new("s", DataType::Float64, true), - Field::new("t", DataType::Utf8, true), - ] - .into(), - ), - true, - ), - ] - .into(), - ), - true, - ), - ] - .into(), - ), - true, - )])); - - (file_schema, table_schema) - } - - fn create_test_batch_with_struct_data( - file_schema: &SchemaRef, - ) -> Result { - let mut location_builder = StringBuilder::new(); - location_builder.append_value("San Francisco"); - 
location_builder.append_value("New York"); - - let timestamp_array = TimestampMillisecondArray::from(vec![ - Some(1640995200000), - Some(1641081600000), - ]); - - let timestamp_type = - DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())); - let timestamp_array = cast(×tamp_array, ×tamp_type)?; - - let info_struct = StructArray::from(vec![ - ( - Arc::new(Field::new("location", DataType::Utf8, true)), - Arc::new(location_builder.finish()) as ArrayRef, - ), - ( - Arc::new(Field::new("timestamp_utc", timestamp_type, true)), - timestamp_array, - ), - ]); - - Ok(RecordBatch::try_new( - Arc::clone(file_schema), - vec![Arc::new(info_struct)], - )?) - } - - fn verify_adapted_batch_with_nested_fields( - mapped_batch: &RecordBatch, - table_schema: &SchemaRef, - ) -> Result<()> { - assert_eq!(mapped_batch.schema(), *table_schema); - assert_eq!(mapped_batch.num_rows(), 2); - - let info_col = mapped_batch.column(0); - let info_array = info_col - .as_any() - .downcast_ref::() - .expect("Expected info column to be a StructArray"); - - verify_preserved_fields(info_array)?; - verify_reason_field_structure(info_array)?; - Ok(()) - } - - fn verify_preserved_fields(info_array: &StructArray) -> Result<()> { - let location_col = info_array - .column_by_name("location") - .expect("Expected location field in struct"); - let location_array = location_col - .as_any() - .downcast_ref::() - .expect("Expected location to be a StringArray"); - assert_eq!(location_array.value(0), "San Francisco"); - assert_eq!(location_array.value(1), "New York"); - - let timestamp_col = info_array - .column_by_name("timestamp_utc") - .expect("Expected timestamp_utc field in struct"); - let timestamp_array = timestamp_col - .as_any() - .downcast_ref::() - .expect("Expected timestamp_utc to be a TimestampMillisecondArray"); - assert_eq!(timestamp_array.value(0), 1640995200000); - assert_eq!(timestamp_array.value(1), 1641081600000); - Ok(()) - } - - fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> { - let reason_col = info_array - .column_by_name("reason") - .expect("Expected reason field in struct"); - let reason_array = reason_col - .as_any() - .downcast_ref::() - .expect("Expected reason to be a StructArray"); - assert_eq!(reason_array.fields().len(), 2); - assert!(reason_array.column_by_name("_level").is_some()); - assert!(reason_array.column_by_name("details").is_some()); - - let details_col = reason_array - .column_by_name("details") - .expect("Expected details field in reason struct"); - let details_array = details_col - .as_any() - .downcast_ref::() - .expect("Expected details to be a StructArray"); - assert_eq!(details_array.fields().len(), 3); - assert!(details_array.column_by_name("rurl").is_some()); - assert!(details_array.column_by_name("s").is_some()); - assert!(details_array.column_by_name("t").is_some()); - for i in 0..2 { - assert!(reason_array.is_null(i), "reason field should be null"); - } - Ok(()) - } - - fn verify_column_statistics( - stats: &ColumnStatistics, - expected_null_count: Option, - expected_distinct_count: Option, - expected_min: Option, - expected_max: Option, - expected_sum: Option, - ) { - if let Some(count) = expected_null_count { - assert_eq!( - stats.null_count, - Precision::Exact(count), - "Null count should match expected value" - ); - } - if let Some(count) = expected_distinct_count { - assert_eq!( - stats.distinct_count, - Precision::Exact(count), - "Distinct count should match expected value" - ); - } - if let Some(min) = expected_min { - assert_eq!( - 
stats.min_value, - Precision::Exact(min), - "Min value should match expected value" - ); - } - if let Some(max) = expected_max { - assert_eq!( - stats.max_value, - Precision::Exact(max), - "Max value should match expected value" - ); - } - if let Some(sum) = expected_sum { - assert_eq!( - stats.sum_value, - Precision::Exact(sum), - "Sum value should match expected value" - ); - } - } - - fn create_test_column_statistics( - null_count: usize, - distinct_count: usize, - min_value: Option, - max_value: Option, - sum_value: Option, - ) -> ColumnStatistics { - ColumnStatistics { - null_count: Precision::Exact(null_count), - distinct_count: Precision::Exact(distinct_count), - min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact), - max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact), - sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact), - byte_size: Precision::Absent, - } - } -} diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index a7eb7fd1c495f..c8d5dd54cb8a2 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -17,7 +17,6 @@ use crate::{ file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, - schema_adapter::SchemaAdapterFactory, }; use std::sync::Arc; @@ -32,7 +31,6 @@ use object_store::ObjectStore; #[derive(Clone)] pub(crate) struct MockSource { metrics: ExecutionPlanMetricsSet, - schema_adapter_factory: Option>, filter: Option>, table_schema: crate::table_schema::TableSchema, projection: crate::projection::SplitProjection, @@ -44,7 +42,6 @@ impl Default for MockSource { crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]); Self { metrics: ExecutionPlanMetricsSet::new(), - schema_adapter_factory: None, filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, @@ -57,7 +54,6 @@ impl MockSource { let table_schema = table_schema.into(); Self { metrics: ExecutionPlanMetricsSet::new(), - schema_adapter_factory: None, filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, @@ -100,20 +96,6 @@ impl FileSource for MockSource { "mock" } - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc, - ) -> Result> { - Ok(Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - })) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } - fn table_schema(&self) -> &crate::table_schema::TableSchema { &self.table_schema } diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index e9b8ff5c37dbd..83727ac092044 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -408,8 +408,8 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { column.name() ); } - // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` used to do. - // If users want a different behavior they need to provide a custom `PhysicalExprAdapter` implementation. + // If the column is missing from the physical schema fill it in with nulls. + // For a different behavior, provide a custom `PhysicalExprAdapter` implementation. 
let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?; return Ok(Transformed::yes(expressions::lit(null_value))); } diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 159bd3e4e790e..826dfabcb4f10 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -490,6 +490,24 @@ If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., d See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for how to implement a custom `PhysicalExprAdapterFactory`. +### `SchemaAdapter` and `SchemaAdapterFactory` completely removed + +The following symbols have been completely removed from DataFusion: + +- `SchemaAdapter` trait +- `SchemaAdapterFactory` trait +- `SchemaMapper` trait +- `SchemaMapping` struct +- `DefaultSchemaAdapterFactory` struct + +These types were previously used to adapt record batch schemas during file reading. +This functionality has been replaced by `PhysicalExprAdapterFactory`, which rewrites expressions at planning time rather than transforming batches at runtime. + +**Migration guide:** + +If you implemented a custom `SchemaAdapterFactory`, migrate to `PhysicalExprAdapterFactory`. +See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for a complete implementation. + ## DataFusion `51.0.0` ### `arrow` / `parquet` updated to 57.0.0 From 8baa453b4ce8585aacfd1eb827ade39219a7ad45 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 15:58:48 -0600 Subject: [PATCH 2/7] remove overlap in upgrading guide --- docs/source/library-user-guide/upgrading.md | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 826dfabcb4f10..799219d57b85e 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -479,20 +479,11 @@ let config = FileScanConfigBuilder::new(url, source) .build(); ``` -### `SchemaAdapterFactory` Fully Removed from Parquet +### `SchemaAdapter` and `SchemaAdapterFactory` completely removed Following the deprecation announced in [DataFusion 49.0.0](#deprecating-schemaadapterfactory-and-schemaadapter), `SchemaAdapterFactory` has been fully removed from Parquet scanning. This applies to both: -- **Predicate pushdown / row filtering** (deprecated in 49.0.0) -- **Projections** (newly removed in 52.0.0) - -If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., default column values, type coercion), you should now implement `PhysicalExprAdapterFactory` instead. - -See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for how to implement a custom `PhysicalExprAdapterFactory`. 
- -### `SchemaAdapter` and `SchemaAdapterFactory` completely removed - -The following symbols have been completely removed from DataFusion: +The following symbols have been deprecated and will be removed in the next release: - `SchemaAdapter` trait - `SchemaAdapterFactory` trait @@ -502,6 +493,8 @@ The following symbols have been completely removed from DataFusion: These types were previously used to adapt record batch schemas during file reading. This functionality has been replaced by `PhysicalExprAdapterFactory`, which rewrites expressions at planning time rather than transforming batches at runtime. +If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., default column values, type coercion), you should now implement `PhysicalExprAdapterFactory` instead. +See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for how to implement a custom `PhysicalExprAdapterFactory`. **Migration guide:** From 9e66e500eba6a72723bba1743992dffd7bb07874 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 15:59:15 -0600 Subject: [PATCH 3/7] remove outdated note --- .../examples/custom_data_source/default_column_values.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/default_column_values.rs b/datafusion-examples/examples/custom_data_source/default_column_values.rs index 5089396ec341e..81d74cfbecabd 100644 --- a/datafusion-examples/examples/custom_data_source/default_column_values.rs +++ b/datafusion-examples/examples/custom_data_source/default_column_values.rs @@ -143,8 +143,6 @@ pub async fn default_column_values() -> Result<()> { ); println!("4. Default values from metadata are cast to proper types at planning time"); println!("5. The DefaultPhysicalExprAdapter handles other schema adaptations"); - println!("\nNote: PhysicalExprAdapter is specifically for filter predicates."); - println!("For projection columns, different mechanisms handle missing columns."); Ok(()) } From cd033298eac7f62f9a8b0d73f1d33675187e0303 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 16:21:42 -0600 Subject: [PATCH 4/7] add more tests --- datafusion/core/tests/parquet/expr_adapter.rs | 145 ++++++++++++++++++ .../test_files/schema_evolution.slt | 144 +++++++++++++++++ 2 files changed, 289 insertions(+) diff --git a/datafusion/core/tests/parquet/expr_adapter.rs b/datafusion/core/tests/parquet/expr_adapter.rs index cc7d3ada45c9b..515422ed750ef 100644 --- a/datafusion/core/tests/parquet/expr_adapter.rs +++ b/datafusion/core/tests/parquet/expr_adapter.rs @@ -319,3 +319,148 @@ async fn test_physical_expr_adapter_with_non_null_defaults() { ]; assert_batches_eq!(expected, &batches); } + +/// Test demonstrating that a single PhysicalExprAdapterFactory instance can be +/// reused across multiple ListingTable instances. +/// +/// This addresses the concern: "This is important for ListingTable. A test for +/// ListingTable would add assurance that the functionality is retained [i.e. 
we +/// can re-use a PhysicalExprAdapterFactory]" +#[tokio::test] +async fn test_physical_expr_adapter_factory_reuse_across_tables() { + // Create two different parquet files with different schemas + // File 1: has column c1 only + let batch1 = record_batch!(("c1", Int32, [1, 2, 3])).unwrap(); + // File 2: has column c1 only but different data + let batch2 = record_batch!(("c1", Int32, [10, 20, 30])).unwrap(); + + let store = Arc::new(InMemory::new()) as Arc; + let store_url = ObjectStoreUrl::parse("memory://").unwrap(); + + // Write files to different paths + write_parquet(batch1, store.clone(), "table1/data.parquet").await; + write_parquet(batch2, store.clone(), "table2/data.parquet").await; + + // Table schema has additional columns that don't exist in files + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int64, false), + Field::new("c2", DataType::Utf8, true), // missing from files + ])); + + let mut cfg = SessionConfig::new() + .with_collect_statistics(false) + .with_parquet_pruning(false); + cfg.options_mut().execution.parquet.pushdown_filters = true; + let ctx = SessionContext::new_with_config(cfg); + ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + + // Create ONE factory instance wrapped in Arc - this will be REUSED + let factory: Arc = + Arc::new(CustomPhysicalExprAdapterFactory); + + // Create ListingTable 1 using the shared factory + let listing_table_config1 = + ListingTableConfig::new(ListingTableUrl::parse("memory:///table1/").unwrap()) + .infer_options(&ctx.state()) + .await + .unwrap() + .with_schema(table_schema.clone()) + .with_expr_adapter_factory(Arc::clone(&factory)); // Clone the Arc, not create new factory + + let table1 = ListingTable::try_new(listing_table_config1).unwrap(); + ctx.register_table("t1", Arc::new(table1)).unwrap(); + + // Create ListingTable 2 using the SAME factory instance + let listing_table_config2 = + ListingTableConfig::new(ListingTableUrl::parse("memory:///table2/").unwrap()) + .infer_options(&ctx.state()) + .await + .unwrap() + .with_schema(table_schema.clone()) + .with_expr_adapter_factory(Arc::clone(&factory)); // Reuse same factory + + let table2 = ListingTable::try_new(listing_table_config2).unwrap(); + ctx.register_table("t2", Arc::new(table2)).unwrap(); + + // Verify table 1 works correctly with the shared factory + // CustomPhysicalExprAdapterFactory fills missing Utf8 columns with 'b' + let batches = ctx + .sql("SELECT c1, c2 FROM t1 ORDER BY c1") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = [ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 1 | b |", + "| 2 | b |", + "| 3 | b |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batches); + + // Verify table 2 also works correctly with the SAME shared factory + let batches = ctx + .sql("SELECT c1, c2 FROM t2 ORDER BY c1") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = [ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 10 | b |", + "| 20 | b |", + "| 30 | b |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batches); + + // Verify predicates work on both tables with the shared factory + let batches = ctx + .sql("SELECT c1 FROM t1 WHERE c2 = 'b' ORDER BY c1") + .await + .unwrap() + .collect() + .await + .unwrap(); + + #[rustfmt::skip] + let expected = [ + "+----+", + "| c1 |", + "+----+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----+", + ]; + assert_batches_eq!(expected, &batches); + + let batches = ctx + .sql("SELECT c1 FROM t2 WHERE c2 = 'b' ORDER BY c1") + 
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    #[rustfmt::skip]
+    let expected = [
+        "+----+",
+        "| c1 |",
+        "+----+",
+        "| 10 |",
+        "| 20 |",
+        "| 30 |",
+        "+----+",
+    ];
+    assert_batches_eq!(expected, &batches);
+}
diff --git a/datafusion/sqllogictest/test_files/schema_evolution.slt b/datafusion/sqllogictest/test_files/schema_evolution.slt
index 5572c4a5ffef3..e29aa14f13e92 100644
--- a/datafusion/sqllogictest/test_files/schema_evolution.slt
+++ b/datafusion/sqllogictest/test_files/schema_evolution.slt
@@ -138,3 +138,147 @@ select * from parquet_table where c > 11.0;
 ----
 bzz 300 13.7
 foo 200 12.6
+
+##########
+# Projection tests - selecting subset of columns
+# These tests verify column reordering and projection work correctly
+# with schema evolution (addresses E2E column reordering concern)
+##########
+
+# Select only column a
+query T rowsort
+select a from parquet_table;
+----
+NULL
+bzz
+foo
+foo
+foo
+foo
+foo
+foo
+foo
+
+# Select columns in different order than table schema (c, a instead of a, b, c)
+query RT rowsort
+select c, a from parquet_table;
+----
+10.5 foo
+12.6 foo
+13.7 bzz
+NULL NULL
+NULL foo
+NULL foo
+NULL foo
+NULL foo
+NULL foo
+
+# Select single column that's missing in some files
+query I rowsort
+select b from parquet_table;
+----
+1
+10
+100
+2
+200
+3
+300
+NULL
+NULL
+
+##########
+# Projection with filter tests
+##########
+
+# Projection with equality filter
+query TI rowsort
+select a, b from parquet_table where a = 'foo';
+----
+foo 1
+foo 100
+foo 2
+foo 200
+foo 3
+foo NULL
+foo NULL
+
+# Projection with range filter on projected column
+query IR rowsort
+select b, c from parquet_table where b > 5;
+----
+10 NULL
+100 10.5
+200 12.6
+300 13.7
+
+# Projection excluding filtered column (filter on c, project a)
+query T rowsort
+select a from parquet_table where c > 11.0;
+----
+bzz
+foo
+
+##########
+# Complex filter tests - OR combinations and IS NOT NULL
+##########
+
+# OR combination
+query TIR rowsort
+select * from parquet_table where a = 'foo' OR b > 100;
+----
+bzz 300 13.7
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 200 12.6
+foo 3 NULL
+foo NULL NULL
+foo NULL NULL
+
+# IS NOT NULL on column a
+query TIR rowsort
+select * from parquet_table where a IS NOT NULL;
+----
+bzz 300 13.7
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 200 12.6
+foo 3 NULL
+foo NULL NULL
+foo NULL NULL
+
+# IS NOT NULL on column c (missing in most files)
+query TIR rowsort
+select * from parquet_table where c IS NOT NULL;
+----
+bzz 300 13.7
+foo 100 10.5
+foo 200 12.6
+
+# Combined conditions with NULL checks
+query TIR rowsort
+select * from parquet_table where a IS NULL OR (b IS NOT NULL AND b > 5);
+----
+NULL 10 NULL
+bzz 300 13.7
+foo 100 10.5
+foo 200 12.6
+
+##########
+# Multi-column predicates
+##########
+
+# AND across columns with different availability
+query TIR rowsort
+select * from parquet_table where a = 'foo' AND b > 50;
+----
+foo 100 10.5
+foo 200 12.6
+
+# Filter on multiple columns from reordered file (File4 has b, a, c order)
+query TIR rowsort
+select * from parquet_table where b = 100 AND c = 10.5;
+----
+foo 100 10.5

From 8dc4130bea81100dbd83251431a091125970b55a Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 17 Dec 2025 16:27:22 -0600
Subject: [PATCH 5/7] fix merge

---
 datafusion/datasource/src/file.rs | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 9e8f6dc1a67c2..79543a5d7036b 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
-use datafusion_common::Result;
+use datafusion_common::{Result, not_impl_err};
 use datafusion_common::config::ConfigOptions;
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
@@ -173,4 +173,22 @@ pub trait FileSource: Send + Sync {
     ) -> Result<Option<Arc<dyn FileSource>>> {
         Ok(None)
     }
+
+    /// Set the file ordering information
+    ///
+    /// This allows the file source to know how the files are sorted,
+    /// enabling it to make informed decisions about sort pushdown.
+    ///
+    /// # Default Implementation
+    ///
+    /// Returns a `not_impl_err!` error. FileSource implementations that support
+    /// sort optimization should override this method.
+    fn with_file_ordering_info(
+        &self,
+        _ordering: Option<LexOrdering>,
+    ) -> Result<Arc<dyn FileSource>> {
+        // Default: not implemented
+        // ParquetSource will override this
+        not_impl_err!("with_file_ordering_info not implemented for this FileSource")
+    }
 }

From 1fee4099727fe5a53cf637121f0e9dd1ee8f26d7 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 17 Dec 2025 16:59:42 -0600
Subject: [PATCH 6/7] add deprecated skeletons

---
 datafusion/catalog-listing/src/config.rs    |  21 ++
 datafusion/catalog-listing/src/table.rs     |  36 ++++
 datafusion/core/src/datasource/mod.rs       |   4 +-
 datafusion/datasource/src/file.rs           |  35 +++-
 datafusion/datasource/src/mod.rs            |   1 +
 datafusion/datasource/src/schema_adapter.rs | 212 ++++++++++++++++++++
 6 files changed, 306 insertions(+), 3 deletions(-)
 create mode 100644 datafusion/datasource/src/schema_adapter.rs

diff --git a/datafusion/catalog-listing/src/config.rs b/datafusion/catalog-listing/src/config.rs
index baf80db64353c..553cc29fddb0f 100644
--- a/datafusion/catalog-listing/src/config.rs
+++ b/datafusion/catalog-listing/src/config.rs
@@ -21,6 +21,8 @@ use datafusion_catalog::Session;
 use datafusion_common::{config_err, internal_err};
 use datafusion_datasource::ListingTableUrl;
 use datafusion_datasource::file_compression_type::FileCompressionType;
+#[allow(deprecated)]
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -295,4 +297,23 @@ impl ListingTableConfig {
             ..self
         }
     }
+
+    /// Deprecated: Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
+    ///
+    /// `SchemaAdapterFactory` has been removed. Use [`Self::with_expr_adapter_factory`]
+    /// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
+    ///
+    /// This method is a no-op and returns `self` unchanged.
+    #[deprecated(
+        since = "52.0.0",
+        note = "SchemaAdapterFactory has been removed. Use with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+    )]
+    #[allow(deprecated)]
+    pub fn with_schema_adapter_factory(
+        self,
+        _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        // No-op - just return self unchanged
+        self
+    }
 }
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 14d6747b7e120..989b5f38ad5db 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -29,6 +29,8 @@ use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::FileSinkConfig;
+#[allow(deprecated)]
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
 };
@@ -286,6 +288,40 @@ impl ListingTable {
         self.schema_source
     }
 
+    /// Deprecated: Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
+    ///
+    /// `SchemaAdapterFactory` has been removed. Use [`ListingTableConfig::with_expr_adapter_factory`]
+    /// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
+    ///
+    /// This method is a no-op and returns `self` unchanged.
+    #[deprecated(
+        since = "52.0.0",
+        note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+    )]
+    #[allow(deprecated)]
+    pub fn with_schema_adapter_factory(
+        self,
+        _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        // No-op - just return self unchanged
+        self
+    }
+
+    /// Deprecated: Returns the [`SchemaAdapterFactory`] used by this [`ListingTable`].
+    ///
+    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
+    /// See `upgrading.md` for more details.
+    ///
+    /// Always returns `None`.
+    #[deprecated(
+        since = "52.0.0",
+        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+    )]
+    #[allow(deprecated)]
+    pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        None
+    }
+
     /// Creates a file source for this table
     fn create_file_source(&self) -> Arc<dyn FileSource> {
         let table_schema = TableSchema::new(
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index b40d7092f0f87..36ddb740ee47c 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -42,6 +42,7 @@ pub use datafusion_catalog::default_table_source;
 pub use datafusion_catalog::memory;
 pub use datafusion_catalog::stream;
 pub use datafusion_catalog::view;
+pub use datafusion_datasource::schema_adapter;
 pub use datafusion_datasource::sink;
 pub use datafusion_datasource::source;
 pub use datafusion_datasource::table_schema;
@@ -64,8 +65,7 @@ mod tests {
         tree_node::{Transformed, TransformedResult, TreeNode},
     };
     use datafusion_datasource::{
-        PartitionedFile, file_scan_config::FileScanConfigBuilder,
-        source::DataSourceExec,
+        PartitionedFile, file_scan_config::FileScanConfigBuilder, source::DataSourceExec,
     };
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_physical_expr::expressions::{Column, Literal};
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 79543a5d7036b..3056e95f7f2c9 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -25,8 +25,10 @@ use std::sync::Arc;
 use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
-use datafusion_common::{Result, not_impl_err};
+#[allow(deprecated)]
+use crate::schema_adapter::SchemaAdapterFactory;
 use datafusion_common::config::ConfigOptions;
+use datafusion_common::{Result, not_impl_err};
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
@@ -191,4 +193,35 @@ pub trait FileSource: Send + Sync {
         // ParquetSource will override this
         not_impl_err!("with_file_ordering_info not implemented for this FileSource")
     }
+
+    /// Deprecated: Set optional schema adapter factory.
+    ///
+    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
+    /// See `upgrading.md` for more details.
+    #[deprecated(
+        since = "52.0.0",
+        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+    )]
+    #[allow(deprecated)]
+    fn with_schema_adapter_factory(
+        &self,
+        _factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        not_impl_err!(
+            "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+        )
+    }
+
+    /// Deprecated: Returns the current schema adapter factory if set.
+    ///
+    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
+    /// See `upgrading.md` for more details.
+    #[deprecated(
+        since = "52.0.0",
+        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+    )]
+    #[allow(deprecated)]
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        None
+    }
 }
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 223d00ea074d0..347e783c278d0 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -40,6 +40,7 @@ pub mod file_sink_config;
 pub mod file_stream;
 pub mod memory;
 pub mod projection;
+pub mod schema_adapter;
 pub mod sink;
 pub mod source;
 mod statistics;
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
new file mode 100644
index 0000000000000..3d0b06954e085
--- /dev/null
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Deprecated: [`SchemaAdapter`] and [`SchemaAdapterFactory`] have been removed.
+//!
+//! Use [`PhysicalExprAdapterFactory`] instead. See `upgrading.md` for more details.
+//!
+//! [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+
+#![allow(deprecated)]
+
+use arrow::array::{ArrayRef, RecordBatch};
+use arrow::datatypes::{Field, Schema, SchemaRef};
+use datafusion_common::{ColumnStatistics, Result, not_impl_err};
+use log::warn;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Deprecated: Function type for casting columns.
+///
+/// This type has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// See `upgrading.md` for more details.
+///
+/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+#[deprecated(
+    since = "52.0.0",
+    note = "SchemaAdapter has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+)]
+pub type CastColumnFn = dyn Fn(&ArrayRef, &Field, &arrow::compute::CastOptions) -> Result<ArrayRef>
+    + Send
+    + Sync;
+
+/// Deprecated: Factory for creating [`SchemaAdapter`].
+///
+/// This trait has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// See `upgrading.md` for more details.
+///
+/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+#[deprecated(
+    since = "52.0.0",
+    note = "SchemaAdapter has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+)]
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+    /// Create a [`SchemaAdapter`]
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter>;
+
+    /// Create a [`SchemaAdapter`] using only the projected table schema.
+    fn create_with_projected_schema(
+        &self,
+        projected_table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
+    }
+}
+
+/// Deprecated: Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table schema.
+///
+/// This trait has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// See `upgrading.md` for more details.
+///
+/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+#[deprecated(
+    since = "52.0.0",
+    note = "SchemaAdapter has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+)]
+pub trait SchemaAdapter: Send + Sync {
+    /// Map a column index in the table schema to a column index in a particular file schema.
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+    /// Creates a mapping for casting columns from the file schema to the table schema.
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Deprecated: Maps columns from a specific file schema to the table schema.
+///
+/// This trait has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// See `upgrading.md` for more details.
+///
+/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+#[deprecated(
+    since = "52.0.0",
+    note = "SchemaMapper has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+)]
+pub trait SchemaMapper: Debug + Send + Sync {
+    /// Adapts a `RecordBatch` to match the `table_schema`.
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch>;
+
+    /// Adapts file-level column `Statistics` to match the `table_schema`.
+    fn map_column_statistics(
+        &self,
+        file_col_statistics: &[ColumnStatistics],
+    ) -> Result<Vec<ColumnStatistics>>;
+}
+
+/// Deprecated: Default [`SchemaAdapterFactory`] for mapping schemas.
+///
+/// This struct has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// See `upgrading.md` for more details.
+///
+/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+#[deprecated(
+    since = "52.0.0",
+    note = "DefaultSchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+)]
+#[derive(Clone, Debug, Default)]
+pub struct DefaultSchemaAdapterFactory;
+
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        _table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(DeprecatedSchemaAdapter {
+            _projected_table_schema: projected_table_schema,
+        })
+    }
+}
+
+impl DefaultSchemaAdapterFactory {
+    /// Deprecated: Create a new factory for mapping batches from a file schema to a table schema.
+    #[deprecated(
+        since = "52.0.0",
+        note = "DefaultSchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+    )]
+    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        // Note: this method never returned an error, so any errors are raised by the returned adapter instead
+        warn!(
+            "DefaultSchemaAdapterFactory::from_schema is deprecated. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+        );
+        Box::new(DeprecatedSchemaAdapter {
+            _projected_table_schema: table_schema,
+        })
+    }
+}
+
+/// Internal deprecated adapter that returns errors when methods are called.
+struct DeprecatedSchemaAdapter {
+    _projected_table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DeprecatedSchemaAdapter {
+    fn map_column_index(&self, _index: usize, _file_schema: &Schema) -> Option<usize> {
+        None // Safe no-op
+    }
+
+    fn map_schema(
+        &self,
+        _file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        not_impl_err!(
+            "SchemaAdapter has been removed. Use PhysicalExprAdapterFactory instead. \
+             See upgrading.md for more details."
+        )
+    }
+}
+
+/// Deprecated: The SchemaMapping struct held a mapping from the file schema to the table schema.
+///
+/// This struct has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// See `upgrading.md` for more details.
+///
+/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+#[deprecated(
+    since = "52.0.0",
+    note = "SchemaMapping has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
+)]
+#[derive(Debug)]
+pub struct SchemaMapping {
+    // Private fields removed - this is a skeleton for deprecation purposes only
+    _private: (),
+}
+
+impl SchemaMapper for SchemaMapping {
+    fn map_batch(&self, _batch: RecordBatch) -> Result<RecordBatch> {
+        not_impl_err!(
+            "SchemaMapping has been removed. Use PhysicalExprAdapterFactory instead. \
+             See upgrading.md for more details."
+        )
+    }
+
+    fn map_column_statistics(
+        &self,
+        _file_col_statistics: &[ColumnStatistics],
+    ) -> Result<Vec<ColumnStatistics>> {
+        not_impl_err!(
+            "SchemaMapping has been removed. Use PhysicalExprAdapterFactory instead. \
+             See upgrading.md for more details."
+        )
+    }
+}

From 5b667dc7cabaddfe862564ed7260558c732f998b Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 17 Dec 2025 17:14:39 -0600
Subject: [PATCH 7/7] lint

---
 datafusion/catalog-listing/src/config.rs | 4 ++--
 datafusion/catalog-listing/src/table.rs  | 6 +++---
 datafusion/datasource/src/file.rs        | 6 +++---
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/datafusion/catalog-listing/src/config.rs b/datafusion/catalog-listing/src/config.rs
index 553cc29fddb0f..ca4d2abfcd737 100644
--- a/datafusion/catalog-listing/src/config.rs
+++ b/datafusion/catalog-listing/src/config.rs
@@ -21,7 +21,7 @@ use datafusion_catalog::Session;
 use datafusion_common::{config_err, internal_err};
 use datafusion_datasource::ListingTableUrl;
 use datafusion_datasource::file_compression_type::FileCompressionType;
-#[allow(deprecated)]
+#[expect(deprecated)]
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use std::str::FromStr;
@@ -308,7 +308,7 @@ impl ListingTableConfig {
         since = "52.0.0",
         note = "SchemaAdapterFactory has been removed. Use with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
     )]
-    #[allow(deprecated)]
+    #[expect(deprecated)]
     pub fn with_schema_adapter_factory(
         self,
         _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 989b5f38ad5db..9fb2dd2dce29c 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -29,7 +29,7 @@ use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::FileSinkConfig;
-#[allow(deprecated)]
+#[expect(deprecated)]
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
@@ -298,7 +298,7 @@ impl ListingTable {
         since = "52.0.0",
         note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
     )]
-    #[allow(deprecated)]
+    #[expect(deprecated)]
     pub fn with_schema_adapter_factory(
         self,
         _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
@@ -317,7 +317,7 @@ impl ListingTable {
         since = "52.0.0",
         note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
     )]
-    #[allow(deprecated)]
+    #[expect(deprecated)]
     pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
         None
     }
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 3056e95f7f2c9..5f568584f6c9f 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
-#[allow(deprecated)]
+#[expect(deprecated)]
 use crate::schema_adapter::SchemaAdapterFactory;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Result, not_impl_err};
@@ -202,7 +202,7 @@ pub trait FileSource: Send + Sync {
         since = "52.0.0",
         note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
     )]
-    #[allow(deprecated)]
+    #[expect(deprecated)]
     fn with_schema_adapter_factory(
         &self,
         _factory: Arc<dyn SchemaAdapterFactory>,
@@ -220,7 +220,7 @@ pub trait FileSource: Send + Sync {
         since = "52.0.0",
         note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
     )]
-    #[allow(deprecated)]
+    #[expect(deprecated)]
     fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
         None
     }
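
The integration test earlier in this series already exercises the replacement API end to end. For downstream code migrating off `with_schema_adapter_factory`, a minimal sketch of the new wiring follows. This is not part of the patch: `factory` stands in for any user-defined `Arc<dyn PhysicalExprAdapterFactory>` (such as the `CustomPhysicalExprAdapterFactory` used in the test above), and the import paths and table URL are illustrative assumptions.

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;

// Sketch: register a ListingTable that adapts per-file schemas through a
// PhysicalExprAdapterFactory instead of the removed SchemaAdapterFactory.
async fn register_table_with_expr_adapter(
    ctx: &SessionContext,
    table_schema: SchemaRef,
    factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Result<()> {
    let config =
        ListingTableConfig::new(ListingTableUrl::parse("memory:///table1/")?)
            .infer_options(&ctx.state())
            .await?
            // The table (logical) schema; files may be missing some of these columns.
            .with_schema(table_schema)
            // Replacement for the removed with_schema_adapter_factory.
            .with_expr_adapter_factory(factory);

    let table = ListingTable::try_new(config)?;
    ctx.register_table("t1", Arc::new(table))?;
    Ok(())
}

Because the factory is passed as an `Arc`, the same instance can be cloned into any number of `ListingTableConfig`s, which is exactly what the reuse test above verifies.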
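For code that still calls the deprecated `FileSource` hooks, the skeletons in PATCH 6/7 make the behavior explicit: the setter returns a `not_impl_err!` and the getter returns `None`. A small sketch of a caller coping with that, using only the signatures introduced above (the helper name is illustrative, not from the patch):

use std::sync::Arc;

use datafusion_datasource::file::FileSource;
#[allow(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

// Sketch: a caller that used to install a SchemaAdapterFactory on a FileSource.
// With the deprecated skeleton, the call now fails with a not-implemented error,
// so the caller keeps the original source and relies on the
// PhysicalExprAdapterFactory configured on the table instead.
#[allow(deprecated)]
fn install_legacy_adapter(
    source: Arc<dyn FileSource>,
    legacy: Arc<dyn SchemaAdapterFactory>,
) -> Arc<dyn FileSource> {
    match source.with_schema_adapter_factory(legacy) {
        Ok(updated) => updated,
        // Deprecated hook is inert; fall back to the expression adapter path.
        Err(_) => source,
    }
}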