Skip to content

Commit 9fb8f36

Browse files
committed
feat: add file_sort_order to register_parquet
1 parent f298786 commit 9fb8f36

File tree

4 files changed

+32
-3
lines changed

4 files changed

+32
-3
lines changed

python/letsql/backends/let/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pandas as pd
99
import pyarrow as pa
1010
import pyarrow_hotfix # noqa: F401
11-
from letsql.internal import WindowUDF
11+
import toolz
1212
from ibis import BaseBackend
1313
from ibis.expr import types as ir, schema as sch
1414
from sqlglot import exp, parse_one
@@ -17,11 +17,13 @@
1717
from letsql.backends.let.datafusion import Backend as DataFusionBackend
1818
from letsql.common.collections import SourceDict
1919
from letsql.common.utils.graph_utils import replace_fix
20+
from letsql.common.utils.parquet_utils import get_df_sort_order
2021
from letsql.expr.relations import (
2122
CachedNode,
2223
replace_cache_table,
2324
RemoteTableReplacer,
2425
)
26+
from letsql.internal import WindowUDF
2527

2628

2729
def _get_datafusion_table(con, table_name, database="public"):
@@ -90,6 +92,13 @@ def register(
9092
def read_parquet(
    self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
    """Register a parquet file via the parent backend and record it as a source.

    When the caller does not supply ``file_sort_order``, the value returned
    by ``get_df_sort_order(path)`` is passed along instead. That lookup is
    best-effort: if it raises, the file is still registered, just without a
    sort order.

    Parameters
    ----------
    path
        Location of the parquet data; forwarded to the parent ``read_parquet``.
    table_name
        Optional table name; forwarded to the parent ``read_parquet``.
    **kwargs
        Extra options forwarded to the parent ``read_parquet``. An explicit
        ``file_sort_order`` entry is never overridden.

    Returns
    -------
    ir.Table
        The table expression returned by the parent ``read_parquet``.
    """
    import logging

    if "file_sort_order" not in kwargs:
        try:
            kwargs = toolz.merge(
                kwargs, {"file_sort_order": get_df_sort_order(path)}
            )
        except Exception:
            # Deliberately non-fatal: the sort order is only an optional extra,
            # so fall back to registering without it. Leave a trace instead of
            # swallowing the failure silently.
            logging.getLogger(__name__).debug(
                "could not determine file_sort_order for %r; registering without it",
                path,
                exc_info=True,
            )
    registered_table = super().read_parquet(path, table_name=table_name, **kwargs)
    # Remember the registered table's op in the backend's source mapping.
    self._sources[registered_table.op()] = registered_table.op()
    return registered_table

python/letsql/expr/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,11 @@
33

44
def __getattr__(name):
    """Module-level attribute hook: look ``name`` up on ``expr`` and return it."""
    resolved = getattr(expr, name)
    return resolved
6+
7+
8+
def column(name):
    """Return the result of ``expr.Expr.column(name)``."""
    make_column = expr.Expr.column
    return make_column(name)
10+
11+
12+
def col(name):
    """Short alias for ``column``; returns the same ``expr.Expr.column(name)`` result."""
    return column(name)

src/context.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::catalog::{PyCatalog, PyTable};
77
use crate::dataframe::PyDataFrame;
88
use crate::dataset::Dataset;
99
use crate::errors::DataFusionError;
10+
use crate::expr::sort_expr::PySortExpr;
1011
use crate::functions::greatest::GreatestFunc;
1112
use crate::functions::least::LeastFunc;
1213
use crate::ibis_table::IbisTable;
@@ -209,7 +210,8 @@ impl PySessionContext {
209210
file_extension=".parquet",
210211
skip_metadata=true,
211212
schema=None,
212-
storage_options=None
213+
storage_options=None,
214+
file_sort_order=None,
213215
))]
214216
fn register_parquet(
215217
&mut self,
@@ -221,6 +223,7 @@ impl PySessionContext {
221223
skip_metadata: bool,
222224
schema: Option<PyArrowType<Schema>>,
223225
storage_options: Option<HashMap<String, String>>,
226+
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
224227
py: Python,
225228
) -> PyResult<()> {
226229
let mut options = ParquetReadOptions::default()
@@ -229,6 +232,12 @@ impl PySessionContext {
229232
.skip_metadata(skip_metadata);
230233
options.file_extension = file_extension;
231234
options.schema = schema.as_ref().map(|x| &x.0);
235+
options.file_sort_order = file_sort_order
236+
.unwrap_or_default()
237+
.into_iter()
238+
.map(|e| e.into_iter().map(|f| f.into()).collect())
239+
.collect();
240+
232241
let storage_options = storage_options.unwrap_or_default();
233242

234243
let result = async {

src/expr.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ pub mod union;
9595
mod wildcard;
9696
pub mod window;
9797

98+
use sort_expr::PySortExpr;
99+
98100
/// A PyExpr that can be used on a DataFrame
99101
#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
100102
#[derive(Debug, Clone)]
@@ -247,7 +249,7 @@ impl PyExpr {
247249

248250
/// Create a sort PyExpr from an existing PyExpr.
249251
#[pyo3(signature = (ascending=true, nulls_first=true))]
250-
pub fn sort(&self, ascending: bool, nulls_first: bool) -> PyOrdered {
252+
pub fn sort(&self, ascending: bool, nulls_first: bool) -> PySortExpr {
251253
self.expr.clone().sort(ascending, nulls_first).into()
252254
}
253255

@@ -709,6 +711,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
709711
m.add_class::<create_memory_table::PyCreateMemoryTable>()?;
710712
m.add_class::<create_view::PyCreateView>()?;
711713
m.add_class::<distinct::PyDistinct>()?;
714+
m.add_class::<sort_expr::PySortExpr>()?;
712715
m.add_class::<subquery_alias::PySubqueryAlias>()?;
713716
m.add_class::<drop_table::PyDropTable>()?;
714717
m.add_class::<repartition::PyPartitioning>()?;

0 commit comments

Comments
 (0)