3131 VERSION_TABLE_NAME ,
3232 PIPELINE_STATE_TABLE_NAME ,
3333 ColumnPropInfos ,
34+ TColumnPropMergeType ,
3435 TColumnName ,
3536 TFileFormat ,
3637 TPartialTableSchema ,
@@ -154,6 +155,22 @@ def has_default_column_prop_value(prop: str, value: Any) -> bool:
154155 return value in (None , False )
155156
156157
def has_merge_type(prop: str, merge_type: TColumnPropMergeType = "remove_if_empty") -> bool:
    """Checks whether column property `prop` is registered with the given `merge_type`.

    Args:
        prop: Name of the column property (hint) to look up in `ColumnPropInfos`.
        merge_type: Merge type to compare against, "remove_if_empty" by default.

    Returns:
        True if `prop` is present in `ColumnPropInfos` and its `merge_type` equals
        `merge_type`; False otherwise, including for props not present in `ColumnPropInfos`.
    """
    # short-circuit keeps the subscript from running for unknown props
    return prop in ColumnPropInfos and ColumnPropInfos[prop].merge_type == merge_type
162+
163+
def remove_column_props_with_merge_type(
    column_schema: TColumnSchema, merge_type: TColumnPropMergeType = "remove_if_empty"
) -> TColumnSchema:
    """Removes from `column_schema`, in place, every property whose merge type is `merge_type`.

    Args:
        column_schema: Column schema to strip; it is modified in place.
        merge_type: Merge type selecting the properties to drop, "remove_if_empty" by default.

    Returns:
        The same `column_schema` object, for chaining.
    """
    # collect the names first so the mapping is not resized while being iterated
    matching_props = [name for name in column_schema if has_merge_type(name, merge_type)]
    for name in matching_props:
        del column_schema[name]  # type: ignore
    return column_schema
172+
173+
157174def remove_column_defaults (column_schema : TColumnSchema ) -> TColumnSchema :
158175 """Removes default values from `column_schema` in place, returns the input for chaining"""
159176 # remove hints with default values
@@ -420,24 +437,45 @@ def diff_table_references(
420437
421438
def merge_column(
    col_a: TColumnSchema,
    col_b: TColumnSchema,
    merge_defaults: bool = True,
    respect_merge_type: bool = False,
) -> TColumnSchema:
    """Merges `col_b` into `col_a` in place and returns `col_a`. `col_b` is never modified.

    Properties already present in `col_a` keep their position and take the value from
    `col_b`; properties only present in `col_b` are appended.

    Args:
        col_a: Column schema that receives the merge; modified in place.
        col_b: Column schema whose properties are written into `col_a`.
        merge_defaults: If True, every property of `col_b` is merged. If False, properties
            of `col_b` holding default values are dropped (via `remove_column_defaults`)
            before merging, so only non-default values are written.
        respect_merge_type: If True, properties of `col_a` with the "remove_if_empty" merge
            type that are missing from the (possibly cleaned) `col_b` are removed from `col_a`.
            NOTE: pass this by keyword - the third positional argument is `merge_defaults`.

    Returns:
        `col_a`, the same object that was passed in.
    """
    # `remove_column_defaults` works in place, so only that path needs a copy of `col_b`
    col_b_clean = col_b if merge_defaults else remove_column_defaults(copy(col_b))

    if respect_merge_type:
        # snapshot the keys because `col_a` shrinks while we scan it
        for prop in list(col_a):
            if prop not in col_b_clean and has_merge_type(prop, "remove_if_empty"):
                col_a.pop(prop)  # type: ignore

    # overwrite shared properties in place and append the new ones
    col_a.update(col_b_clean)  # type: ignore
    return col_a
471+
472+
436473def merge_columns (
437474 columns_a : TTableSchemaColumns ,
438475 columns_b : TTableSchemaColumns ,
439476 merge_columns : bool = False ,
440477 columns_partial : bool = True ,
478+ respect_merge_type : bool = False ,
441479) -> TTableSchemaColumns :
442480 """Merges `columns_a` with `columns_b`. `columns_a` is modified in place.
443481
@@ -458,14 +496,17 @@ def merge_columns(
458496 if column_a and not is_complete_column (column_a ):
459497 columns_a .pop (col_name )
460498 if column_a and merge_columns :
461- column_b = merge_column (column_a , column_b )
499+ column_b = merge_column (column_a , column_b , respect_merge_type )
462500 # set new or updated column
463501 columns_a [col_name ] = column_b
464502 return columns_a
465503
466504
467505def diff_table (
468- schema_name : str , tab_a : TTableSchema , tab_b : TPartialTableSchema
506+ schema_name : str ,
507+ tab_a : TTableSchema ,
508+ tab_b : TPartialTableSchema ,
509+ respect_merge_type : bool = False ,
469510) -> TPartialTableSchema :
470511 """Creates a partial table that contains properties found in `tab_b` that are not present or different in `tab_a`.
471512 The name is always present in returned partial.
@@ -480,18 +521,22 @@ def diff_table(
480521 ensure_compatible_tables (schema_name , tab_a , tab_b , ensure_columns = False )
481522
482523 # get new columns, changes in the column data type or other properties are not allowed
483- tab_a_columns = tab_a ["columns" ]
524+ tab_a_columns = copy ( tab_a ["columns" ])
484525 new_columns : List [TColumnSchema ] = []
485526 for col_b_name , col_b in tab_b ["columns" ].items ():
486527 if col_b_name in tab_a_columns :
487- col_a = tab_a_columns [ col_b_name ]
528+ col_a = tab_a_columns . pop ( col_b_name )
488529 # all other properties can change
489- merged_column = merge_column (copy (col_a ), col_b )
530+ merged_column = merge_column (copy (col_a ), col_b , respect_merge_type = respect_merge_type )
490531 if merged_column != col_a :
491532 new_columns .append (merged_column )
492533 else :
493534 new_columns .append (col_b )
494535
536+ if respect_merge_type :
537+ for col_a in tab_a_columns .values ():
538+ remove_column_props_with_merge_type (col_a , "remove_if_empty" )
539+
495540 # return partial table containing only name and properties that differ (column, filters etc.)
496541 table_name = tab_a ["name" ]
497542
0 commit comments