Skip to content

Commit 0d8f173

Browse files
authored
deadletter queue (#1180)
* deadletter queue * fix * ruff * Update paths.py
1 parent dc231ba commit 0d8f173

File tree

3 files changed

+35
-1
lines changed

3 files changed

+35
-1
lines changed

v03_pipeline/bin/pipeline_worker.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
safe_post_to_slack_failure,
2020
safe_post_to_slack_success,
2121
)
22+
from v03_pipeline.lib.paths import (
23+
loading_pipeline_deadletter_queue_dir,
24+
loading_pipeline_deadletter_queue_path,
25+
)
2226

2327
logger = get_logger(__name__)
2428

@@ -73,6 +77,9 @@ def process_queue(local_scheduler=False):
7377
prr,
7478
e,
7579
)
80+
os.makedirs(loading_pipeline_deadletter_queue_dir(), exist_ok=True)
81+
with open(loading_pipeline_deadletter_queue_path(run_id), 'w') as f:
82+
f.write(prr.model_dump_json())
7683
finally:
7784
if latest_queue_path is not None and os.path.exists(latest_queue_path):
7885
os.remove(latest_queue_path)

v03_pipeline/bin/pipeline_worker_test.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66

77
from v03_pipeline.bin.pipeline_worker import process_queue
88
from v03_pipeline.lib.core import DatasetType, ReferenceGenome, SampleType
9-
from v03_pipeline.lib.paths import loading_pipeline_queue_dir
9+
from v03_pipeline.lib.paths import (
10+
loading_pipeline_deadletter_queue_dir,
11+
loading_pipeline_queue_dir,
12+
)
1013
from v03_pipeline.lib.test.mock_complete_task import MockCompleteTask
1114
from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
1215

@@ -92,3 +95,10 @@ def test_process_failure(
9295
mock_safe_post_to_slack.assert_called_once_with(
9396
':failed: Pipeline Runner Request Failed :failed:\nRun ID: 20250918-200704-123456\n```{\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "project_a"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "skip_validation": false\n}```\nReason: there were failed tasks',
9497
)
98+
with open(
99+
os.path.join(
100+
loading_pipeline_deadletter_queue_dir(),
101+
'request_20250918-200704-123456.json',
102+
),
103+
) as f:
104+
self.assertEqual(json.load(f)['request_type'], 'LoadingPipelineRequest')

v03_pipeline/lib/paths.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,13 @@ def loading_pipeline_queue_dir() -> str:
410410
'loading_pipeline_queue',
411411
)
412412

413+
# https://en.wikipedia.org/wiki/Dead_letter_queue
414+
def loading_pipeline_deadletter_queue_dir() -> str:
415+
return os.path.join(
416+
Env.LOCAL_DISK_MOUNT_PATH,
417+
'loading_pipeline_deadletter_queue',
418+
)
419+
413420

414421
def loading_pipeline_queue_path(run_id: str) -> str:
415422
"""
@@ -421,6 +428,16 @@ def loading_pipeline_queue_path(run_id: str) -> str:
421428
)
422429

423430

431+
def loading_pipeline_deadletter_queue_path(run_id: str) -> str:
432+
"""
433+
Returns a new path for a loading pipeline queue request file.
434+
"""
435+
return os.path.join(
436+
loading_pipeline_deadletter_queue_dir(),
437+
f'request_{run_id}.json',
438+
)
439+
440+
424441
def pipeline_run_success_file_path(
425442
reference_genome: ReferenceGenome,
426443
dataset_type: DatasetType,

0 commit comments

Comments
 (0)