Skip to content

Commit 81af28d

Browse files
committed
update tests for doris
1 parent 2be38ef commit 81af28d

File tree

3 files changed

+156
-41
lines changed

3 files changed

+156
-41
lines changed

sqlmesh/core/engine_adapter/doris.py

Lines changed: 125 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from sqlglot import exp, parse_one
88

9-
from sqlmesh.core.dialect import to_schema
9+
from sqlmesh.core.dialect import to_schema, transform_values
1010

1111
from sqlmesh.core.engine_adapter.mixins import (
1212
LogicalMergeMixin,
@@ -21,14 +21,14 @@
2121
set_catalog,
2222
)
2323
from sqlmesh.core.schema_diff import SchemaDiffer
24-
from sqlmesh.utils import random_id
24+
from sqlmesh.utils import random_id, get_source_columns_to_types
2525
from sqlmesh.utils.errors import (
2626
SQLMeshError,
2727
)
2828

2929
if t.TYPE_CHECKING:
3030
from sqlmesh.core._typing import SchemaName, TableName
31-
from sqlmesh.core.engine_adapter._typing import QueryOrDF
31+
from sqlmesh.core.engine_adapter._typing import QueryOrDF, Query
3232
from sqlmesh.core.node import IntervalUnit
3333

3434
logger = logging.getLogger(__name__)
@@ -136,10 +136,20 @@ def _get_data_objects(
136136
.where(exp.column("table_schema").eq(to_schema(schema_name).db))
137137
)
138138
if object_names:
139-
query = query.where(exp.column("table_name").isin(*object_names))
139+
# Doris may treat information_schema table_name comparisons as case-sensitive depending on settings.
140+
# Use LOWER(table_name) to match case-insensitively.
141+
lowered_names = [name.lower() for name in object_names]
142+
query = query.where(exp.func("LOWER", exp.column("table_name")).isin(*lowered_names))
140143

