21
21
set_catalog ,
22
22
)
23
23
from sqlmesh .core .schema_diff import SchemaDiffer
24
+ from sqlmesh .utils import random_id
24
25
from sqlmesh .utils .errors import (
25
26
SQLMeshError ,
26
27
)
34
35
35
36
36
37
@set_catalog ()
37
- class DorisEngineAdapter (LogicalMergeMixin , PandasNativeFetchDFSupportMixin , NonTransactionalTruncateMixin ):
38
+ class DorisEngineAdapter (
39
+ LogicalMergeMixin , PandasNativeFetchDFSupportMixin , NonTransactionalTruncateMixin
40
+ ):
38
41
DIALECT = "doris"
39
42
DEFAULT_BATCH_SIZE = 200
40
43
SUPPORTS_TRANSACTIONS = False # Doris doesn't support transactions
@@ -224,7 +227,9 @@ def _create_materialized_view(
224
227
import pandas as pd
225
228
226
229
if isinstance (query_or_df , pd .DataFrame ):
227
- values : t .List [t .Tuple [t .Any , ...]] = list (query_or_df .itertuples (index = False , name = None ))
230
+ values : t .List [t .Tuple [t .Any , ...]] = list (
231
+ query_or_df .itertuples (index = False , name = None )
232
+ )
228
233
target_columns_to_types , source_columns = self ._columns_to_types (
229
234
query_or_df , target_columns_to_types , source_columns
230
235
)
@@ -304,12 +309,16 @@ def _create_materialized_view(
304
309
doris_inline_clauses .append (f"BUILD { build_value } " )
305
310
refresh = view_properties .get ("refresh" )
306
311
if refresh is not None :
307
- refresh_value = refresh .this if isinstance (refresh , exp .Literal ) else str (refresh )
312
+ refresh_value = (
313
+ refresh .this if isinstance (refresh , exp .Literal ) else str (refresh )
314
+ )
308
315
doris_inline_clauses .append (f"REFRESH { refresh_value } " )
309
316
refresh_trigger = view_properties .get ("refresh_trigger" )
310
317
if refresh_trigger is not None :
311
318
refresh_trigger_value = (
312
- refresh_trigger .this if isinstance (refresh_trigger , exp .Literal ) else str (refresh_trigger )
319
+ refresh_trigger .this
320
+ if isinstance (refresh_trigger , exp .Literal )
321
+ else str (refresh_trigger )
313
322
)
314
323
doris_inline_clauses .append (str (refresh_trigger_value ))
315
324
# KEY / DUPLICATE KEY clause
@@ -339,7 +348,9 @@ def _create_materialized_view(
339
348
doris_inline_clauses .append (f"DUPLICATE KEY ({ ', ' .join (cols )} )" )
340
349
# COMMENT clause
341
350
if table_description :
342
- doris_inline_clauses .append (f"COMMENT '{ self ._truncate_table_comment (table_description )} '" )
351
+ doris_inline_clauses .append (
352
+ f"COMMENT '{ self ._truncate_table_comment (table_description )} '"
353
+ )
343
354
# PARTITION BY (inline for Doris MV)
344
355
if partitioned_by :
345
356
part_cols = ", " .join (
@@ -349,7 +360,9 @@ def _create_materialized_view(
349
360
if isinstance (col , exp .Expression )
350
361
else exp .to_column (col ).sql (dialect = self .dialect , identify = True )
351
362
)
352
- for col in (partitioned_by if isinstance (partitioned_by , list ) else [partitioned_by ])
363
+ for col in (
364
+ partitioned_by if isinstance (partitioned_by , list ) else [partitioned_by ]
365
+ )
353
366
]
354
367
)
355
368
doris_inline_clauses .append (f"PARTITION BY ({ part_cols } )" )
@@ -398,7 +411,9 @@ def drop_view(
398
411
# Remove cascade from kwargs as Doris doesn't support it
399
412
if materialized and kwargs .get ("view_properties" ):
400
413
view_properties = kwargs .pop ("view_properties" )
401
- if view_properties .get ("materialized_type" ) == "SYNC" and view_properties .get ("source_table" ):
414
+ if view_properties .get ("materialized_type" ) == "SYNC" and view_properties .get (
415
+ "source_table"
416
+ ):
402
417
# Format the source table name properly for Doris
403
418
source_table = view_properties .get ("source_table" )
404
419
if isinstance (source_table , exp .Table ):
@@ -432,18 +447,24 @@ def create_table_like(
432
447
)
433
448
)
434
449
435
- def _create_table_comment (self , table_name : TableName , table_comment : str , table_kind : str = "TABLE" ) -> None :
450
+ def _create_table_comment (
451
+ self , table_name : TableName , table_comment : str , table_kind : str = "TABLE"
452
+ ) -> None :
436
453
table_sql = exp .to_table (table_name ).sql (dialect = self .dialect , identify = True )
437
454
438
- self .execute (f'ALTER TABLE { table_sql } MODIFY COMMENT "{ self ._truncate_table_comment (table_comment )} "' )
455
+ self .execute (
456
+ f'ALTER TABLE { table_sql } MODIFY COMMENT "{ self ._truncate_table_comment (table_comment )} "'
457
+ )
439
458
440
459
def _build_create_comment_column_exp (
441
460
self , table : exp .Table , column_name : str , column_comment : str , table_kind : str = "TABLE"
442
461
) -> exp .Comment | str :
443
462
table_sql = table .sql (dialect = self .dialect , identify = True )
444
463
return f'ALTER TABLE { table_sql } MODIFY COLUMN { column_name } COMMENT "{ self ._truncate_column_comment (column_comment )} "'
445
464
446
- def delete_from (self , table_name : TableName , where : t .Optional [t .Union [str , exp .Expression ]] = None ) -> None :
465
+ def delete_from (
466
+ self , table_name : TableName , where : t .Optional [t .Union [str , exp .Expression ]] = None
467
+ ) -> None :
447
468
"""
448
469
Delete from a table.
449
470
@@ -491,7 +512,11 @@ def _find_subquery_in_condition(
491
512
492
513
def _is_subquery_expression (self , expr : exp .Expression ) -> bool :
493
514
"""Check if expression contains a subquery."""
494
- return "query" in expr .args and expr .args ["query" ] and isinstance (expr .args ["query" ], exp .Subquery )
515
+ return (
516
+ "query" in expr .args
517
+ and expr .args ["query" ]
518
+ and isinstance (expr .args ["query" ], exp .Subquery )
519
+ )
495
520
496
521
def _execute_delete_with_subquery (
497
522
self , table_name : TableName , subquery_info : t .Tuple [exp .Expression , exp .Expression , bool ]
@@ -561,7 +586,9 @@ def _create_table_from_columns(
561
586
# Convert primary_key to unique_key for Doris (Doris doesn't support primary keys)
562
587
if primary_key and "unique_key" not in table_properties :
563
588
# Represent as a Tuple of columns to match downstream handling
564
- table_properties ["unique_key" ] = exp .Tuple (expressions = [exp .to_column (col ) for col in primary_key ])
589
+ table_properties ["unique_key" ] = exp .Tuple (
590
+ expressions = [exp .to_column (col ) for col in primary_key ]
591
+ )
565
592
566
593
# Update kwargs with the modified table_properties
567
594
kwargs ["table_properties" ] = table_properties
@@ -606,7 +633,9 @@ def to_raw_sql(expr: t.Union[exp.Literal, exp.Var, str, t.Any]) -> exp.Var:
606
633
if partitions :
607
634
if isinstance (partitions , exp .Tuple ):
608
635
create_expressions = [
609
- exp .Var (this = e .this , quoted = False ) if isinstance (e , exp .Literal ) else to_raw_sql (e )
636
+ exp .Var (this = e .this , quoted = False )
637
+ if isinstance (e , exp .Literal )
638
+ else to_raw_sql (e )
610
639
for e in partitions .expressions
611
640
]
612
641
elif isinstance (partitions , exp .Literal ):
@@ -645,13 +674,19 @@ def _build_table_properties_exp(
645
674
# Extract column names from Tuple expressions
646
675
column_names = []
647
676
for expr in unique_key .expressions :
648
- if isinstance (expr , exp .Column ) and hasattr (expr , "this" ) and hasattr (expr .this , "this" ):
677
+ if (
678
+ isinstance (expr , exp .Column )
679
+ and hasattr (expr , "this" )
680
+ and hasattr (expr .this , "this" )
681
+ ):
649
682
column_names .append (str (expr .this .this ))
650
683
elif hasattr (expr , "this" ):
651
684
column_names .append (str (expr .this ))
652
685
else :
653
686
column_names .append (str (expr ))
654
- properties .append (exp .UniqueKeyProperty (expressions = [exp .to_column (k ) for k in column_names ]))
687
+ properties .append (
688
+ exp .UniqueKeyProperty (expressions = [exp .to_column (k ) for k in column_names ])
689
+ )
655
690
elif isinstance (unique_key , exp .Column ):
656
691
# Handle as single column
657
692
if hasattr (unique_key , "this" ) and hasattr (unique_key .this , "this" ):
@@ -669,26 +704,38 @@ def _build_table_properties_exp(
669
704
# Extract column names from Tuple expressions
670
705
column_names = []
671
706
for expr in duplicate_key .expressions :
672
- if isinstance (expr , exp .Column ) and hasattr (expr , "this" ) and hasattr (expr .this , "this" ):
707
+ if (
708
+ isinstance (expr , exp .Column )
709
+ and hasattr (expr , "this" )
710
+ and hasattr (expr .this , "this" )
711
+ ):
673
712
column_names .append (str (expr .this .this ))
674
713
elif hasattr (expr , "this" ):
675
714
column_names .append (str (expr .this ))
676
715
else :
677
716
column_names .append (str (expr ))
678
- properties .append (exp .DuplicateKeyProperty (expressions = [exp .to_column (k ) for k in column_names ]))
717
+ properties .append (
718
+ exp .DuplicateKeyProperty (expressions = [exp .to_column (k ) for k in column_names ])
719
+ )
679
720
elif isinstance (duplicate_key , exp .Column ):
680
721
# Handle as single column
681
722
if hasattr (duplicate_key , "this" ) and hasattr (duplicate_key .this , "this" ):
682
723
column_name = str (duplicate_key .this .this )
683
724
else :
684
725
column_name = str (duplicate_key .this )
685
- properties .append (exp .DuplicateKeyProperty (expressions = [exp .to_column (column_name )]))
726
+ properties .append (
727
+ exp .DuplicateKeyProperty (expressions = [exp .to_column (column_name )])
728
+ )
686
729
elif isinstance (duplicate_key , str ):
687
- properties .append (exp .DuplicateKeyProperty (expressions = [exp .to_column (duplicate_key )]))
730
+ properties .append (
731
+ exp .DuplicateKeyProperty (expressions = [exp .to_column (duplicate_key )])
732
+ )
688
733
689
734
if table_description :
690
735
properties .append (
691
- exp .SchemaCommentProperty (this = exp .Literal .string (self ._truncate_table_comment (table_description )))
736
+ exp .SchemaCommentProperty (
737
+ this = exp .Literal .string (self ._truncate_table_comment (table_description ))
738
+ )
692
739
)
693
740
694
741
# Handle partitioning
@@ -700,10 +747,14 @@ def _build_table_properties_exp(
700
747
# Handle literal strings like "RANGE(col)" or "LIST(col)"
701
748
if isinstance (expr , exp .Literal ) and getattr (expr , "is_string" , False ):
702
749
text = str (expr .this )
703
- match = re .match (r"^\s*(RANGE|LIST)\s*\((.*?)\)\s*$" , text , flags = re .IGNORECASE )
750
+ match = re .match (
751
+ r"^\s*(RANGE|LIST)\s*\((.*?)\)\s*$" , text , flags = re .IGNORECASE
752
+ )
704
753
if match :
705
754
inner = match .group (2 )
706
- inner_cols = [c .strip ().strip ("`" ) for c in inner .split ("," ) if c .strip ()]
755
+ inner_cols = [
756
+ c .strip ().strip ("`" ) for c in inner .split ("," ) if c .strip ()
757
+ ]
707
758
for col in inner_cols :
708
759
normalized_partitioned_by .append (exp .to_column (col ))
709
760
continue
@@ -720,7 +771,11 @@ def _build_table_properties_exp(
720
771
key_cols_set = set ()
721
772
if isinstance (unique_key , exp .Tuple ):
722
773
for expr in unique_key .expressions :
723
- if isinstance (expr , exp .Column ) and hasattr (expr , "this" ) and hasattr (expr .this , "this" ):
774
+ if (
775
+ isinstance (expr , exp .Column )
776
+ and hasattr (expr , "this" )
777
+ and hasattr (expr .this , "this" )
778
+ ):
724
779
key_cols_set .add (str (expr .this .this ))
725
780
elif hasattr (expr , "this" ):
726
781
key_cols_set .add (str (expr .this ))
@@ -801,12 +856,16 @@ def _build_table_properties_exp(
801
856
elif isinstance (expr .expression , exp .Array ):
802
857
# Handle expressions array
803
858
distributed_info [key ] = [
804
- str (e .this ) for e in expr .expression .expressions if hasattr (e , "this" )
859
+ str (e .this )
860
+ for e in expr .expression .expressions
861
+ if hasattr (e , "this" )
805
862
]
806
863
elif isinstance (expr .expression , exp .Tuple ):
807
864
# Handle expressions tuple (array of strings)
808
865
distributed_info [key ] = [
809
- str (e .this ) for e in expr .expression .expressions if hasattr (e , "this" )
866
+ str (e .this )
867
+ for e in expr .expression .expressions
868
+ if hasattr (e , "this" )
810
869
]
811
870
else :
812
871
distributed_info [key ] = str (expr .expression )
@@ -859,13 +918,19 @@ def _build_table_properties_exp(
859
918
)
860
919
properties .append (prop )
861
920
else :
862
- unique_key_property = next ((prop for prop in properties if isinstance (prop , exp .UniqueKeyProperty )), None )
921
+ unique_key_property = next (
922
+ (prop for prop in properties if isinstance (prop , exp .UniqueKeyProperty )), None
923
+ )
863
924
if unique_key_property :
864
925
# Use the first column from unique_key as the distribution key
865
926
if unique_key_property .expressions :
866
927
first_col = unique_key_property .expressions [0 ]
867
- column_name = str (first_col .this ) if hasattr (first_col , "this" ) else str (first_col )
868
- logger .info (f"[Doris] Adding default distributed_by using unique_key column: { column_name } " )
928
+ column_name = (
929
+ str (first_col .this ) if hasattr (first_col , "this" ) else str (first_col )
930
+ )
931
+ logger .info (
932
+ f"[Doris] Adding default distributed_by using unique_key column: { column_name } "
933
+ )
869
934
properties .append (
870
935
exp .DistributedByProperty (
871
936
expressions = [exp .to_column (column_name )],
@@ -882,13 +947,9 @@ def _build_table_properties_exp(
882
947
return exp .Properties (expressions = properties )
883
948
return None
884
949
885
- def _get_temp_table (
886
- self , table : TableName , table_only : bool = False , quoted : bool = True , start_with : str = "__"
887
- ) -> exp .Table :
888
- """
889
- Returns the name of the temp table that should be used for the given table name.
890
- """
891
- return super ()._get_temp_table (table , table_only , quoted , start_with = "" )
950
+ def _get_temp_table_name (self , table : TableName ) -> str :
951
+ table_obj = exp .to_table (table )
952
+ return f"temp_{ table_obj .name } _{ random_id (short = True )} "
892
953
893
954
def _properties_to_expressions (self , properties : t .Dict [str , t .Any ]) -> t .List [exp .Expression ]:
894
955
"""Convert a dictionary of properties to a list of exp.Property expressions."""
0 commit comments