From 994ff86792366530b73a1359872ed81cef505b47 Mon Sep 17 00:00:00 2001 From: Benjamin Blankenmeister Date: Wed, 29 Oct 2025 14:35:55 -0400 Subject: [PATCH 1/3] move clickhouse load to luigi --- v03_pipeline/api/request_handlers.py | 72 +++---------------- v03_pipeline/bin/clickhouse_loader.py | 36 +++++++++- .../tasks/load_complete_run_to_clickhouse.py | 63 ++++++++++++++++ .../write_clickhouse_load_success_file.py | 30 ++++++++ 4 files changed, 136 insertions(+), 65 deletions(-) create mode 100644 v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py create mode 100644 v03_pipeline/lib/tasks/write_clickhouse_load_success_file.py diff --git a/v03_pipeline/api/request_handlers.py b/v03_pipeline/api/request_handlers.py index 6c86fcd32..ea35284c6 100644 --- a/v03_pipeline/api/request_handlers.py +++ b/v03_pipeline/api/request_handlers.py @@ -1,8 +1,6 @@ -import json from collections.abc import Callable from typing import Any -import hailtop.fs as hfs import luigi import luigi.execution_summary @@ -12,63 +10,20 @@ PipelineRunnerRequest, RebuildGtStatsRequest, ) -from v03_pipeline.lib.core import DatasetType, FeatureFlag, ReferenceGenome +from v03_pipeline.lib.core import DatasetType, FeatureFlag from v03_pipeline.lib.logger import get_logger from v03_pipeline.lib.misc.clickhouse import ( delete_family_guids, - load_complete_run, rebuild_gt_stats, ) -from v03_pipeline.lib.misc.retry import retry -from v03_pipeline.lib.paths import ( - clickhouse_load_success_file_path, - metadata_for_run_path, +from v03_pipeline.lib.tasks.write_clickhouse_load_success_file import ( + WriteClickhouseLoadSuccessFileTask, ) from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask logger = get_logger(__name__) -@retry() -def fetch_run_metadata( - reference_genome: ReferenceGenome, - dataset_type: DatasetType, - run_id: str, -) -> tuple[list[str], list[str]]: - # Run metadata - with hfs.open( - metadata_for_run_path( - reference_genome, - dataset_type, - run_id, - ), - 
'r', - ) as f: - metadata_json = json.load(f) - project_guids = metadata_json['project_guids'] - family_guids = list(metadata_json['family_samples'].keys()) - return project_guids, family_guids - - -@retry() -def write_success_file( - reference_genome: ReferenceGenome, - dataset_type: DatasetType, - run_id: str, -): - with hfs.open( - clickhouse_load_success_file_path( - reference_genome, - dataset_type, - run_id, - ), - 'w', - ) as f: - f.write('') - msg = f'Successfully loaded {reference_genome.value}/{dataset_type.value}/{run_id}' - logger.info(msg) - - def run_loading_pipeline( lpr: LoadingPipelineRequest, run_id: str, @@ -82,6 +37,13 @@ def run_loading_pipeline( run_id=run_id, attempt_id=attempt_id, **lpr.model_dump(exclude='request_type'), + ) + if FeatureFlag.CLICKHOUSE_LOADER_DISABLED + else ( + WriteClickhouseLoadSuccessFileTask( + run_id=run_id, + **lpr.model_dump(exclude='request_type'), + ) ), ], detailed_summary=True, @@ -94,20 +56,6 @@ def run_loading_pipeline( break else: raise RuntimeError(luigi_task_result.status.value[1]) - if FeatureFlag.CLICKHOUSE_LOADER_DISABLED: - project_guids, family_guids = fetch_run_metadata( - lpr.reference_genome, - lpr.dataset_type, - run_id, - ) - load_complete_run( - lpr.reference_genome, - lpr.dataset_type, - run_id, - project_guids, - family_guids, - ) - write_success_file(lpr.reference_genome, lpr.dataset_type, run_id) def run_delete_families(dpr: DeleteFamiliesRequest, run_id: str, *_: Any): diff --git a/v03_pipeline/bin/clickhouse_loader.py b/v03_pipeline/bin/clickhouse_loader.py index db519b25a..3c3cb8396 100755 --- a/v03_pipeline/bin/clickhouse_loader.py +++ b/v03_pipeline/bin/clickhouse_loader.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 +import json import signal import sys import time import hailtop.fs as hfs -from v03_pipeline.api.request_handlers import fetch_run_metadata, write_success_file -from v03_pipeline.lib.core import FeatureFlag +from v03_pipeline.lib.core import DatasetType, FeatureFlag, 
ReferenceGenome from v03_pipeline.lib.logger import get_logger from v03_pipeline.lib.misc.clickhouse import ( drop_staging_db, @@ -15,6 +15,8 @@ from v03_pipeline.lib.misc.runs import get_run_ids from v03_pipeline.lib.paths import ( clickhouse_load_fail_file_path, + clickhouse_load_success_file_path, + metadata_for_run_path, ) logger = get_logger(__name__) @@ -31,6 +33,26 @@ def signal_handler(*_): signal.signal(signal.SIGTERM, signal_handler) +def fetch_run_metadata( + reference_genome: ReferenceGenome, + dataset_type: DatasetType, + run_id: str, +) -> tuple[list[str], list[str]]: + # Run metadata + with hfs.open( + metadata_for_run_path( + reference_genome, + dataset_type, + run_id, + ), + 'r', + ) as f: + metadata_json = json.load(f) + project_guids = metadata_json['project_guids'] + family_guids = list(metadata_json['family_samples'].keys()) + return project_guids, family_guids + + def main(): reference_genome, dataset_type, run_id = None, None, None while True: @@ -69,7 +91,15 @@ def main(): project_guids, family_guids, ) - write_success_file(reference_genome, dataset_type, run_id) + with hfs.open( + clickhouse_load_success_file_path( + reference_genome, + dataset_type, + run_id, + ), + 'w', + ) as f: + f.write('') except Exception: logger.exception('Unhandled Exception') if reference_genome and dataset_type and run_id: diff --git a/v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py b/v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py new file mode 100644 index 000000000..e9840948f --- /dev/null +++ b/v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py @@ -0,0 +1,63 @@ +import json + +import hailtop.fs as hfs +import luigi +import luigi.util + +from v03_pipeline.lib.misc.clickhouse import ( + ClickHouseTable, + TableNameBuilder, + load_complete_run, + logged_query, +) +from v03_pipeline.lib.paths import metadata_for_run_path +from v03_pipeline.lib.tasks.base.base_loading_run_params import ( + BaseLoadingRunParams, +) +from 
v03_pipeline.lib.tasks.write_success_file import WriteSuccessFile + + +@luigi.util.inherits(BaseLoadingRunParams) +class LoadCompleteRunToClickhouse(luigi.Task): + def requires(self) -> luigi.Task: + # Note retries happen within the ClickHouse Load + return [self.clone(WriteSuccessFile, attempt_id=0)] + + def complete(self): + table_name_builder = TableNameBuilder( + self.reference_genome, + self.dataset_type, + self.run_id, + ) + max_key_src = logged_query( + f""" + SELECT max(key) FROM {table_name_builder.src_table(ClickHouseTable.ANNOTATIONS_MEMORY)} + """, + )[0][0] + return logged_query( + f""" + SELECT EXISTS ( + SELECT 1 + FROM {table_name_builder.dst_table(ClickHouseTable.ANNOTATIONS_MEMORY)} + WHERE key = %(max_key_src)s + ); + """, + {'max_key_src': max_key_src}, + )[0][0] + + def run(self): + with hfs.open( + metadata_for_run_path( + self.reference_genome, + self.dataset_type, + self.run_id, + ), + ) as f: + family_guids = list(json.load(f)['family_samples'].keys()) + load_complete_run( + self.reference_genome, + self.dataset_type, + self.run_id, + self.project_guids, + family_guids, + ) diff --git a/v03_pipeline/lib/tasks/write_clickhouse_load_success_file.py b/v03_pipeline/lib/tasks/write_clickhouse_load_success_file.py new file mode 100644 index 000000000..22e899f4a --- /dev/null +++ b/v03_pipeline/lib/tasks/write_clickhouse_load_success_file.py @@ -0,0 +1,30 @@ +import luigi +import luigi.util + +from v03_pipeline.lib.paths import clickhouse_load_success_file_path +from v03_pipeline.lib.tasks.base.base_loading_run_params import ( + BaseLoadingRunParams, +) +from v03_pipeline.lib.tasks.files import GCSorLocalTarget +from v03_pipeline.lib.tasks.load_complete_run_to_clickhouse import ( + LoadCompleteRunToClickhouse, +) + + +@luigi.util.inherits(BaseLoadingRunParams) +class WriteClickhouseLoadSuccessFileTask(luigi.Task): + def output(self) -> luigi.Target: + return GCSorLocalTarget( + clickhouse_load_success_file_path( + self.reference_genome, + 
self.dataset_type, + self.run_id, + ), + ) + + def requires(self) -> luigi.Task: + return self.clone(LoadCompleteRunToClickhouse) + + def run(self): + with self.output().open('w') as f: + f.write('') From f7984c606e859e8327b00c57cca75a03d4070902 Mon Sep 17 00:00:00 2001 From: Benjamin Blankenmeister Date: Wed, 29 Oct 2025 17:18:30 -0400 Subject: [PATCH 2/3] fix import --- v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py b/v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py index e9840948f..d3fd5ff0b 100644 --- a/v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py +++ b/v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py @@ -14,14 +14,14 @@ from v03_pipeline.lib.tasks.base.base_loading_run_params import ( BaseLoadingRunParams, ) -from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFile +from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask @luigi.util.inherits(BaseLoadingRunParams) class LoadCompleteRunToClickhouse(luigi.Task): def requires(self) -> luigi.Task: # Note retries happen within the ClickHouse Load - return [self.clone(WriteSuccessFile, attempt_id=0)] + return [self.clone(WriteSuccessFileTask, attempt_id=0)] def complete(self): table_name_builder = TableNameBuilder( From ca46bd4c613ba1552423babebde042e89727fb07 Mon Sep 17 00:00:00 2001 From: Benjamin Blankenmeister Date: Wed, 29 Oct 2025 17:56:08 -0400 Subject: [PATCH 3/3] fix mocking --- v03_pipeline/bin/pipeline_worker_test.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/v03_pipeline/bin/pipeline_worker_test.py b/v03_pipeline/bin/pipeline_worker_test.py index e0fa27ddb..ac8a8646d 100644 --- a/v03_pipeline/bin/pipeline_worker_test.py +++ b/v03_pipeline/bin/pipeline_worker_test.py @@ -27,12 +27,12 @@ def output(self): class PipelineWorkerTest(MockedDatarootTestCase): 
@patch('v03_pipeline.lib.misc.slack._safe_post_to_slack') - @patch('v03_pipeline.api.request_handlers.WriteSuccessFileTask') + @patch('v03_pipeline.api.request_handlers.WriteClickhouseLoadSuccessFileTask') @patch('v03_pipeline.bin.pipeline_worker.logger') def test_process_queue( self, mock_logger, - mock_write_success_file_task, + mock_write_clickhouse_load_success_file_task, mock_safe_post_to_slack, ): raw_request = { @@ -43,7 +43,7 @@ def test_process_queue( 'reference_genome': ReferenceGenome.GRCh38.value, 'dataset_type': DatasetType.SNV_INDEL.value, } - mock_write_success_file_task.return_value = MockCompleteTask() + mock_write_clickhouse_load_success_file_task.return_value = MockCompleteTask() os.makedirs( loading_pipeline_queue_dir(), exist_ok=True, @@ -62,12 +62,12 @@ def test_process_queue( ) @patch('v03_pipeline.lib.misc.slack._safe_post_to_slack') - @patch('v03_pipeline.api.request_handlers.WriteSuccessFileTask') + @patch('v03_pipeline.api.request_handlers.WriteClickhouseLoadSuccessFileTask') @patch('v03_pipeline.bin.pipeline_worker.logger') def test_process_failure( self, mock_logger, - mock_write_success_file_task, + mock_write_clickhouse_load_success_file_task, mock_safe_post_to_slack, ): raw_request = { @@ -78,7 +78,7 @@ def test_process_failure( 'reference_genome': ReferenceGenome.GRCh38.value, 'dataset_type': DatasetType.SNV_INDEL.value, } - mock_write_success_file_task.return_value = MyFailingTask() + mock_write_clickhouse_load_success_file_task.return_value = MyFailingTask() os.makedirs( loading_pipeline_queue_dir(), exist_ok=True,