Skip to content

Commit 4be4cc9

Browse files
committed
Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelines into benb/move_clickhouse_load_to_luigi
2 parents 994ff86 + 8087cbb commit 4be4cc9

File tree

2 files changed

+150
-12
lines changed

2 files changed

+150
-12
lines changed

v03_pipeline/lib/misc/clickhouse.py

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import ast
12
import functools
23
import hashlib
34
import os
@@ -289,6 +290,53 @@ def get_create_mv_statements(
289290
)[0]
290291

291292

293+
def normalize_partition(partition: str) -> tuple:
294+
"""
295+
Ensure a ClickHouse partition expression is always returned as a tuple.
296+
'project_d' -> ('project_d',)
297+
"('project_d', 0)" -> ('project_d', 0)
298+
"""
299+
if not isinstance(partition, str):
300+
msg = f'Unsupported partition type: {type(partition)}'
301+
raise TypeError(msg)
302+
partition = partition.strip()
303+
if partition.startswith('(') and partition.endswith(')'):
304+
return ast.literal_eval(partition)
305+
return (partition,)
306+
307+
308+
def get_partitions_for_projects(
309+
table_name_builder: TableNameBuilder,
310+
clickhouse_table: ClickHouseTable,
311+
project_guids: list[str],
312+
staging=False,
313+
):
314+
rows = logged_query(
315+
"""
316+
SELECT DISTINCT partition
317+
FROM system.parts
318+
WHERE
319+
database = %(database)s
320+
AND table = %(table)s
321+
AND multiSearchAny(partition, %(project_guids)s)
322+
""",
323+
{
324+
'database': STAGING_CLICKHOUSE_DATABASE
325+
if staging
326+
else Env.CLICKHOUSE_DATABASE,
327+
'table': (
328+
table_name_builder.staging_dst_table(clickhouse_table)
329+
if staging
330+
else table_name_builder.dst_table(clickhouse_table)
331+
)
332+
.split('.')[1]
333+
.replace('`', ''),
334+
'project_guids': project_guids,
335+
},
336+
)
337+
return [normalize_partition(row[0]) for row in rows]
338+
339+
292340
def create_staging_materialized_views(
293341
table_name_builder: TableNameBuilder,
294342
clickhouse_mvs: list[ClickHouseMaterializedView],
@@ -324,15 +372,19 @@ def stage_existing_project_partitions(
324372
""",
325373
)
326374
continue
327-
for project_guid in project_guids:
375+
for partition in get_partitions_for_projects(
376+
table_name_builder,
377+
clickhouse_table,
378+
project_guids,
379+
):
328380
# Note that ClickHouse successfully handles the case where the project
329381
# does not already exist in the dst table. We simply attach an empty partition!
330382
logged_query(
331383
f"""
332384
ALTER TABLE {table_name_builder.staging_dst_table(clickhouse_table)}
333-
ATTACH PARTITION %(project_guid)s FROM {table_name_builder.dst_table(clickhouse_table)}
385+
ATTACH PARTITION %(partition)s FROM {table_name_builder.dst_table(clickhouse_table)}
334386
""",
335-
{'project_guid': project_guid},
387+
{'partition': partition},
336388
)
337389

338390

@@ -343,7 +395,7 @@ def delete_existing_families_from_staging_entries(
343395
logged_query(
344396
f"""
345397
INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
346-
SELECT COLUMNS('.*') EXCEPT(sign), -1 as sign
398+
SELECT COLUMNS('.*') EXCEPT(sign, n_partitions, partition_id), -1 as sign
347399
FROM {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
348400
WHERE family_guid in %(family_guids)s
349401
""",
@@ -354,10 +406,25 @@ def delete_existing_families_from_staging_entries(
354406
def insert_new_entries(
355407
table_name_builder: TableNameBuilder,
356408
) -> None:
409+
dst_cols = [
410+
r[0]
411+
for r in logged_query(
412+
f'DESCRIBE TABLE {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}',
413+
)
414+
]
415+
src_cols = [
416+
r[0]
417+
for r in logged_query(
418+
f'DESCRIBE TABLE {table_name_builder.src_table(ClickHouseTable.ENTRIES)}',
419+
)
420+
]
421+
common = [c for c in dst_cols if c in src_cols]
422+
dst_list = ', '.join(common)
423+
src_list = ', '.join(common)
357424
logged_query(
358425
f"""
359-
INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
360-
SELECT *
426+
INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)} ({dst_list})
427+
SELECT {src_list}
361428
FROM {table_name_builder.src_table(ClickHouseTable.ENTRIES)}
362429
""",
363430
)
@@ -488,13 +555,18 @@ def replace_project_partitions(
488555
project_guids: list[str],
489556
) -> None:
490557
for clickhouse_table in clickhouse_tables:
491-
for project_guid in project_guids:
558+
for partition in get_partitions_for_projects(
559+
table_name_builder,
560+
clickhouse_table,
561+
project_guids,
562+
staging=True,
563+
):
492564
logged_query(
493565
f"""
494566
ALTER TABLE {table_name_builder.dst_table(clickhouse_table)}
495-
REPLACE PARTITION %(project_guid)s FROM {table_name_builder.staging_dst_table(clickhouse_table)}
567+
REPLACE PARTITION %(partition)s FROM {table_name_builder.staging_dst_table(clickhouse_table)}
496568
""",
497-
{'project_guid': project_guid},
569+
{'partition': partition},
498570
)
499571

500572

@@ -778,13 +850,18 @@ def rebuild_gt_stats(
778850
dataset_type,
779851
),
780852
)
781-
for project_guid in project_guids:
853+
for partition in get_partitions_for_projects(
854+
table_name_builder,
855+
ClickHouseTable.PROJECT_GT_STATS,
856+
project_guids,
857+
staging=True,
858+
):
782859
logged_query(
783860
f"""
784861
ALTER TABLE {table_name_builder.staging_dst_table(ClickHouseTable.PROJECT_GT_STATS)}
785-
DROP PARTITION %(project_guid)s
862+
DROP PARTITION %(partition)s
786863
""",
787-
{'project_guid': project_guid},
864+
{'partition': partition},
788865
)
789866
select_statement = get_create_mv_statements(
790867
table_name_builder,

v03_pipeline/lib/misc/clickhouse_test.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
insert_new_entries,
2424
load_complete_run,
2525
logged_query,
26+
normalize_partition,
2627
optimize_entries,
2728
rebuild_gt_stats,
2829
refresh_materialized_views,
@@ -427,6 +428,13 @@ def test_get_clickhouse_client(self):
427428
result = client.execute('SELECT 1')
428429
self.assertEqual(result[0][0], 1)
429430

431+
def test_normalize_partition(self):
432+
self.assertEqual(normalize_partition('project_d'), ('project_d',))
433+
self.assertEqual(
434+
normalize_partition("('project_d', 0)"),
435+
('project_d', 0),
436+
)
437+
430438
def test_table_name_builder(self):
431439
table_name_builder = TableNameBuilder(
432440
ReferenceGenome.GRCh38,
@@ -1152,3 +1160,56 @@ def test_rebuild_gt_stats(self):
11521160
""",
11531161
)
11541162
self.assertCountEqual(gt_stats, [(1, 0)])
1163+
1164+
def test_repartitioned_entries_table(self):
1165+
client = get_clickhouse_client()
1166+
client.execute(
1167+
f"""
1168+
REPLACE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries` (
1169+
`key` UInt32,
1170+
`project_guid` LowCardinality(String),
1171+
`family_guid` String,
1172+
`xpos` UInt64 CODEC(Delta(8), ZSTD(1)),
1173+
`sample_type` Enum8('WES' = 0, 'WGS' = 1),
1174+
`is_annotated_in_any_gene` Boolean,
1175+
`geneId_ids` Array(UInt32),
1176+
`calls` Array(
1177+
Tuple(
1178+
sampleId String,
1179+
gt Nullable(Enum8('REF' = 0, 'HET' = 1, 'HOM' = 2)),
1180+
)
1181+
),
1182+
`sign` Int8,
1183+
`n_partitions` UInt8 MATERIALIZED 2,
1184+
`partition_id` UInt8 MATERIALIZED farmHash64(family_guid) % n_partitions,
1185+
PROJECTION xpos_projection
1186+
(
1187+
SELECT *
1188+
ORDER BY is_annotated_in_any_gene, xpos
1189+
)
1190+
)
1191+
ENGINE = CollapsingMergeTree(sign)
1192+
PARTITION BY (project_guid, partition_id)
1193+
ORDER BY (project_guid, family_guid, key)
1194+
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
1195+
""",
1196+
)
1197+
load_complete_run(
1198+
ReferenceGenome.GRCh38,
1199+
DatasetType.SNV_INDEL,
1200+
TEST_RUN_ID,
1201+
['project_d'],
1202+
['family_d1', 'family_d2', 'family_d3'],
1203+
)
1204+
project_gt_stats = client.execute(
1205+
f"""
1206+
SELECT project_guid, sum(het_samples), sum(hom_samples)
1207+
FROM
1208+
{Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/project_gt_stats`
1209+
GROUP BY project_guid
1210+
""",
1211+
)
1212+
self.assertCountEqual(
1213+
project_gt_stats,
1214+
[('project_d', 1, 1)],
1215+
)

0 commit comments

Comments
 (0)