Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies = [
"strenum>=0.4.15 ; python_version <= '3.10'",
"uv>=0.7.20",
"rich>=13.9.4",
"ibis-framework>=10.5.0"
]
requires-python = ">=3.10"
authors = [
Expand Down
125 changes: 125 additions & 0 deletions python/xorq/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,130 @@
import functools
import importlib.metadata
from abc import ABC
from typing import Any, Mapping

import ibis
import ibis.expr.operations as ops
import pandas as pd
import pyarrow as pa
from ibis.backends.sql import SQLBackend
from ibis.expr import schema as sch
from ibis.expr import types as ir


class ExecutionBackend(SQLBackend, ABC):
    """Mixin routing execution of xorq expressions through pyarrow.

    Intercepts Flight-backed nodes (``FlightExpr`` / ``FlightUDXF``) so they
    stream their own record batches, and rewrites all other expressions via
    ``_transform_expr`` before delegating to the underlying ibis SQL backend.
    The pandas backend gets dedicated ``_pandas_*`` paths because it cannot
    go through ``to_pyarrow_batches``.
    """

    def _pandas_execute(self, expr: ir.Expr, **kwargs):
        """Execute *expr* on the pandas backend, bypassing pyarrow batches.

        Flight nodes are drained into a DataFrame directly; everything else
        is rewritten with ``_transform_expr`` and handed to the base backend.
        """
        from xorq.expr.api import _transform_expr
        from xorq.expr.relations import FlightExpr, FlightUDXF

        node = expr.op()
        if isinstance(node, (FlightExpr, FlightUDXF)):
            # Flight nodes produce their own record-batch reader; materialize
            # it and let the expression shape the final pandas result.
            df = node.to_rbr().read_pandas(timestamp_as_object=True)
            return expr.__pandas_result__(df)
        # NOTE(review): unlike to_pyarrow_batches, the tables recorded in the
        # second element of _transform_expr's result are never dropped on this
        # path — confirm whether cleanup is needed for the pandas backend.
        (expr, _created) = _transform_expr(expr)

        return super().execute(expr, **kwargs)

    def execute(self, expr, **kwargs) -> Any:
        """Execute *expr* and return the result as a pandas object."""
        if self.name == "pandas":
            return self._pandas_execute(expr, **kwargs)

        batch_reader = self.to_pyarrow_batches(expr, **kwargs)
        df = batch_reader.read_pandas(timestamp_as_object=True)

        return expr.__pandas_result__(df)

    def to_pyarrow_batches(
        self,
        expr: ir.Expr,
        *,
        chunk_size: int = 1_000_000,
        **kwargs: Any,
    ):
        """Return a record-batch reader for *expr*.

        Flight nodes stream directly; otherwise the expression is rewritten
        and any tables created during the rewrite are dropped once the
        returned reader is exhausted (via ``rbr_wrapper``).
        """
        from xorq.common.utils.defer_utils import rbr_wrapper
        from xorq.expr.api import _transform_expr
        from xorq.expr.relations import FlightExpr, FlightUDXF

        if isinstance(expr.op(), (FlightExpr, FlightUDXF)):
            return expr.op().to_rbr()
        (expr, created) = _transform_expr(expr)
        reader = super().to_pyarrow_batches(expr, chunk_size=chunk_size, **kwargs)

        def clean_up():
            # Best-effort teardown of rewrite artifacts: some may have been
            # registered as views rather than tables.
            for table_name, conn in created.items():
                try:
                    conn.drop_table(table_name, force=True)
                except Exception:
                    conn.drop_view(table_name)

        return rbr_wrapper(reader, clean_up)

    def _pandas_to_pyarrow(self, expr, **kwargs):
        """``to_pyarrow`` implementation for the pandas backend."""
        from xorq.expr.api import _transform_expr
        from xorq.expr.relations import FlightExpr, FlightUDXF

        node = expr.op()
        if isinstance(node, (FlightExpr, FlightUDXF)):
            # Fix: read the batches as an Arrow table — __pyarrow_result__
            # expects a pyarrow.Table, not the DataFrame the previous
            # read_pandas() call produced.
            table = node.to_rbr().read_all()
            return expr.__pyarrow_result__(table)
        # NOTE(review): see _pandas_execute — rewrite artifacts are not
        # cleaned up on this path.
        (expr, _created) = _transform_expr(expr)

        return super().to_pyarrow(expr, **kwargs)

    def to_pyarrow(
        self,
        expr: ir.Expr,
        *,
        params: Mapping[ir.Scalar, Any] | None = None,
        limit: int | str | None = None,
        **kwargs: Any,
    ):
        """Execute *expr* and return the result as a pyarrow Table.

        ``params`` and ``limit`` are forwarded to the underlying execution
        (previously they were accepted but silently dropped).
        """
        if self.name == "pandas":
            return self._pandas_to_pyarrow(expr, params=params, limit=limit, **kwargs)

        batch_reader = self.to_pyarrow_batches(
            expr, params=params, limit=limit, **kwargs
        )
        arrow_table = batch_reader.read_all()
        return expr.__pyarrow_result__(arrow_table)

    # The delegating overrides below pin the public signatures on this class
    # so all xorq backends share one interface, regardless of which ibis
    # backend they inherit from.

    @property
    def version(self) -> str:
        return super().version

    def list_tables(
        self, *, like: str | None = None, database: tuple[str, str] | str | None = None
    ) -> list[str]:
        return super().list_tables(like=like, database=database)

    def _get_schema_using_query(self, query: str) -> sch.Schema:
        return super()._get_schema_using_query(query)

    def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
        return super()._register_in_memory_table(op)

    def create_table(
        self,
        name: str,
        /,
        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
        *,
        schema: ibis.Schema | None = None,
        database: str | None = None,
        temp: bool = False,
        overwrite: bool = False,
    ) -> ir.Table:
        return super().create_table(
            name,
            obj=obj,
            schema=schema,
            database=database,
            temp=temp,
            overwrite=overwrite,
        )

    def table(
        self, name: str, /, *, database: tuple[str, str] | str | None = None
    ) -> ir.Table:
        return super().table(name, database=database)


@functools.cache
Expand Down
5 changes: 3 additions & 2 deletions python/xorq/backends/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import os

import _pytest
import ibis
import pytest

import xorq.api as xo
from xorq.backends import _get_backend_names
from xorq.caching import ParquetSnapshotStorage, SourceSnapshotStorage
from xorq.vendor import ibis
from xorq.expr.relations import cache


snowflake_credentials_varnames = (
Expand Down Expand Up @@ -210,7 +211,7 @@ def con_cache_find_backend(parquet_dir):
def _con_cache_find_backend(_parquet_dir, cls, conn):
astronauts_path = _parquet_dir / "astronauts.parquet"
storage = cls(source=conn)
expr = conn.read_parquet(astronauts_path).cache(storage=storage)
expr = conn.read_parquet(astronauts_path).pipe(cache(storage=storage))
assert expr._find_backend()._profile == conn._profile

return functools.partial(_con_cache_find_backend, parquet_dir)
3 changes: 2 additions & 1 deletion python/xorq/backends/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import xorq.vendor.ibis.expr.operations as ops
import xorq.vendor.ibis.expr.schema as sch
import xorq.vendor.ibis.expr.types as ir
from xorq.backends import ExecutionBackend
from xorq.vendor import ibis
from xorq.vendor.ibis.backends.datafusion import Backend as IbisDatafusionBackend
from xorq.vendor.ibis.common.dispatch import lazy_singledispatch
Expand All @@ -21,7 +22,7 @@
import pandas as pd


class Backend(IbisDatafusionBackend):
class Backend(ExecutionBackend, IbisDatafusionBackend):
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self.con.from_arrow(op.data.to_pyarrow(op.schema), op.name)

Expand Down
13 changes: 9 additions & 4 deletions python/xorq/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Any, Mapping

import pyarrow as pa
from ibis.backends.duckdb import Backend as IbisDuckDBBackend
from ibis.expr import types as ir
from ibis.util import gen_name

from xorq.vendor.ibis.backends.duckdb import Backend as IbisDuckDBBackend
from xorq.vendor.ibis.expr import types as ir
from xorq.vendor.ibis.util import gen_name
from xorq.backends import ExecutionBackend


class Backend(IbisDuckDBBackend):
class BaseExecutionBackend(IbisDuckDBBackend):
def execute(
self,
expr: ir.Expr,
Expand Down Expand Up @@ -37,3 +38,7 @@ def to_pyarrow_batches(
return self._to_duckdb_relation(
expr, params=params, limit=limit
).fetch_arrow_reader(chunk_size)


# Compose xorq's shared ExecutionBackend pipeline with the DuckDB-specific
# overrides on BaseExecutionBackend; MRO resolves ExecutionBackend methods
# first, falling back to the DuckDB implementations.
class Backend(ExecutionBackend, BaseExecutionBackend):
    pass
39 changes: 2 additions & 37 deletions python/xorq/backends/let/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pyarrow_hotfix # noqa: F401
from sqlglot import exp, parse_one

from xorq.backends import ExecutionBackend
from xorq.backends.let.datafusion import Backend as DataFusionBackend
from xorq.common.collections import SourceDict
from xorq.internal import SessionConfig, WindowUDF
Expand Down Expand Up @@ -35,7 +36,7 @@ def _get_datafusion_dataframe(con, expr, **kwargs):
return con.con.sql(raw_sql)


class Backend(DataFusionBackend):
class Backend(ExecutionBackend, DataFusionBackend):
name = "let"

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -158,42 +159,6 @@ def create_table(
self._sources[registered_table.op()] = registered_table.op()
return registered_table

def execute(self, expr: ir.Expr, **kwargs: Any):
batch_reader = self.to_pyarrow_batches(expr, **kwargs)
return expr.__pandas_result__(
batch_reader.read_pandas(timestamp_as_object=True)
)

def to_pyarrow(self, expr: ir.Expr, **kwargs: Any) -> pa.Table:
batch_reader = self.to_pyarrow_batches(expr, **kwargs)
arrow_table = batch_reader.read_all()
return expr.__pyarrow_result__(arrow_table)

def to_pyarrow_batches(
self,
expr: ir.Expr,
*,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> pa.ipc.RecordBatchReader:
return super().to_pyarrow_batches(expr, chunk_size=chunk_size, **kwargs)

def do_connect(self, config: SessionConfig | None = None) -> None:
"""Creates a connection.

Parameters
----------
config
Mapping of table names to files.

Examples
--------
>>> import xorq.api as xo
>>> con = xo.connect()

"""
super().do_connect(config=config)

def _to_sqlglot(
self, expr: ir.Expr, *, limit: str | None = None, params=None, **_: Any
):
Expand Down
43 changes: 22 additions & 21 deletions python/xorq/backends/let/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,32 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any

import ibis
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
import pyarrow as pa
import pyarrow_hotfix # noqa: F401
import sqlglot as sg
import sqlglot.expressions as sge
from ibis.backends import (
CanCreateCatalog,
CanCreateDatabase,
NoUrl,
)
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import C
from ibis.common.dispatch import lazy_singledispatch
from ibis.expr.operations.udf import InputType
from ibis.formats.pyarrow import (
PyArrowType,
)
from ibis.util import gen_name, normalize_filename, normalize_filenames

import xorq
import xorq.common.exceptions as com
import xorq.expr.datatypes as dt
import xorq.internal as df
import xorq.vendor.ibis.expr.operations as ops
import xorq.vendor.ibis.expr.schema as sch
import xorq.vendor.ibis.expr.types as ir
from xorq.backends.let.datafusion.compiler import compiler
from xorq.backends.let.datafusion.provider import IbisTableProvider
from xorq.common.utils import classproperty
Expand All @@ -38,21 +52,7 @@
WindowEvaluator,
udwf,
)
from xorq.vendor import ibis
from xorq.vendor.ibis.backends import (
CanCreateCatalog,
CanCreateDatabase,
CanCreateSchema,
NoUrl,
)
from xorq.vendor.ibis.backends.sql import SQLBackend
from xorq.vendor.ibis.backends.sql.compilers.base import C
from xorq.vendor.ibis.common.dispatch import lazy_singledispatch
from xorq.vendor.ibis.expr.operations.udf import InputType
from xorq.vendor.ibis.formats.pyarrow import (
PyArrowType,
)
from xorq.vendor.ibis.util import gen_name, normalize_filename, normalize_filenames
from xorq.vendor.ibis.expr.datatypes import LargeString


if TYPE_CHECKING:
Expand Down Expand Up @@ -201,7 +201,7 @@ def translate_sort(exprs: list[ir.Expr]):
return result


class Backend(SQLBackend, CanCreateCatalog, CanCreateDatabase, CanCreateSchema, NoUrl):
class Backend(SQLBackend, CanCreateCatalog, CanCreateDatabase, NoUrl):
name = "datafusion"
supports_in_memory_tables = True
supports_arrays = True
Expand Down Expand Up @@ -759,6 +759,7 @@ def to_pyarrow(self, expr: ir.Expr, **kwargs: Any) -> pa.Table:
return expr.__pyarrow_result__(arrow_table)

def execute(self, expr: ir.Expr, **kwargs: Any):
breakpoint()
batch_reader = self._to_pyarrow_batches(expr, **kwargs)
return expr.__pandas_result__(
batch_reader.read_pandas(timestamp_as_object=True)
Expand Down Expand Up @@ -821,7 +822,7 @@ def create_table(
compiler.cast(
sg.column(col, table=relname, quoted=quoted), dtype
).as_(col, quoted=quoted)
if not isinstance(dtype, dt.LargeString)
if not isinstance(dtype, LargeString)
else compiler.f.arrow_cast(
sg.column(col, table=relname, quoted=quoted), "LargeUtf8"
).as_(col, quoted=quoted)
Expand Down
Loading
Loading