Skip to content
Empty file added ci.test
Empty file.
95 changes: 93 additions & 2 deletions datafusion/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
if let Some(table_type) =
schema.table_type(&table_name).await?
{
builder.add_table(
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
table_type,
);
}
}
Expand Down Expand Up @@ -1359,3 +1361,92 @@ impl PartitionStream for InformationSchemaParameters {
))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::CatalogProvider;

#[tokio::test]
async fn make_tables_uses_table_type() {
let config = InformationSchemaConfig {
catalog_list: Arc::new(Fixture),
};
let mut builder = InformationSchemaTablesBuilder {
catalog_names: StringBuilder::new(),
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
table_types: StringBuilder::new(),
schema: Arc::new(Schema::empty()),
};

assert!(config.make_tables(&mut builder).await.is_ok());

assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
}

#[derive(Debug)]
struct Fixture;

#[async_trait]
impl SchemaProvider for Fixture {
// InformationSchemaConfig::make_tables should use this.
async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
Ok(Some(TableType::Base))
}

// InformationSchemaConfig::make_tables used this before `table_type`
// existed but should not, as it may be expensive.
async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
panic!("InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type")
}

fn as_any(&self) -> &dyn Any {
unimplemented!("not required for these tests")
}

fn table_names(&self) -> Vec<String> {
vec!["atable".to_string()]
}

fn table_exist(&self, _: &str) -> bool {
unimplemented!("not required for these tests")
}
}

impl CatalogProviderList for Fixture {
fn as_any(&self) -> &dyn Any {
unimplemented!("not required for these tests")
}

fn register_catalog(
&self,
_: String,
_: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
unimplemented!("not required for these tests")
}

fn catalog_names(&self) -> Vec<String> {
vec!["acatalog".to_string()]
}

fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
Some(Arc::new(Self))
}
}

impl CatalogProvider for Fixture {
fn as_any(&self) -> &dyn Any {
unimplemented!("not required for these tests")
}

fn schema_names(&self) -> Vec<String> {
vec!["aschema".to_string()]
}

fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
Some(Arc::new(Self))
}
}
}
9 changes: 9 additions & 0 deletions datafusion/catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;

use crate::table::TableProvider;
use datafusion_common::Result;
use datafusion_expr::TableType;

/// Represents a schema, comprising a number of named tables.
///
Expand Down Expand Up @@ -54,6 +55,14 @@ pub trait SchemaProvider: Debug + Sync + Send {
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>;

/// Retrieves the type of a specific table from the schema by name, if it exists, otherwise
/// returns `None`. Implementations for which this operation is cheap but [Self::table] is
/// expensive can override this to improve operations that only need the type, e.g.
/// `SELECT * FROM information_schema.tables`.
async fn table_type(&self, name: &str) -> Result<Option<TableType>> {
self.table(name).await.map(|o| o.map(|t| t.table_type()))
}

/// If supported by the implementation, adds a new table named `name` to
/// this schema.
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ config_namespace! {
///
/// This is used to workaround bugs in the planner that are now caught by
/// the new schema verification step.
pub skip_physical_aggregate_schema_check: bool, default = false
pub skip_physical_aggregate_schema_check: bool, default = true

/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,9 @@ impl DefaultPhysicalPlanner {
differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
}
}

log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());

return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
.iter()
.map(|s| format!("\n\t- {s}"))
Expand Down
Loading