3131 VERSION_TABLE_NAME ,
3232 PIPELINE_STATE_TABLE_NAME ,
3333 ColumnPropInfos ,
34+ TColumnPropMergeType ,
3435 TColumnName ,
3536 TFileFormat ,
3637 TPartialTableSchema ,
@@ -154,6 +155,22 @@ def has_default_column_prop_value(prop: str, value: Any) -> bool:
154155 return value in (None , False )
155156
156157
def has_merge_type(prop: str, merge_type: TColumnPropMergeType = "remove_if_empty") -> bool:
    """Checks if column property `prop` is registered in `ColumnPropInfos` with the given `merge_type`.

    Properties that are not present in `ColumnPropInfos` never match and yield False.
    """
    # unknown properties carry no merge type information
    if prop not in ColumnPropInfos:
        return False
    prop_info = ColumnPropInfos[prop]
    return prop_info.merge_type == merge_type
162+
163+
def remove_column_props_with_merge_type(
    column_schema: TColumnSchema, merge_type: TColumnPropMergeType = "remove_if_empty"
) -> TColumnSchema:
    """Removes all properties with the given `merge_type` from `column_schema` in place, returns the input for chaining"""
    # collect matching names up front so the dict is not resized while it is being iterated
    matching_props = [name for name in column_schema if has_merge_type(name, merge_type)]
    for name in matching_props:
        del column_schema[name]  # type: ignore
    return column_schema
172+
173+
157174def remove_column_defaults (column_schema : TColumnSchema ) -> TColumnSchema :
158175 """Removes default values from `column_schema` in place, returns the input for chaining"""
159176 # remove hints with default values
@@ -420,19 +437,39 @@ def diff_table_references(
420437
421438
def merge_column(
    col_a: TColumnSchema,
    col_b: TColumnSchema,
    merge_defaults: bool = True,
    respect_merge_type: bool = True,
) -> TColumnSchema:
    """Merges `col_b` into `col_a` in place and returns `col_a`.

    `col_b` itself is left untouched: a shallow copy of it is consumed instead.

    Args:
        col_a: Column schema that receives the merged properties. Modified in place.
        col_b: Column schema providing the new property values.
        merge_defaults: If True, every property of `col_b` is merged. If False, the copy of
            `col_b` is first passed through `remove_column_defaults`, so only the properties
            that survive that call are merged.
        respect_merge_type: If True, properties of `col_a` that are absent from the (cleaned)
            `col_b` and have the "remove_if_empty" merge type are removed from `col_a`.

    Returns:
        The same `col_a` object, after merging.
    """
    # always work on a copy: entries are popped from it below and `col_b` must not change
    incoming = copy(col_b)
    if not merge_defaults:
        incoming = remove_column_defaults(incoming)

    # iterate over a snapshot of the keys because `col_a` may shrink inside the loop
    for prop in list(col_a):
        if prop in incoming:
            # overwrite in place so the property keeps its original position in `col_a`
            col_a[prop] = incoming.pop(prop)  # type: ignore
        elif respect_merge_type and has_merge_type(prop, "remove_if_empty"):
            # property is "empty" in `col_b`, so its merge type requires dropping it
            del col_a[prop]  # type: ignore

    # whatever is left in `incoming` is new to `col_a`; append in `col_b` order
    col_a.update(incoming)  # type: ignore
    return col_a
434464
435465
466+ # col_b_clean = col_b if merge_defaults else remove_column_defaults(copy(col_b))
467+ # for n, v in col_b_clean.items():
468+ # col_a[n] = v # type: ignore
469+
470+ # return col_a
471+
472+
436473def merge_columns (
437474 columns_a : TTableSchemaColumns ,
438475 columns_b : TTableSchemaColumns ,
@@ -465,7 +502,10 @@ def merge_columns(
465502
466503
467504def diff_table (
468- schema_name : str , tab_a : TTableSchema , tab_b : TPartialTableSchema
505+ schema_name : str ,
506+ tab_a : TTableSchema ,
507+ tab_b : TPartialTableSchema ,
508+ respect_merge_type : bool = False ,
469509) -> TPartialTableSchema :
470510 """Creates a partial table that contains properties found in `tab_b` that are not present or different in `tab_a`.
471511 The name is always present in returned partial.
@@ -480,18 +520,22 @@ def diff_table(
480520 ensure_compatible_tables (schema_name , tab_a , tab_b , ensure_columns = False )
481521
482522 # get new columns, changes in the column data type or other properties are not allowed
483- tab_a_columns = tab_a ["columns" ]
523+ tab_a_columns = copy ( tab_a ["columns" ])
484524 new_columns : List [TColumnSchema ] = []
485525 for col_b_name , col_b in tab_b ["columns" ].items ():
486526 if col_b_name in tab_a_columns :
487- col_a = tab_a_columns [ col_b_name ]
527+ col_a = tab_a_columns . pop ( col_b_name )
488528 # all other properties can change
489- merged_column = merge_column (copy (col_a ), col_b )
529+ merged_column = merge_column (copy (col_a ), col_b , respect_merge_type = respect_merge_type )
490530 if merged_column != col_a :
491531 new_columns .append (merged_column )
492532 else :
493533 new_columns .append (col_b )
494534
535+ if respect_merge_type :
536+ for col_a in tab_a_columns .values ():
537+ remove_column_props_with_merge_type (col_a , "remove_if_empty" )
538+
495539 # return partial table containing only name and properties that differ (column, filters etc.)
496540 table_name = tab_a ["name" ]
497541
0 commit comments