Skip to content

Commit f30db5b

Browse files
committed
ref: unify ops
1 parent e399353 commit f30db5b

File tree

9 files changed

+33
-32
lines changed

9 files changed

+33
-32
lines changed

python/xorq/caching/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def rename_remote_table(node, _, **kwargs):
179179
name=name,
180180
schema=node.schema,
181181
source=node.source,
182-
remote_expr=node.remote_expr,
182+
parent=node.parent,
183183
namespace=node.namespace,
184184
)
185185
return rt

python/xorq/common/utils/dask_normalize/dask_normalize_expr.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ def normalize_letsql_databasetable(dt):
246246
return dask.base.normalize_token(
247247
(
248248
"normalize_letsql_databasetable",
249-
dt.input_expr,
249+
dt.parent,
250250
# we need to "stabilize" the name of the tables in the unbound expr
251251
rename_unbound_static(dt.unbound_expr.op()).to_expr(),
252252
dt.make_connection,
@@ -256,7 +256,7 @@ def normalize_letsql_databasetable(dt):
256256
return dask.base.normalize_token(
257257
(
258258
"normalize_letsql_databasetable",
259-
dt.input_expr,
259+
dt.parent,
260260
dt.udxf.exchange_f,
261261
dt.make_connection,
262262
)
@@ -372,7 +372,7 @@ def normalize_remote_table(dt):
372372

373373
return normalize_seq_with_caller(
374374
("schema", dt.schema),
375-
("expr", dt.remote_expr),
375+
("expr", dt.parent),
376376
# only thing that matters is the type of source its going into
377377
("source", dt.source.name),
378378
caller="normalize_remote_table",
@@ -497,7 +497,7 @@ def opaque_node_replacer(node, kwargs):
497497
case rel.RemoteTable():
498498
new_node = api.table(
499499
node.schema,
500-
name=dask.base.tokenize(node.remote_expr),
500+
name=dask.base.tokenize(node.parent),
501501
).op()
502502
case rel.FlightUDXF() | rel.FlightExpr():
503503
new_node = api.table(

python/xorq/common/utils/graph_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ def gen_children_of(node: Node) -> Tuple[Node, ...]:
3434
gen = () if rel_node is None else (to_node(rel_node),)
3535

3636
case rel.RemoteTable():
37-
gen = (to_node(node.remote_expr),)
37+
gen = (to_node(node.parent),)
3838

3939
case rel.CachedNode():
4040
gen = (to_node(node.parent),)
4141

4242
case rel.FlightExpr() | rel.FlightUDXF():
43-
gen = (to_node(node.input_expr),)
43+
gen = (to_node(node.parent),)
4444

4545
case udf.ExprScalarUDF():
4646
gen = (to_node(node.computed_kwargs_expr),)
@@ -95,13 +95,13 @@ def process_node(op, _kwargs):
9595
op = replacer(op, _kwargs)
9696
match op:
9797
case rel.RemoteTable():
98-
remote_expr = op.remote_expr.op().replace(process_node).to_expr()
98+
remote_expr = op.parent.op().replace(process_node).to_expr()
9999
return do_recreate(op, _kwargs, remote_expr=remote_expr)
100100
case rel.CachedNode():
101101
parent = op.parent.op().replace(process_node).to_expr()
102102
return do_recreate(op, _kwargs, parent=parent)
103103
case rel.FlightExpr() | rel.FlightUDXF():
104-
input_expr = op.input_expr.op().replace(process_node).to_expr()
104+
input_expr = op.parent.op().replace(process_node).to_expr()
105105
return do_recreate(op, _kwargs, input_expr=input_expr)
106106
case udf.ExprScalarUDF():
107107
computed_kwargs_expr = (

python/xorq/common/utils/lineage_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ def _(node: rel.CachedNode, cfg: Dict[str, Any] | None = None) -> str:
172172
def _(node: rel.FlightExpr, cfg: Dict[str, Any] | None = None) -> str:
173173
palette: ColorScheme = (cfg or {}).get("palette", default_palette)
174174
col = palette.get("flight")
175-
return f"{col}FlightExpr ({node.input_expr})[/]"
175+
return f"{col}FlightExpr ({node.parent})[/]"
176176

177177

178178
@format_node.register

python/xorq/expr/relations.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def replace_cache_table(node, kwargs):
3333
if isinstance(node, CachedNode):
3434
return node.parent.op().replace(replace_cache_table)
3535
elif isinstance(node, RemoteTable):
36-
return node.remote_expr.op().replace(replace_cache_table)
36+
return node.parent.op().replace(replace_cache_table)
3737
else:
3838
return node
3939

@@ -141,8 +141,11 @@ def __dask_tokenize__(self):
141141
)
142142

143143

144-
class CachedNode(ops.DatabaseTable):
144+
class DatabaseTableView(ops.DatabaseTable):
145145
parent: Any = None
146+
147+
148+
class CachedNode(DatabaseTableView):
146149
storage: Any = None
147150

148151

@@ -154,26 +157,24 @@ class CachedNode(ops.DatabaseTable):
154157
)
155158

156159

157-
class RemoteTable(ops.DatabaseTable):
158-
remote_expr: Expr = None
159-
160+
class RemoteTable(DatabaseTableView):
160161
@classmethod
161162
def from_expr(cls, con, expr, name=None):
162163
name = name or gen_name()
163164
return cls(
164165
name=name,
165166
schema=expr.schema(),
166167
source=con,
167-
remote_expr=expr,
168+
parent=expr,
168169
)
169170

170171

171172
def into_backend(expr, con, name=None):
172173
return RemoteTable.from_expr(con=con, expr=expr, name=name).to_expr()
173174

174175

175-
class FlightExpr(ops.DatabaseTable):
176-
input_expr: Expr = None
176+
class FlightExpr(DatabaseTableView):
177+
parent: Expr = None
177178
unbound_expr: Expr = None
178179
make_server: Callable = None
179180
make_connection: Callable = None
@@ -211,7 +212,7 @@ def roundtrip_cloudpickle(obj):
211212
name=name or gen_name(),
212213
schema=unbound_expr.schema(),
213214
source=input_expr._find_backend(),
214-
input_expr=input_expr,
215+
parent=input_expr,
215216
unbound_expr=roundtrip_cloudpickle(unbound_expr),
216217
make_server=make_server or FlightServer,
217218
make_connection=make_connection or xo_connect,
@@ -228,7 +229,7 @@ def to_rbr(self, do_instrument_reader=None):
228229
do_instrument_reader = self.do_instrument_reader
229230

230231
def inner(flight_exchange):
231-
rbr_in = flight_exchange.input_expr.to_pyarrow_batches()
232+
rbr_in = flight_exchange.parent.to_pyarrow_batches()
232233
if do_instrument_reader:
233234
rbr_in = instrument_reader(rbr_in, "input: ")
234235
with flight_exchange.make_server() as server:
@@ -316,8 +317,8 @@ def flight_expr(
316317
)
317318

318319

319-
class FlightUDXF(ops.DatabaseTable):
320-
input_expr: Expr = None
320+
class FlightUDXF(DatabaseTableView):
321+
parent: Expr = None
321322
# FIXME: fix circular import issue so we can possibly pass an instance of AbstractExchanger
322323
udxf: type = None
323324
make_server: Callable = None
@@ -360,7 +361,7 @@ def make_mtls_server():
360361
name=name or gen_name(),
361362
schema=schema,
362363
source=input_expr._find_backend(),
363-
input_expr=input_expr,
364+
parent=input_expr,
364365
udxf=udxf,
365366
make_server=make_server or make_mtls_server,
366367
make_connection=make_connection or xo_connect,
@@ -374,7 +375,7 @@ def to_rbr(self, do_instrument_reader=None):
374375
do_instrument_reader = self.do_instrument_reader
375376

376377
def inner(flight_udxf):
377-
rbr_in = flight_udxf.input_expr.to_pyarrow_batches()
378+
rbr_in = flight_udxf.parent.to_pyarrow_batches()
378379
if do_instrument_reader:
379380
rbr_in = instrument_reader(rbr_in, "input: ")
380381
with flight_udxf.make_server() as server:
@@ -603,7 +604,7 @@ def register_and_transform_remote_tables(expr):
603604
)
604605
batches_table = {}
605606
for arg, count in counts.items():
606-
ex = arg.remote_expr
607+
ex = arg.parent
607608
batches = ex.to_pyarrow_batches()
608609
schema = ex.as_table().schema().to_pyarrow()
609610
replicas = SafeTee.tee(batches, count)

python/xorq/ibis_yaml/sql.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,15 @@ def get_name(node):
5858

5959
def find_tables(expr: ir.Expr) -> Tuple[Dict[str, QueryInfo], Dict[str, QueryInfo]]:
6060
def get_remote_table_backend(node):
61-
return node.remote_expr._find_backend()
61+
return node.parent._find_backend()
6262

6363
grouped = toolz.groupby(type, walk_nodes((RemoteTable, Read), expr))
6464
remote_tables: Dict[str, QueryInfo] = {
6565
node.name: {
6666
"engine": backend.name,
6767
"profile_name": backend._profile.hash_name,
68-
"relations": find_relations(node.remote_expr),
69-
"sql": to_sql(node.remote_expr).strip(),
68+
"relations": find_relations(node.parent),
69+
"sql": to_sql(node.parent).strip(),
7070
"options": {},
7171
}
7272
for node in grouped.get(RemoteTable, ())

python/xorq/ibis_yaml/translate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ def _cached_node_from_yaml(yaml_dict: dict, context: any) -> ibis.Expr:
510510
@translate_to_yaml.register(RemoteTable)
511511
def _remotetable_to_yaml(op: RemoteTable, context: any) -> dict:
512512
profile_name = op.source._profile.hash_name
513-
remote_expr_yaml = translate_to_yaml(op.remote_expr, context)
513+
remote_expr_yaml = translate_to_yaml(op.parent, context)
514514
schema_id = context.schema_registry.register_schema(op.schema)
515515
# TODO: change profile to profile_name
516516
return freeze(

python/xorq/ibis_yaml/udf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ def _aggudf_from_yaml(yaml_dict: dict, compiler: any) -> any:
326326

327327
@translate_to_yaml.register(FlightExpr)
328328
def flight_expr_to_yaml(op: FlightExpr, context: any) -> dict:
329-
input_expr_yaml = translate_to_yaml(op.input_expr, context)
329+
input_expr_yaml = translate_to_yaml(op.parent, context)
330330
unbound_expr_yaml = translate_to_yaml(op.unbound_expr, context)
331331

332332
schema_id = context.schema_registry.register_schema(op.schema)
@@ -379,7 +379,7 @@ def flight_expr_from_yaml(yaml_dict: Dict, context: Any) -> Any:
379379

380380
@translate_to_yaml.register(FlightUDXF)
381381
def flight_udxf_to_yaml(op: FlightUDXF, context: any) -> dict:
382-
input_expr_yaml = translate_to_yaml(op.input_expr, context)
382+
input_expr_yaml = translate_to_yaml(op.parent, context)
383383
schema_id = context.schema_registry.register_schema(op.schema)
384384
udxf_pickle = serialize_callable(op.udxf)
385385
make_server_pickle = serialize_callable(op.make_server)

python/xorq/vendor/ibis/expr/types/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ def uncached_one(self):
801801

802802
parent = self.expr.op().parent
803803
if isinstance(parent.op(), RemoteTable):
804-
return parent.op().remote_expr
804+
return parent.op().parent
805805
else:
806806
return parent
807807
else:

0 commit comments

Comments
 (0)