diff --git a/v03_pipeline/lib/misc/clickhouse.py b/v03_pipeline/lib/misc/clickhouse.py
index e9c305e46..1c807ce64 100644
--- a/v03_pipeline/lib/misc/clickhouse.py
+++ b/v03_pipeline/lib/misc/clickhouse.py
@@ -1,3 +1,4 @@
+import ast
 import functools
 import hashlib
 import os
@@ -289,6 +290,63 @@ def get_create_mv_statements(
     )[0]
 
 
+def normalize_partition(partition: str) -> tuple:
+    """
+    Ensure a ClickHouse partition expression is always returned as a tuple.
+    'project_d' -> ('project_d',)
+    "('project_d', 0)" -> ('project_d', 0)
+    "('project_d')" -> ('project_d',)
+    """
+    if not isinstance(partition, str):
+        msg = f'Unsupported partition type: {type(partition)}'
+        raise TypeError(msg)
+    partition = partition.strip()
+    if partition.startswith('(') and partition.endswith(')'):
+        parsed = ast.literal_eval(partition)
+        # "('a')" evaluates to the bare value 'a', not a 1-tuple, so wrap it.
+        return parsed if isinstance(parsed, tuple) else (parsed,)
+    return (partition,)
+
+
+def get_partitions_for_projects(
+    table_name_builder: TableNameBuilder,
+    clickhouse_table: ClickHouseTable,
+    project_guids: list[str],
+    staging=False,
+) -> list[tuple]:
+    """
+    Return the partitions of the dst table (or the staging dst table when
+    `staging` is set) whose partition string contains any of `project_guids`.
+    """
+    # NOTE(review): multiSearchAny is a substring search over the partition
+    # string, so a guid that is contained in another project's guid also
+    # selects that project's partitions.
+    rows = logged_query(
+        """
+        SELECT DISTINCT partition
+        FROM system.parts
+        WHERE
+            database = %(database)s
+            AND table = %(table)s
+            AND multiSearchAny(partition, %(project_guids)s)
+        """,
+        {
+            'database': STAGING_CLICKHOUSE_DATABASE
+            if staging
+            else Env.CLICKHOUSE_DATABASE,
+            'table': (
+                table_name_builder.staging_dst_table(clickhouse_table)
+                if staging
+                else table_name_builder.dst_table(clickhouse_table)
+            )
+            .split('.')[1]
+            .replace('`', ''),
+            'project_guids': project_guids,
+        },
+    )
+    return [normalize_partition(row[0]) for row in rows]
+
+
 def create_staging_materialized_views(
     table_name_builder: TableNameBuilder,
     clickhouse_mvs: list[ClickHouseMaterializedView],
@@ -324,15 +382,19 @@ def stage_existing_project_partitions(
                 """,
             )
             continue
-        for project_guid in project_guids:
-            # Note that ClickHouse successfully handles the case where the project
-            # does not already exist in the dst table. We simply attach an empty partition!
+        for partition in get_partitions_for_projects(
+            table_name_builder,
+            clickhouse_table,
+            project_guids,
+        ):
+            # Only partitions that already have parts in the dst table are
+            # returned, so a project that is new to dst attaches nothing here.
             logged_query(
                 f"""
                 ALTER TABLE {table_name_builder.staging_dst_table(clickhouse_table)}
-                ATTACH PARTITION %(project_guid)s FROM {table_name_builder.dst_table(clickhouse_table)}
+                ATTACH PARTITION %(partition)s FROM {table_name_builder.dst_table(clickhouse_table)}
                 """,
-                {'project_guid': project_guid},
+                {'partition': partition},
             )
 
 
@@ -343,7 +405,7 @@ def delete_existing_families_from_staging_entries(
     logged_query(
         f"""
         INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
-        SELECT COLUMNS('.*') EXCEPT(sign), -1 as sign
+        SELECT COLUMNS('.*') EXCEPT(sign, n_partitions, partition_id), -1 as sign
         FROM {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
         WHERE family_guid in %(family_guids)s
         """,
@@ -354,10 +416,25 @@ def delete_existing_families_from_staging_entries(
 def insert_new_entries(
     table_name_builder: TableNameBuilder,
 ) -> None:
+    dst_cols = [
+        r[0]
+        for r in logged_query(
+            f'DESCRIBE TABLE {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}',
+        )
+    ]
+    src_cols = [
+        r[0]
+        for r in logged_query(
+            f'DESCRIBE TABLE {table_name_builder.src_table(ClickHouseTable.ENTRIES)}',
+        )
+    ]
+    common = [c for c in dst_cols if c in src_cols]
+    dst_list = ', '.join(common)
+    src_list = ', '.join(common)
     logged_query(
         f"""
-        INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
-        SELECT *
+        INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)} ({dst_list})
+        SELECT {src_list}
         FROM {table_name_builder.src_table(ClickHouseTable.ENTRIES)}
         """,
     )
@@ -488,13 +565,18 @@ def replace_project_partitions(
     project_guids: list[str],
 ) -> None:
     for clickhouse_table in clickhouse_tables:
-        for project_guid in project_guids:
+        for partition in get_partitions_for_projects(
+            table_name_builder,
+            clickhouse_table,
+            project_guids,
+            staging=True,
+        ):
             logged_query(
                 f"""
                 ALTER TABLE {table_name_builder.dst_table(clickhouse_table)}
-                REPLACE PARTITION %(project_guid)s FROM {table_name_builder.staging_dst_table(clickhouse_table)}
+                REPLACE PARTITION %(partition)s FROM {table_name_builder.staging_dst_table(clickhouse_table)}
                 """,
-                {'project_guid': project_guid},
+                {'partition': partition},
             )
 
 
@@ -778,13 +860,18 @@ def rebuild_gt_stats(
             dataset_type,
         ),
     )
-    for project_guid in project_guids:
+    for partition in get_partitions_for_projects(
+        table_name_builder,
+        ClickHouseTable.PROJECT_GT_STATS,
+        project_guids,
+        staging=True,
+    ):
         logged_query(
             f"""
             ALTER TABLE {table_name_builder.staging_dst_table(ClickHouseTable.PROJECT_GT_STATS)}
-            DROP PARTITION %(project_guid)s
+            DROP PARTITION %(partition)s
             """,
-            {'project_guid': project_guid},
+            {'partition': partition},
         )
     select_statement = get_create_mv_statements(
         table_name_builder,
diff --git a/v03_pipeline/lib/misc/clickhouse_test.py b/v03_pipeline/lib/misc/clickhouse_test.py
index 947f83dea..7821676b8 100644
--- a/v03_pipeline/lib/misc/clickhouse_test.py
+++ b/v03_pipeline/lib/misc/clickhouse_test.py
@@ -23,6 +23,7 @@
     insert_new_entries,
     load_complete_run,
     logged_query,
+    normalize_partition,
     optimize_entries,
     rebuild_gt_stats,
     refresh_materialized_views,
@@ -427,6 +428,14 @@ def test_get_clickhouse_client(self):
         result = client.execute('SELECT 1')
         self.assertEqual(result[0][0], 1)
 
+    def test_normalize_partition(self):
+        self.assertEqual(normalize_partition('project_d'), ('project_d',))
+        self.assertEqual(
+            normalize_partition("('project_d', 0)"),
+            ('project_d', 0),
+        )
+        self.assertEqual(normalize_partition("('project_d')"), ('project_d',))
+
     def test_table_name_builder(self):
         table_name_builder = TableNameBuilder(
             ReferenceGenome.GRCh38,
@@ -1152,3 +1161,56 @@ def test_rebuild_gt_stats(self):
             """,
         )
         self.assertCountEqual(gt_stats, [(1, 0)])