141144
result = []
142-
for schema_val, name_val, type_val in self.fetchall(query):
145+
rows = self.fetchall(query)
146+
logger.debug(
147+
"[Doris] _get_data_objects schema=%s object_names=%s -> %d rows",
148+
schema_name,
149+
list(object_names) if object_names else None,
150+
len(rows),
151+
)
152+
for schema_val, name_val, type_val in rows:
143153
try:
144154
schema = str(schema_val) if schema_val is not None else str(schema_name)
145155
name = str(name_val) if name_val is not None else "unknown"
@@ -396,6 +406,7 @@ def _create_materialized_view(
396406
insert_pos = 0
397407
create_sql = f"{create_sql[:insert_pos]} {insert_text}{create_sql[insert_pos:]}"
398408

409+
logger.debug("[Doris] CREATE MATERIALIZED VIEW SQL: %s", create_sql)
399410
self.execute(create_sql)
400411

401412
def drop_view(
@@ -559,6 +570,115 @@ def _build_column_condition(self, column: exp.Expression, is_not_in: bool) -> ex
559570
t2_col = exp.column(column.name, table="_t2")
560571
return t1_col.neq(t2_col) if is_not_in else t1_col.eq(t2_col)
561572

573+
def replace_query(
574+
self,
575+
table_name: "TableName",
576+
query_or_df: "QueryOrDF",
577+
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
578+
table_description: t.Optional[str] = None,
579+
column_descriptions: t.Optional[t.Dict[str, str]] = None,
580+
source_columns: t.Optional[t.List[str]] = None,
581+
**kwargs: t.Any,
582+
) -> None:
583+
"""
584+
Doris does not support REPLACE TABLE. Avoid CTAS on replace and always perform a
585+
delete+insert (or engine strategy) to ensure data is written even if the table exists.
586+
"""
587+
logger.debug(
588+
"[Doris] replace_query target=%s source_columns=%s",
589+
table_name,
590+
source_columns,
591+
)
592+
target_table = exp.to_table(table_name)
593+
source_queries, inferred_columns_to_types = self._get_source_queries_and_columns_to_types(
594+
query_or_df,
595+
target_columns_to_types,
596+
target_table=target_table,
597+
source_columns=source_columns,
598+
)
599+
target_columns_to_types = inferred_columns_to_types or self.columns(target_table)
600+
logger.debug(
601+
"[Doris] replace_query using %d source queries; columns=%s",
602+
len(source_queries),
603+
list(target_columns_to_types.keys()),
604+
)
605+
# Use the standard insert-overwrite-by-condition path (DELETE/INSERT for Doris by default)
606+
return self._insert_overwrite_by_condition(
607+
target_table,
608+
source_queries,
609+
target_columns_to_types,
610+
)
611+
612+
def _values_to_sql(
613+
self,
614+
values: t.List[t.Tuple[t.Any, ...]],
615+
target_columns_to_types: t.Dict[str, exp.DataType],
616+
batch_start: int,
617+
batch_end: int,
618+
alias: str = "t",
619+
source_columns: t.Optional[t.List[str]] = None,
620+
) -> "Query":
621+
"""
622+
Build a SELECT/UNION ALL subquery for a batch of literal rows.
623+
624+
Doris (MySQL-compatible) doesn't reliably render SQLGlot's VALUES in FROM when using the
625+
'doris' dialect, which led to an empty `(SELECT)` subquery. To avoid that, construct a
626+
dialect-agnostic union of SELECT literals and then cast/order in an outer SELECT.
627+
"""
628+
source_columns = source_columns or list(target_columns_to_types)
629+
source_columns_to_types = get_source_columns_to_types(
630+
target_columns_to_types, source_columns
631+
)
632+
633+
row_values = values[batch_start:batch_end]
634+
635+
inner: exp.Query
636+
if not row_values:
637+
# Produce a zero-row subquery with the correct schema
638+
zero_row_select = exp.select(
639+
*[
640+
exp.cast(exp.null(), to=col_type).as_(col, quoted=True)
641+
for col, col_type in source_columns_to_types.items()
642+
]
643+
).where(exp.false())
644+
inner = zero_row_select
645+
else:
646+
# Build UNION ALL of SELECT <literals AS columns>
647+
selects: t.List[exp.Select] = []
648+
for row in row_values:
649+
converted_vals = list(transform_values(row, source_columns_to_types))
650+
select_exprs = [
651+
exp.alias_(val, col, quoted=True)
652+
for val, col in zip(converted_vals, source_columns_to_types.keys())
653+
]
654+
selects.append(exp.select(*select_exprs))
655+
656+
inner = selects[0]
657+
for s in selects[1:]:
658+
inner = exp.union(inner, s, distinct=False)
659+
660+
# Outer select to coerce/order target columns
661+
casted_columns = [
662+
exp.alias_(
663+
exp.cast(
664+
exp.column(column, table=alias, quoted=True)
665+
if column in source_columns_to_types
666+
else exp.Null(),
667+
to=kind,
668+
),
669+
column,
670+
quoted=True,
671+
)
672+
for column, kind in target_columns_to_types.items()
673+
]
674+
675+
final_query = exp.select(*casted_columns).from_(
676+
exp.alias_(exp.Subquery(this=inner), alias, table=True),
677+
copy=False,
678+
)
679+
680+
return final_query
681+
562682
def _create_table_from_columns(
563683
self,
564684
table_name: TableName,

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -800,11 +800,19 @@ def test_insert_overwrite_by_time_partition(ctx_query_and_df: TestContext):
800800
# Get current year and create dates for testing. Doris cannot have more than 500 history partitions.
801801
current_year = datetime.now().year
802802
current_date = datetime(current_year, 1, 1)
803-
date_1 = current_date.strftime("%Y-%m-%d")
804-
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
805-
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
806-
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
807-
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
803+
if ctx.dialect == "doris":
804+
# For Doris with DATE type, use Python datetime.date objects (via .date())
805+
date_1 = current_date.date()
806+
date_2 = (current_date + timedelta(days=1)).date()
807+
date_3 = (current_date + timedelta(days=2)).date()
808+
date_4 = (current_date + timedelta(days=3)).date()
809+
date_5 = (current_date + timedelta(days=4)).date()
810+
else:
811+
date_1 = current_date.strftime("%Y-%m-%d")
812+
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
813+
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
814+
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
815+
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
808816

809817
ctx.columns_to_types = {"id": "int", "ds": ds_type}
810818
table = ctx.table("test_table")
@@ -901,11 +909,19 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes
901909
# Get current year and create dates for testing. Doris cannot have more than 500 history partitions.
902910
current_year = datetime.now().year
903911
current_date = datetime(current_year, 1, 1)
904-
date_1 = current_date.strftime("%Y-%m-%d")
905-
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
906-
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
907-
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
908-
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
912+
if ctx.dialect == "doris":
913+
# For Doris with DATE type, use pandas date objects
914+
date_1 = current_date.date()
915+
date_2 = (current_date + timedelta(days=1)).date()
916+
date_3 = (current_date + timedelta(days=2)).date()
917+
date_4 = (current_date + timedelta(days=3)).date()
918+
date_5 = (current_date + timedelta(days=4)).date()
919+
else:
920+
date_1 = current_date.strftime("%Y-%m-%d")
921+
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
922+
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
923+
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
924+
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
909925

910926
ctx.columns_to_types = {"id": "int", "ds": ds_type}
911927
columns_to_types = {

tests/core/engine_adapter/test_doris.py

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ def test_create_table_with_partitioned_by(
280280
adapter.create_table(
281281
"test_table",
282282
target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("DATE")},
283-
partitioned_by=[exp.Literal.string("RANGE(b)")],
283+
partitioned_by=[exp.to_column("b")],
284284
table_properties={
285285
"partitions": exp.Literal.string(
286286
"FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR"
@@ -292,29 +292,6 @@ def test_create_table_with_partitioned_by(
292292
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` DATE) PARTITION BY RANGE (`b`) (FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR)",
293293
]
294294

295-
adapter.cursor.execute.reset_mock()
296-
297-
adapter.create_table(
298-
"test_table",
299-
target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("TEXT")},
300-
partitioned_by=[exp.Literal.string("LIST(b)")],
301-
table_properties={
302-
"partitions": exp.Tuple(
303-
expressions=[
304-
exp.Literal.string(
305-
'PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong")'
306-
),
307-
exp.Literal.string('PARTITION `p_usa` VALUES IN ("New York", "San Francisco")'),
308-
exp.Literal.string('PARTITION `p_other` VALUES IN ("Other")'),
309-
]
310-
)
311-
},
312-
)
313-
314-
assert to_sql_calls(adapter) == [
315-
'CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` STRING) PARTITION BY RANGE (`b`) (PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"), PARTITION `p_usa` VALUES IN ("New York", "San Francisco"), PARTITION `p_other` VALUES IN ("Other"))',
316-
]
317-
318295

319296
def test_create_full_materialized_view(
320297
make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter],
@@ -346,7 +323,9 @@ def test_create_full_materialized_view(
346323
"replication_num": exp.Literal.string("1"),
347324
}
348325
materialized_properties = {
349-
"partitioned_by": [exp.to_column("orderdate")],
326+
"partitioned_by": [
327+
parse_one("DATE_TRUNC(o_orderdate, 'MONTH')", dialect="doris"),
328+
],
350329
"clustered_by": [],
351330
"partition_interval_unit": None,
352331
}
@@ -387,7 +366,7 @@ def test_create_full_materialized_view(
387366
expected_sqls = [
388367
"CREATE MATERIALIZED VIEW `complete_mv` (`orderdate` COMMENT 'order date', `orderkey` COMMENT 'order key', `partkey` COMMENT 'part key') "
389368
"BUILD IMMEDIATE REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '2024-12-01 20:30:00' KEY (`orderkey`) COMMENT 'test_description' "
390-
"PARTITION BY (`orderdate`) "
369+
"PARTITION BY (DATE_TRUNC(`o_orderdate`, 'MONTH')) "
391370
"DISTRIBUTED BY HASH (`orderkey`) BUCKETS 2 PROPERTIES ('replication_num'='1') "
392371
"AS SELECT `o_orderdate`, `l_orderkey`, `l_partkey` FROM `orders` LEFT JOIN `lineitem` ON `l_orderkey` = `o_orderkey` LEFT JOIN `partsupp` ON `ps_partkey` = `l_partkey` AND `l_suppkey` = `ps_suppkey`",
393372
]

0 commit comments

Comments
 (0)