72 changes: 10 additions & 62 deletions v03_pipeline/api/request_handlers.py
@@ -1,8 +1,6 @@
-import json
 from collections.abc import Callable
 from typing import Any

-import hailtop.fs as hfs
 import luigi
 import luigi.execution_summary

@@ -12,63 +10,20 @@
     PipelineRunnerRequest,
     RebuildGtStatsRequest,
 )
-from v03_pipeline.lib.core import DatasetType, FeatureFlag, ReferenceGenome
+from v03_pipeline.lib.core import DatasetType, FeatureFlag
 from v03_pipeline.lib.logger import get_logger
 from v03_pipeline.lib.misc.clickhouse import (
     delete_family_guids,
-    load_complete_run,
     rebuild_gt_stats,
 )
-from v03_pipeline.lib.misc.retry import retry
-from v03_pipeline.lib.paths import (
-    clickhouse_load_success_file_path,
-    metadata_for_run_path,
+from v03_pipeline.lib.tasks.write_clickhouse_load_success_file import (
+    WriteClickhouseLoadSuccessFileTask,
 )
 from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask

 logger = get_logger(__name__)


-@retry()
-def fetch_run_metadata(
-    reference_genome: ReferenceGenome,
-    dataset_type: DatasetType,
-    run_id: str,
-) -> tuple[list[str], list[str]]:
-    # Run metadata
-    with hfs.open(
-        metadata_for_run_path(
-            reference_genome,
-            dataset_type,
-            run_id,
-        ),
-        'r',
-    ) as f:
-        metadata_json = json.load(f)
-    project_guids = metadata_json['project_guids']
-    family_guids = list(metadata_json['family_samples'].keys())
-    return project_guids, family_guids
-
-
-@retry()
-def write_success_file(
-    reference_genome: ReferenceGenome,
-    dataset_type: DatasetType,
-    run_id: str,
-):
-    with hfs.open(
-        clickhouse_load_success_file_path(
-            reference_genome,
-            dataset_type,
-            run_id,
-        ),
-        'w',
-    ) as f:
-        f.write('')
-    msg = f'Successfully loaded {reference_genome.value}/{dataset_type.value}/{run_id}'
-    logger.info(msg)
-
-
 def run_loading_pipeline(
     lpr: LoadingPipelineRequest,
     run_id: str,
@@ -82,6 +37,13 @@ def run_loading_pipeline(
                 run_id=run_id,
                 attempt_id=attempt_id,
                 **lpr.model_dump(exclude='request_type'),
             )
+            if FeatureFlag.CLICKHOUSE_LOADER_DISABLED
+            else (
+                WriteClickhouseLoadSuccessFileTask(
+                    run_id=run_id,
+                    **lpr.model_dump(exclude='request_type'),
+                )
+            ),
         ],
         detailed_summary=True,
@@ -94,20 +56,6 @@
             break
     else:
         raise RuntimeError(luigi_task_result.status.value[1])
-    if FeatureFlag.CLICKHOUSE_LOADER_DISABLED:
-        project_guids, family_guids = fetch_run_metadata(
-            lpr.reference_genome,
-            lpr.dataset_type,
-            run_id,
-        )
-        load_complete_run(
-            lpr.reference_genome,
-            lpr.dataset_type,
-            run_id,
-            project_guids,
-            family_guids,
-        )
-        write_success_file(lpr.reference_genome, lpr.dataset_type, run_id)


 def run_delete_families(dpr: DeleteFamiliesRequest, run_id: str, *_: Any):
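Note on the hunks above: run_loading_pipeline now selects its terminal task up front, so the ClickHouse load and the success-file write participate in the same luigi.build() call instead of running as ad-hoc post-steps. A minimal, self-contained sketch of the pattern, with stand-in flag and task names rather than the project's actual code:

import luigi
import luigi.execution_summary

CLICKHOUSE_LOADER_DISABLED = False  # stand-in for FeatureFlag.CLICKHOUSE_LOADER_DISABLED


class WriteSuccessFile(luigi.Task):
    # Stand-in for the terminal task used when an external loader daemon handles ClickHouse.
    def output(self):
        return luigi.LocalTarget('/tmp/_SUCCESS')

    def run(self):
        with self.output().open('w') as f:
            f.write('')


class WriteClickhouseLoadSuccessFile(luigi.Task):
    # Stand-in for the terminal task that drives the ClickHouse load inline.
    def output(self):
        return luigi.LocalTarget('/tmp/_CLICKHOUSE_LOAD_SUCCESS')

    def run(self):
        with self.output().open('w') as f:
            f.write('')


result = luigi.build(
    [
        WriteSuccessFile()
        if CLICKHOUSE_LOADER_DISABLED
        else WriteClickhouseLoadSuccessFile(),
    ],
    local_scheduler=True,
    detailed_summary=True,
)
if result.status not in (
    luigi.execution_summary.LuigiStatusCode.SUCCESS,
    luigi.execution_summary.LuigiStatusCode.SUCCESS_WITH_RETRY,
):
    # status.value[1] is the human-readable half of the LuigiStatusCode tuple.
    raise RuntimeError(result.status.value[1])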
36 changes: 33 additions & 3 deletions v03_pipeline/bin/clickhouse_loader.py
@@ -1,12 +1,12 @@
 #!/usr/bin/env python3
+import json
 import signal
 import sys
 import time

 import hailtop.fs as hfs

-from v03_pipeline.api.request_handlers import fetch_run_metadata, write_success_file
-from v03_pipeline.lib.core import FeatureFlag
+from v03_pipeline.lib.core import DatasetType, FeatureFlag, ReferenceGenome
 from v03_pipeline.lib.logger import get_logger
 from v03_pipeline.lib.misc.clickhouse import (
     drop_staging_db,
@@ -15,6 +15,8 @@
 from v03_pipeline.lib.misc.runs import get_run_ids
 from v03_pipeline.lib.paths import (
     clickhouse_load_fail_file_path,
+    clickhouse_load_success_file_path,
+    metadata_for_run_path,
 )

 logger = get_logger(__name__)
@@ -31,6 +33,26 @@ def signal_handler(*_):
 signal.signal(signal.SIGTERM, signal_handler)


+def fetch_run_metadata(
+    reference_genome: ReferenceGenome,
+    dataset_type: DatasetType,
+    run_id: str,
+) -> tuple[list[str], list[str]]:
+    # Run metadata
+    with hfs.open(
+        metadata_for_run_path(
+            reference_genome,
+            dataset_type,
+            run_id,
+        ),
+        'r',
+    ) as f:
+        metadata_json = json.load(f)
+    project_guids = metadata_json['project_guids']
+    family_guids = list(metadata_json['family_samples'].keys())
+    return project_guids, family_guids
+
+
 def main():
     reference_genome, dataset_type, run_id = None, None, None
     while True:
@@ -69,7 +91,15 @@ def main():
                 project_guids,
                 family_guids,
             )
-            write_success_file(reference_genome, dataset_type, run_id)
+            with hfs.open(
+                clickhouse_load_success_file_path(
+                    reference_genome,
+                    dataset_type,
+                    run_id,
+                ),
+                'w',
+            ) as f:
+                f.write('')
         except Exception:
             logger.exception('Unhandled Exception')
             if reference_genome and dataset_type and run_id:
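With this move, fetch_run_metadata lives next to its only remaining caller, the loader daemon. It assumes the run-metadata JSON exposes top-level 'project_guids' and 'family_samples' fields. A hedged illustration of the shape it parses (the GUID values here are invented, and the real schema is whatever metadata_for_run_path points at):

import json

metadata_json = json.loads("""
{
    "project_guids": ["R0001_project_a"],
    "family_samples": {
        "F000001_family_1": ["S000001", "S000002"],
        "F000002_family_2": ["S000003"]
    }
}
""")
project_guids = metadata_json['project_guids']               # ['R0001_project_a']
family_guids = list(metadata_json['family_samples'].keys())  # ['F000001_family_1', 'F000002_family_2']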
12 changes: 6 additions & 6 deletions v03_pipeline/bin/pipeline_worker_test.py
@@ -27,12 +27,12 @@ def output(self):

 class PipelineWorkerTest(MockedDatarootTestCase):
     @patch('v03_pipeline.lib.misc.slack._safe_post_to_slack')
-    @patch('v03_pipeline.api.request_handlers.WriteSuccessFileTask')
+    @patch('v03_pipeline.api.request_handlers.WriteClickhouseLoadSuccessFileTask')
     @patch('v03_pipeline.bin.pipeline_worker.logger')
     def test_process_queue(
         self,
         mock_logger,
-        mock_write_success_file_task,
+        mock_write_clickhouse_load_success_file_task,
         mock_safe_post_to_slack,
     ):
         raw_request = {
@@ -43,7 +43,7 @@ def test_process_queue(
             'reference_genome': ReferenceGenome.GRCh38.value,
             'dataset_type': DatasetType.SNV_INDEL.value,
         }
-        mock_write_success_file_task.return_value = MockCompleteTask()
+        mock_write_clickhouse_load_success_file_task.return_value = MockCompleteTask()
         os.makedirs(
             loading_pipeline_queue_dir(),
             exist_ok=True,
@@ -62,12 +62,12 @@
         )

     @patch('v03_pipeline.lib.misc.slack._safe_post_to_slack')
-    @patch('v03_pipeline.api.request_handlers.WriteSuccessFileTask')
+    @patch('v03_pipeline.api.request_handlers.WriteClickhouseLoadSuccessFileTask')
     @patch('v03_pipeline.bin.pipeline_worker.logger')
     def test_process_failure(
         self,
         mock_logger,
-        mock_write_success_file_task,
+        mock_write_clickhouse_load_success_file_task,
         mock_safe_post_to_slack,
     ):
         raw_request = {
@@ -78,7 +78,7 @@ def test_process_failure(
             'reference_genome': ReferenceGenome.GRCh38.value,
             'dataset_type': DatasetType.SNV_INDEL.value,
         }
-        mock_write_success_file_task.return_value = MyFailingTask()
+        mock_write_clickhouse_load_success_file_task.return_value = MyFailingTask()
         os.makedirs(
             loading_pipeline_queue_dir(),
             exist_ok=True,
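A reminder on the mock argument order in these tests: stacked @patch decorators apply bottom-up, so the bottom-most patch arrives as the first mock argument after self. That is why mock_logger precedes mock_write_clickhouse_load_success_file_task above. A toy illustration:

from unittest.mock import patch


@patch('os.getcwd')   # outermost, applied last -> last mock argument
@patch('os.listdir')  # innermost, applied first -> first mock argument
def demo(mock_listdir, mock_getcwd):
    mock_listdir.return_value = []
    mock_getcwd.return_value = '/tmp'
    return mock_getcwd(), mock_listdir('.')


print(demo())  # ('/tmp', [])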
63 changes: 63 additions & 0 deletions v03_pipeline/lib/tasks/load_complete_run_to_clickhouse.py
@@ -0,0 +1,63 @@
import json

import hailtop.fs as hfs
import luigi
import luigi.util

from v03_pipeline.lib.misc.clickhouse import (
    ClickHouseTable,
    TableNameBuilder,
    load_complete_run,
    logged_query,
)
from v03_pipeline.lib.paths import metadata_for_run_path
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
    BaseLoadingRunParams,
)
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask


@luigi.util.inherits(BaseLoadingRunParams)
class LoadCompleteRunToClickhouse(luigi.Task):
    def requires(self) -> list[luigi.Task]:
        # Note: retries happen within the ClickHouse load
        return [self.clone(WriteSuccessFileTask, attempt_id=0)]

    def complete(self):
        table_name_builder = TableNameBuilder(
            self.reference_genome,
            self.dataset_type,
            self.run_id,
        )
        max_key_src = logged_query(
            f"""
            SELECT max(key) FROM {table_name_builder.src_table(ClickHouseTable.ANNOTATIONS_MEMORY)}
            """,
        )[0][0]
        return logged_query(
            f"""
            SELECT EXISTS (
                SELECT 1
                FROM {table_name_builder.dst_table(ClickHouseTable.ANNOTATIONS_MEMORY)}
                WHERE key = %(max_key_src)s
            );
            """,
            {'max_key_src': max_key_src},
        )[0][0]

    def run(self):
        with hfs.open(
            metadata_for_run_path(
                self.reference_genome,
                self.dataset_type,
                self.run_id,
            ),
        ) as f:
            family_guids = list(json.load(f)['family_samples'].keys())
        load_complete_run(
            self.reference_genome,
            self.dataset_type,
            self.run_id,
            self.project_guids,
            family_guids,
        )
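complete() here decides doneness by probing the destination table for the source table's max key, rather than trusting a marker file, and it binds the probe key with pyformat-style parameters (%(max_key_src)s) as accepted by common Python ClickHouse clients. A hedged sketch of that binding pattern with clickhouse-driver, assuming the project's logged_query wrapper forwards (query, params) to a client in the same way; the host and table names below are invented:

from clickhouse_driver import Client

client = Client(host='localhost')
max_key = client.execute('SELECT max(key) FROM annotations_src')[0][0]
row_exists = client.execute(
    'SELECT EXISTS (SELECT 1 FROM annotations_dst WHERE key = %(max_key)s)',
    {'max_key': max_key},  # passed as parameters, never formatted into the SQL by hand
)[0][0]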
30 changes: 30 additions & 0 deletions v03_pipeline/lib/tasks/write_clickhouse_load_success_file.py
@@ -0,0 +1,30 @@
import luigi
import luigi.util

from v03_pipeline.lib.paths import clickhouse_load_success_file_path
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
    BaseLoadingRunParams,
)
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
from v03_pipeline.lib.tasks.load_complete_run_to_clickhouse import (
    LoadCompleteRunToClickhouse,
)


@luigi.util.inherits(BaseLoadingRunParams)
class WriteClickhouseLoadSuccessFileTask(luigi.Task):
    def output(self) -> luigi.Target:
        return GCSorLocalTarget(
            clickhouse_load_success_file_path(
                self.reference_genome,
                self.dataset_type,
                self.run_id,
            ),
        )

    def requires(self) -> luigi.Task:
        return self.clone(LoadCompleteRunToClickhouse)

    def run(self):
        with self.output().open('w') as f:
            f.write('')
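Taken together, the new tasks form a chain: WriteClickhouseLoadSuccessFileTask requires LoadCompleteRunToClickhouse, which in turn requires the pipeline's WriteSuccessFileTask, so a single luigi.build() call drives the Hail pipeline, the ClickHouse load, and the final marker file. A hedged invocation sketch: the parameter names follow those visible in this diff (reference_genome, dataset_type, run_id), the values are invented, and BaseLoadingRunParams may require additional parameters not shown here:

import luigi

from v03_pipeline.lib.tasks.write_clickhouse_load_success_file import (
    WriteClickhouseLoadSuccessFileTask,
)

result = luigi.build(
    [
        WriteClickhouseLoadSuccessFileTask(
            reference_genome='GRCh38',
            dataset_type='SNV_INDEL',
            run_id='manual__2024-01-01',
        ),
    ],
    detailed_summary=True,
)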