+
+    def test_repartitioned_entries_table(self):
+        client = get_clickhouse_client()
+        client.execute(
+            f"""
+            REPLACE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries` (
+                `key` UInt32,
+                `project_guid` LowCardinality(String),
+                `family_guid` String,
+                `xpos` UInt64 CODEC(Delta(8), ZSTD(1)),
+                `sample_type` Enum8('WES' = 0, 'WGS' = 1),
+                `is_annotated_in_any_gene` Boolean,
+                `geneId_ids` Array(UInt32),
+                `calls` Array(
+                    Tuple(
+                        sampleId String,
+                        gt Nullable(Enum8('REF' = 0, 'HET' = 1, 'HOM' = 2)),
+                    )
+                ),
+                `sign` Int8,
+                `n_partitions` UInt8 MATERIALIZED 2,
+                `partition_id` UInt8 MATERIALIZED farmHash64(family_guid) % n_partitions,
+                PROJECTION xpos_projection
+                (
+                    SELECT *
+                    ORDER BY is_annotated_in_any_gene, xpos
+                )
+            )
+            ENGINE = CollapsingMergeTree(sign)
+            PARTITION BY (project_guid, partition_id)
+            ORDER BY (project_guid, family_guid, key)
+            SETTINGS deduplicate_merge_projection_mode = 'rebuild';
+            """,
+        )
+        load_complete_run(
+            ReferenceGenome.GRCh38,
+            DatasetType.SNV_INDEL,
+            TEST_RUN_ID,
+            ['project_d'],
+            ['family_d1', 'family_d2', 'family_d3'],
+        )
+        project_gt_stats = client.execute(
+            f"""
+            SELECT project_guid, sum(het_samples), sum(hom_samples)
+            FROM
+            {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/project_gt_stats`
+            GROUP BY project_guid
+            """,
+        )
+        self.assertCountEqual(
+            project_gt_stats,
+            [('project_d', 1, 1)],
+        )