Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ bytes = { workspace = true }
dashmap = { workspace = true }
# note only use main datafusion crate for examples
base64 = "0.22.1"
datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] }
datafusion = { workspace = true, default-features = true, features = ["parquet_encryption", "sql"] }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr-adapter = { workspace = true }
Expand Down
49 changes: 10 additions & 39 deletions datafusion-examples/examples/relation_planner/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ use std::{
any::Any,
fmt::{self, Debug, Formatter},
hash::{Hash, Hasher},
ops::{Add, Div, Mul, Sub},
pin::Pin,
str::FromStr,
sync::Arc,
task::{Context, Poll},
};

use arrow::datatypes::{Float64Type, Int64Type};
use arrow::{
array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
compute,
Expand All @@ -102,6 +101,7 @@ use futures::{
use rand::{rngs::StdRng, Rng, SeedableRng};
use tonic::async_trait;

use datafusion::optimizer::simplify_expressions::simplify_sql_literal::parse_sql_literal;
use datafusion::{
execution::{
context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
Expand Down Expand Up @@ -384,6 +384,8 @@ impl RelationPlanner for TableSamplePlanner {
};
let input = context.plan(base_relation)?;

let schema = input.schema();

// Handle bucket sampling (Hive-style: TABLESAMPLE(BUCKET x OUT OF y))
if let Some(bucket) = sample.bucket {
if bucket.on.is_some() {
Expand Down Expand Up @@ -415,7 +417,8 @@ impl RelationPlanner for TableSamplePlanner {
match quantity.unit {
// TABLESAMPLE (N ROWS) - exact row limit
Some(TableSampleUnit::Rows) => {
let rows = parse_quantity::<i64>(&quantity.value)?;
let rows: i64 =
parse_sql_literal::<Int64Type>(&quantity.value, schema, context)?;
if rows < 0 {
return plan_err!("row count must be non-negative, got {}", rows);
}
Expand All @@ -427,15 +430,17 @@ impl RelationPlanner for TableSamplePlanner {

// TABLESAMPLE (N PERCENT) - percentage sampling
Some(TableSampleUnit::Percent) => {
let percent = parse_quantity::<f64>(&quantity.value)?;
let percent: f64 =
parse_sql_literal::<Float64Type>(&quantity.value, schema, context)?;
let fraction = percent / 100.0;
let plan = TableSamplePlanNode::new(input, fraction, seed).into_plan();
Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
}

// TABLESAMPLE (N) - fraction if <1.0, row limit if >=1.0
None => {
let value = parse_quantity::<f64>(&quantity.value)?;
let value =
parse_sql_literal::<Float64Type>(&quantity.value, schema, context)?;
if value < 0.0 {
return plan_err!("sample value must be non-negative, got {}", value);
}
Expand All @@ -454,40 +459,6 @@ impl RelationPlanner for TableSamplePlanner {
}
}

/// Parse a SQL expression as a numeric value (supports basic arithmetic).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@geoffreyclaude I wonder what your thoughts are on this PR and the approach of `parse_sql_literal`?

fn parse_quantity<T>(expr: &ast::Expr) -> Result<T>
where
    T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
{
    // The evaluator yields `None` for anything it cannot reduce to a number;
    // surface that to the caller as a planning error that names the expression.
    match eval_numeric_expr(expr) {
        Some(value) => Ok(value),
        None => Err(plan_datafusion_err!(
            "invalid numeric expression: {:?}",
            expr
        )),
    }
}

/// Recursively evaluate a constant numeric SQL expression.
///
/// Supported forms are numeric literals, the binary operators `+`, `-`, `*`
/// and `/`, and parenthesized sub-expressions such as `(1 + 2) * 3`.
///
/// Returns `None` when the expression contains any other construct, or when
/// a numeric literal cannot be parsed as `T`.
fn eval_numeric_expr<T>(expr: &ast::Expr) -> Option<T>
where
    T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
{
    match expr {
        ast::Expr::Value(v) => match &v.value {
            // Re-parse the literal's textual form as the requested numeric type.
            ast::Value::Number(n, _) => n.to_string().parse().ok(),
            _ => None,
        },
        // Parentheses only group; evaluate whatever they wrap.
        ast::Expr::Nested(inner) => eval_numeric_expr::<T>(inner),
        ast::Expr::BinaryOp { left, op, right } => {
            let l = eval_numeric_expr::<T>(left)?;
            let r = eval_numeric_expr::<T>(right)?;
            // NOTE(review): when `T` is an integer type, `/` with a zero
            // right-hand side panics instead of yielding `None`; the current
            // trait bounds give no generic way to detect that here.
            match op {
                ast::BinaryOperator::Plus => Some(l + r),
                ast::BinaryOperator::Minus => Some(l - r),
                ast::BinaryOperator::Multiply => Some(l * r),
                ast::BinaryOperator::Divide => Some(l / r),
                _ => None,
            }
        }
        _ => None,
    }
}

/// Custom logical plan node representing a TABLESAMPLE operation.
///
/// Stores sampling parameters (bounds, seed) and wraps the input plan.
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ serde = [
]
sql = [
"datafusion-common/sql",
"datafusion-optimizer/sql",
"datafusion-functions-nested?/sql",
"datafusion-sql",
"sqlparser",
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ workspace = true
name = "datafusion_optimizer"

[features]
default = ["sql"]
recursive_protection = ["dep:recursive"]
sql = ["datafusion-expr/sql"]

[dependencies]
arrow = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod inlist_simplifier;
mod regex;
pub mod simplify_exprs;
mod simplify_predicates;
#[cfg(feature = "sql")]
pub mod simplify_sql_literal;
mod unwrap_cast;
mod utils;

Expand Down
245 changes: 245 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/simplify_sql_literal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parses and simplifies a SQL expression to a literal of a given type.
//!
//! This module provides functionality to parse and simplify static SQL expressions
//! used in SQL constructs like `FROM t TABLESAMPLE (10 + 50 * 2)`. When such expressions are
//! needed during planning (rather than execution), they must be reduced to literals of a given type.

use crate::simplify_expressions::ExprSimplifier;
use arrow::datatypes::ArrowPrimitiveType;
use datafusion_common::{
DFSchemaRef, DataFusionError, Result, ScalarValue, plan_datafusion_err, plan_err,
};
use datafusion_expr::Expr;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::planner::RelationPlannerContext;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::sqlparser::ast;
use std::sync::Arc;

/// Parses and simplifies a SQL expression to a numeric literal,
/// corresponding to an arrow primitive type `T` (for example, Float64Type).
///
/// This function simplifies and coerces the expression, then extracts the underlying
/// native type using `TryFrom<ScalarValue>`.
///
/// # Arguments
/// * `expr` - A logical AST expression
/// * `schema` - Schema reference for expression planning
/// * `context` - `RelationPlannerContext` context
///
/// # Returns
/// A `Result` containing a literal type
///
/// # Example
/// ```ignore
/// let value: f64 = parse_sql_literal::<Float64Type>(&expr, &schema, &mut relPlannerContext)?;
/// ```
pub fn parse_sql_literal<T>(
expr: &ast::Expr,
schema: &DFSchemaRef,
context: &mut dyn RelationPlannerContext,
) -> Result<T::Native>
where
T: ArrowPrimitiveType,
<T as ArrowPrimitiveType>::Native: TryFrom<ScalarValue, Error = DataFusionError>,
{
match context.sql_to_expr(expr.clone(), &Arc::clone(schema)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is a lot of ceremony to do this process

However, it also feels like this function is non trivial to use itself (it has 4 parameters and a type parameter)

I wonder if we could add a method like `SessionContext::simplify` to invoke the simplifier more easily

Then extracting an expr as a particular type might look like

let expr = ...; // Expr to simplify
// cast to target type and simplify
let cast_expr = SessionContext::simplify(expr.cast(target_type)?);

// Check if it simplified to a type
let Expr::Literal(scalar) = cast_expr  else {
  // error parsing
};

// convert

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. Its purpose is to simplify an expression and convert it to a native type at the same time, hence the five parameters and the work done behind the scenes (simplify, coerce, arrow cast and arrow-to-native cast) to move that out of client code.

For other usages of the optimiser's method - I agree that simplification could be done more easily in the client code via facade methods in context, as you suggested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do wonder if there's a way to compact this signature; for example, having both T and target_type seem redundant since they'd need to be compatible anyway, but off the top of my head I can't think of a way to combine them 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

I've found a way to use an ArrowPrimitiveType and derive a native type from this trait. Now it looks much more compact with three parameters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also remove schema potentially? If we intend for this to only be used for literals, we can just pass an empty schema anywhere it is required inside parse_sql_literal 🤔

Ok(logical_expr) => {
log::debug!("Parsing expr {:?} to type {}", logical_expr, T::DATA_TYPE);

let execution_props = ExecutionProps::new();
let simplifier = ExprSimplifier::new(
SimplifyContext::new(&execution_props).with_schema(Arc::clone(schema)),
);

// Simplify and coerce expression in case of constant arithmetic operations (e.g., 10 + 5)
let simplified_expr: Expr = simplifier
.simplify(logical_expr.clone())
.map_err(|err| plan_datafusion_err!("Cannot simplify {expr:?}: {err}"))?;
let coerced_expr: Expr = simplifier.coerce(simplified_expr, schema)?;
log::debug!("Coerced expression: {:?}", &coerced_expr);

match coerced_expr {
Expr::Literal(scalar_value, _) => {
// It is a literal - proceed to the underlying value
// Cast to the target type if needed
let casted_scalar = scalar_value.cast_to(&T::DATA_TYPE)?;

// Extract the native type
T::Native::try_from(casted_scalar).map_err(|err| {
plan_datafusion_err!(
"Cannot extract {} from scalar value: {err}",
std::any::type_name::<T>()
)
})
}
actual => {
plan_err!(
"Cannot extract literal from coerced {actual:?} expression given {expr:?} expression"
)
}
}
}
Err(err) => {
plan_err!("Cannot construct logical expression from {expr:?}: {err}")
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Float64Type, Int64Type};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DFSchema, TableReference, not_impl_err};
use datafusion_expr::planner::ContextProvider;
use datafusion_expr::sqlparser::parser::Parser;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion_sql::planner::{PlannerContext, SqlToRel};
use datafusion_sql::relation::SqlToRelRelationContext;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use std::sync::Arc;

// Minimal `ContextProvider` for these tests: it carries only configuration
// options and reports no tables, functions, or variables.
struct MockContextProvider {
    options: ConfigOptions,
}

impl ContextProvider for MockContextProvider {
    // Table lookup is not implemented by this mock.
    fn get_table_source(&self, _: TableReference) -> Result<Arc<dyn TableSource>> {
        not_impl_err!("mock")
    }

    // No scalar, aggregate, or window functions are registered.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        None
    }

    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        None
    }

    fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
        None
    }

    // No variables are known either.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        None
    }

    fn options(&self) -> &ConfigOptions {
        &self.options
    }

    // The name listings are empty, matching the `None` lookups above.
    fn udf_names(&self) -> Vec<String> {
        Vec::new()
    }

    fn udaf_names(&self) -> Vec<String> {
        Vec::new()
    }

    fn udwf_names(&self) -> Vec<String> {
        Vec::new()
    }
}

#[test]
fn test_parse_sql_float_literal() {
    // Planner plumbing shared by every case below.
    let provider = MockContextProvider {
        options: ConfigOptions::default(),
    };
    let sql_to_rel = SqlToRel::new(&provider);
    let mut planner_context = PlannerContext::new();
    let mut sql_context =
        SqlToRelRelationContext::new(&sql_to_rel, &mut planner_context);
    let schema = DFSchemaRef::new(DFSchema::empty());
    let dialect = GenericDialect {};

    // (SQL text, expected f64): plain literals, constant arithmetic, and
    // scientific notation.
    let test_cases = [
        ("0.0", 0.0),
        ("1.0", 1.0),
        ("0", 0.0),
        ("1", 1.0),
        ("0.5", 0.5),
        ("100.0", 100.0),
        ("0.001", 0.001),
        ("999.999", 999.999),
        ("1.0 + 2.0", 3.0),
        ("10.0 * 0.5", 5.0),
        ("100.0 / 4.0", 25.0),
        ("(80.0 + 2.0*10.0) / 4.0", 25.0),
        ("50.0 - 10.0", 40.0),
        ("1e2", 100.0),
        ("1.5e1", 15.0),
        ("2.5e-1", 0.25),
    ];

    for (sql_expr, expected) in test_cases {
        let ast_expr = Parser::new(&dialect)
            .try_with_sql(sql_expr)
            .unwrap()
            .parse_expr()
            .unwrap();

        let value: f64 =
            parse_sql_literal::<Float64Type>(&ast_expr, &schema, &mut sql_context)
                .unwrap_or_else(|e| {
                    panic!("Failed to parse expression '{sql_expr}': {e}")
                });

        // Compare with a small tolerance rather than exact float equality.
        assert!(
            (value - expected).abs() < 1e-10,
            "For expression '{sql_expr}': expected {expected}, got {value}",
        );
    }
}

#[test]
fn test_parse_sql_integer_literal() {
    // Same planner plumbing as the float test, over an empty schema.
    let provider = MockContextProvider {
        options: ConfigOptions::default(),
    };
    let sql_to_rel = SqlToRel::new(&provider);
    let mut planner_context = PlannerContext::new();
    let mut sql_context =
        SqlToRelRelationContext::new(&sql_to_rel, &mut planner_context);
    let schema = DFSchemaRef::new(DFSchema::empty());
    let dialect = GenericDialect {};

    // Constant integer arithmetic should fold to a single i64.
    let ast_expr = Parser::new(&dialect)
        .try_with_sql("2 + 4")
        .unwrap()
        .parse_expr()
        .unwrap();

    let value: i64 = parse_sql_literal::<Int64Type>(&ast_expr, &schema, &mut sql_context)
        .unwrap_or_else(|e| panic!("Failed to parse expression: {e}"));

    assert_eq!(6, value);
}
}
2 changes: 1 addition & 1 deletion datafusion/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ mod expr;
pub mod parser;
pub mod planner;
mod query;
mod relation;
pub mod relation;
pub mod resolve;
mod select;
mod set_expr;
Expand Down
Loading