
Commit 5cb46f0

feat: restart resilience (#1188)

* retry resilience
* more restart resilience
* this works!
* refactor
* job
* fix tests
* ruff
1 parent 28b913e commit 5cb46f0

File tree: 12 files changed, +228/-139 lines


v03_pipeline/api/app_test.py

1 addition, 0 deletions

```diff
@@ -98,6 +98,7 @@ async def test_loading_pipeline_enqueue(self):
                     'skip_check_sex_and_relatedness': False,
                     'skip_validation': False,
                     'skip_expect_tdr_metrics': False,
+                    'attempt_id': 0,
                 },
             },
         )
```

v03_pipeline/api/model.py

10 additions, 2 deletions

```diff
@@ -1,12 +1,13 @@
 import hailtop.fs as hfs
-from pydantic import AliasChoices, BaseModel, Field, field_validator
+from pydantic import AliasChoices, BaseModel, Field, conint, field_validator
 
 from v03_pipeline.lib.core import DatasetType, ReferenceGenome, SampleType
 
+MAX_LOADING_PIPELINE_ATTEMPTS = 3
 VALID_FILE_TYPES = ['vcf', 'vcf.gz', 'vcf.bgz', 'mt']
 
 
-class PipelineRunnerRequest(BaseModel, frozen=True):
+class PipelineRunnerRequest(BaseModel):
     request_type: str
 
     def __init_subclass__(cls, **kwargs):
@@ -15,6 +16,7 @@ def __init_subclass__(cls, **kwargs):
 
 
 class LoadingPipelineRequest(PipelineRunnerRequest):
+    attempt_id: conint(ge=0, le=MAX_LOADING_PIPELINE_ATTEMPTS - 1) = 0
     callset_path: str
     project_guids: list[str] = Field(
         min_length=1,
@@ -28,6 +30,12 @@ class LoadingPipelineRequest(PipelineRunnerRequest):
     skip_check_sex_and_relatedness: bool = False
     skip_expect_tdr_metrics: bool = False
 
+    def incr_attempt(self):
+        if self.attempt_id == (MAX_LOADING_PIPELINE_ATTEMPTS - 1):
+            return False
+        self.attempt_id += 1
+        return True
+
     @field_validator('callset_path')
     @classmethod
     def check_valid_callset_path(cls, callset_path: str) -> str:
```
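For context, a minimal sketch of the new attempt accounting under pydantic v2; the trimmed `LoadingPipelineRequest` below is a stand-in, not the full model. It also shows why `frozen=True` comes off `PipelineRunnerRequest`: `incr_attempt` mutates the request in place.

```python
from pydantic import BaseModel, ValidationError, conint

MAX_LOADING_PIPELINE_ATTEMPTS = 3


class LoadingPipelineRequest(BaseModel):  # trimmed stand-in for the real model
    attempt_id: conint(ge=0, le=MAX_LOADING_PIPELINE_ATTEMPTS - 1) = 0

    def incr_attempt(self) -> bool:
        # False signals the caller that attempts are exhausted.
        if self.attempt_id == (MAX_LOADING_PIPELINE_ATTEMPTS - 1):
            return False
        self.attempt_id += 1
        return True


lpr = LoadingPipelineRequest()
assert lpr.incr_attempt() and lpr.attempt_id == 1
assert lpr.incr_attempt() and lpr.attempt_id == 2
assert not lpr.incr_attempt()  # third attempt is the last

try:
    LoadingPipelineRequest(attempt_id=5)  # out of range: le=2
except ValidationError as e:
    print(e.error_count())  # 1
```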

v03_pipeline/api/model_test.py

3 additions, 1 deletion

```diff
@@ -20,6 +20,7 @@ def test_valid_loading_pipeline_requests(self) -> None:
         self.assertEqual(lpr.reference_genome, ReferenceGenome.GRCh38)
         self.assertEqual(lpr.project_guids, ['project_a'])
         self.assertEqual(lpr.request_type, 'LoadingPipelineRequest')
+        self.assertEqual(lpr.attempt_id, 0)
 
         # Test wildcard VCF
         raw_request['callset_path'] = CALLSET_PATH.replace(
@@ -35,12 +36,13 @@ def test_invalid_loading_pipeline_requests(self) -> None:
             'sample_type': 'BLENDED',
             'reference_genome': ReferenceGenome.GRCh38.value,
             'dataset_type': DatasetType.SNV_INDEL.value,
+            'attempt_id': 5,
         }
         with self.assertRaises(ValueError) as cm:
             LoadingPipelineRequest.model_validate(raw_request)
         self.assertTrue(
             str(cm.exception).startswith(
-                '3 validation errors for LoadingPipelineRequest',
+                '4 validation errors for LoadingPipelineRequest',
             ),
         )
```

v03_pipeline/api/request_handlers.py

17 additions, 35 deletions

```diff
@@ -12,11 +12,10 @@
     PipelineRunnerRequest,
     RebuildGtStatsRequest,
 )
-from v03_pipeline.lib.core import DatasetType, FeatureFlag, ReferenceGenome
+from v03_pipeline.lib.core import DatasetType, ReferenceGenome
 from v03_pipeline.lib.logger import get_logger
 from v03_pipeline.lib.misc.clickhouse import (
     delete_family_guids,
-    load_complete_run,
     rebuild_gt_stats,
 )
 from v03_pipeline.lib.misc.retry import retry
@@ -75,39 +74,22 @@ def run_loading_pipeline(
     local_scheduler: bool,
     *_: Any,
 ):
-    for attempt_id in range(3):
-        luigi_task_result = luigi.build(
-            [
-                WriteSuccessFileTask(
-                    run_id=run_id,
-                    attempt_id=attempt_id,
-                    **lpr.model_dump(exclude='request_type'),
-                ),
-            ],
-            detailed_summary=True,
-            local_scheduler=local_scheduler,
-        )
-        if luigi_task_result.status in {
-            luigi.execution_summary.LuigiStatusCode.SUCCESS,
-            luigi.execution_summary.LuigiStatusCode.SUCCESS_WITH_RETRY,
-        }:
-            break
-    else:
-        raise RuntimeError(luigi_task_result.status.value[1])
-    if FeatureFlag.CLICKHOUSE_LOADER_DISABLED:
-        project_guids, family_guids = fetch_run_metadata(
-            lpr.reference_genome,
-            lpr.dataset_type,
-            run_id,
-        )
-        load_complete_run(
-            lpr.reference_genome,
-            lpr.dataset_type,
-            run_id,
-            project_guids,
-            family_guids,
-        )
-        write_success_file(lpr.reference_genome, lpr.dataset_type, run_id)
+    luigi_task_result = luigi.build(
+        [
+            WriteSuccessFileTask(
+                run_id=run_id,
+                **lpr.model_dump(exclude='request_type'),
+            ),
+        ],
+        detailed_summary=True,
+        local_scheduler=local_scheduler,
+    )
+    if luigi_task_result.status in {
+        luigi.execution_summary.LuigiStatusCode.SUCCESS,
+        luigi.execution_summary.LuigiStatusCode.SUCCESS_WITH_RETRY,
+    }:
+        return
+    raise RuntimeError(luigi_task_result.status.value[1])
 
 
 def run_delete_families(dpr: DeleteFamiliesRequest, run_id: str, *_: Any):
```
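`run_loading_pipeline` no longer loops over attempts or runs the `FeatureFlag.CLICKHOUSE_LOADER_DISABLED` post-processing branch; it issues a single `luigi.build` and raises on anything short of success, leaving retries to the worker. A minimal sketch of the status check, with a trivial stand-in task (`NoopTask` is hypothetical; the real handler builds `WriteSuccessFileTask`):

```python
import luigi


class NoopTask(luigi.Task):  # hypothetical stand-in for WriteSuccessFileTask
    def complete(self) -> bool:
        return True  # nothing to do, so the build trivially succeeds


result = luigi.build([NoopTask()], detailed_summary=True, local_scheduler=True)
if result.status not in {
    luigi.execution_summary.LuigiStatusCode.SUCCESS,
    luigi.execution_summary.LuigiStatusCode.SUCCESS_WITH_RETRY,
}:
    # status.value[1] is the human-readable reason, e.g. 'there were failed tasks'
    raise RuntimeError(result.status.value[1])
```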

v03_pipeline/bin/pipeline_worker.py

10 additions, 6 deletions

```diff
@@ -22,6 +22,7 @@
 from v03_pipeline.lib.paths import (
     loading_pipeline_deadletter_queue_dir,
     loading_pipeline_deadletter_queue_path,
+    loading_pipeline_queue_path,
 )
 
 logger = get_logger(__name__)
@@ -65,13 +66,19 @@ def process_queue(local_scheduler=False):
             return
         prr, run_id = parse_latest_queue_path(latest_queue_path)
         REQUEST_HANDLER_MAP[type(prr)](prr, run_id, local_scheduler)
+        os.remove(latest_queue_path)
         safe_post_to_slack_success(
             run_id,
             prr,
         )
     except Exception as e:
         logger.exception('Unhandled Exception')
-        if run_id is not None:
+        if run_id is None:
+            return
+        if hasattr(prr, 'attempt_id') and prr.incr_attempt():
+            with open(loading_pipeline_queue_path(run_id), 'w') as f:
+                f.write(prr.model_dump_json())
+        else:
             safe_post_to_slack_failure(
                 run_id,
                 prr,
@@ -80,16 +87,13 @@ def process_queue(local_scheduler=False):
             os.makedirs(loading_pipeline_deadletter_queue_dir(), exist_ok=True)
             with open(loading_pipeline_deadletter_queue_path(run_id), 'w') as f:
                 f.write(prr.model_dump_json())
-    finally:
-        if latest_queue_path is not None and os.path.exists(latest_queue_path):
-            os.remove(latest_queue_path)
-        logger.info('Looking for more work')
-        time.sleep(1)
 
 
 def main():
     while True:
         process_queue()
+        logger.info('Looking for more work')
+        time.sleep(1)
 
 
 if __name__ == '__main__':
```
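Retry now happens at the worker level rather than inside the Luigi handler: the queue file is removed only after a successful run (so a worker killed mid-run re-reads the same request on restart), and a failed request with attempts left is rewritten to the queue with a bumped `attempt_id` instead of going straight to the dead-letter queue. A minimal sketch of that failure branch, using hypothetical stand-in directories for the real `loading_pipeline_queue_path` / `loading_pipeline_deadletter_queue_path` helpers:

```python
import json
import os

MAX_LOADING_PIPELINE_ATTEMPTS = 3
QUEUE_DIR = '/tmp/loading_pipeline_queue'  # hypothetical stand-in
DEADLETTER_DIR = '/tmp/loading_pipeline_deadletter_queue'  # hypothetical stand-in


def requeue_or_deadletter(request: dict, run_id: str) -> None:
    # Mirrors the worker's failure branch: bump attempt_id and requeue,
    # or dead-letter once attempts are exhausted.
    if request['attempt_id'] < MAX_LOADING_PIPELINE_ATTEMPTS - 1:
        request['attempt_id'] += 1
        target_dir = QUEUE_DIR
    else:
        target_dir = DEADLETTER_DIR
    os.makedirs(target_dir, exist_ok=True)
    with open(os.path.join(target_dir, f'request_{run_id}.json'), 'w') as f:
        json.dump(request, f)
```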

v03_pipeline/bin/pipeline_worker_test.py

7 additions, 3 deletions

````diff
@@ -58,7 +58,7 @@ def test_process_queue(
             json.dump(raw_request, f)
         process_queue(local_scheduler=True)
         mock_safe_post_to_slack.assert_called_once_with(
-            ':white_check_mark: Pipeline Runner Request Success! :white_check_mark:\nRun ID: 20250916-200704-123456\n```{\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "project_a"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "skip_validation": false\n}```',
+            ':white_check_mark: Pipeline Runner Request Success! :white_check_mark:\nRun ID: 20250916-200704-123456\n```{\n "attempt_id": 0,\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "project_a"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "skip_validation": false\n}```',
         )
 
     @patch('v03_pipeline.lib.misc.slack._safe_post_to_slack')
@@ -92,13 +92,17 @@ def test_process_failure(
         ) as f:
             json.dump(raw_request, f)
         process_queue(local_scheduler=True)
+        process_queue(local_scheduler=True)
+        process_queue(local_scheduler=True)
         mock_safe_post_to_slack.assert_called_once_with(
-            ':failed: Pipeline Runner Request Failed :failed:\nRun ID: 20250918-200704-123456\n```{\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "project_a"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "skip_validation": false\n}```\nReason: there were failed tasks',
+            ':failed: Pipeline Runner Request Failed :failed:\nRun ID: 20250918-200704-123456\n```{\n "attempt_id": 2,\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "project_a"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "skip_validation": false\n}```\nReason: there were failed tasks',
         )
         with open(
             os.path.join(
                 loading_pipeline_deadletter_queue_dir(),
                 'request_20250918-200704-123456.json',
             ),
         ) as f:
-            self.assertEqual(json.load(f)['request_type'], 'LoadingPipelineRequest')
+            r = json.load(f)
+            self.assertEqual(r['request_type'], 'LoadingPipelineRequest')
+            self.assertEqual(r['attempt_id'], 2)
````

v03_pipeline/lib/core/environment.py

2 additions, 2 deletions

```diff
@@ -3,7 +3,7 @@
 from typing import Literal
 
 # NB: using os.environ.get inside the dataclass defaults gives a lint error.
-LOCAL_DISK_MOUNT_PATH = os.environ.get('LOCAL_DISK_MOUNT_PATH', '/var/seqr')
+LOCAL_DISK_MOUNT_DIR = os.environ.get('LOCAL_DISK_MOUNT_DIR', '/var/seqr')
 HAIL_TMP_DIR = os.environ.get('HAIL_TMP_DIR', '/tmp')  # noqa: S108
 PIPELINE_DATA_DIR = os.environ.get(
     'PIPELINE_DATA_DIR',
@@ -78,7 +78,7 @@ class Env:
     GCLOUD_ZONE: str | None = GCLOUD_ZONE
     GCLOUD_REGION: str | None = GCLOUD_REGION
     HAIL_TMP_DIR: str = HAIL_TMP_DIR
-    LOCAL_DISK_MOUNT_PATH: str = LOCAL_DISK_MOUNT_PATH
+    LOCAL_DISK_MOUNT_DIR: str = LOCAL_DISK_MOUNT_DIR
     PIPELINE_DATA_DIR: str = PIPELINE_DATA_DIR
     LOADING_DATASETS_DIR: str = LOADING_DATASETS_DIR
     LOADING_QUEUE_LIMIT: int = LOADING_QUEUE_LIMIT
```

v03_pipeline/lib/paths.py

2 additions, 2 deletions

```diff
@@ -406,15 +406,15 @@ def loading_pipeline_queue_dir() -> str:
     Returns the directory where loading pipeline requests are queued.
     """
     return os.path.join(
-        Env.LOCAL_DISK_MOUNT_PATH,
+        Env.LOCAL_DISK_MOUNT_DIR,
         'loading_pipeline_queue',
     )
 
 
 # https://en.wikipedia.org/wiki/Dead_letter_queue
 def loading_pipeline_deadletter_queue_dir() -> str:
     return os.path.join(
-        Env.LOCAL_DISK_MOUNT_PATH,
+        Env.LOCAL_DISK_MOUNT_DIR,
         'loading_pipeline_deadletter_queue',
     )
 
```
v03_pipeline/lib/tasks/dataproc/base_run_job_on_dataproc.py

55 additions, 36 deletions

```diff
@@ -15,6 +15,11 @@
 )
 from v03_pipeline.lib.tasks.dataproc.misc import get_cluster_name, to_kebab_str_args
 
+FAILURE_STATUSES = {
+    google.cloud.dataproc_v1.types.jobs.JobStatus.State.CANCELLED,
+    google.cloud.dataproc_v1.types.jobs.JobStatus.State.ERROR,
+    google.cloud.dataproc_v1.types.jobs.JobStatus.State.ATTEMPT_FAILURE,
+}
 SEQR_PIPELINE_RUNNER_BUILD = f'gs://seqr-pipeline-runner-builds/{Env.DEPLOYMENT_TYPE}/{Env.PIPELINE_RUNNER_APP_VERSION}'
 TIMEOUT_S = 172800  # 2 days
 
@@ -44,7 +49,9 @@ def job_id(self):
     def requires(self) -> [luigi.Task]:
         return [self.clone(CreateDataprocClusterTask)]
 
-    def complete(self) -> bool:
+    def safely_get_job(
+        self,
+    ):
         try:
             job = self.client.get_job(
                 request={
@@ -54,12 +61,15 @@ def complete(self) -> bool:
                 },
             )
         except google.api_core.exceptions.NotFound:
+            return None
+        else:
+            return job
+
+    def complete(self) -> bool:
+        job = self.safely_get_job()
+        if not job:
             return False
-        if job.status.state in {
-            google.cloud.dataproc_v1.types.jobs.JobStatus.State.CANCELLED,
-            google.cloud.dataproc_v1.types.jobs.JobStatus.State.ERROR,
-            google.cloud.dataproc_v1.types.jobs.JobStatus.State.ATTEMPT_FAILURE,
-        }:
+        if job.status.state in FAILURE_STATUSES:
             msg = f'Job {self.job_id} entered {job.status.state.name} state'
             logger.error(msg)
             logger.error(job.status.details)
@@ -68,43 +78,52 @@ def complete(self) -> bool:
         )
 
     def run(self):
-        operation = self.client.submit_job_as_operation(
-            request={
-                'project_id': Env.GCLOUD_PROJECT,
-                'region': Env.GCLOUD_REGION,
-                'job': {
-                    'reference': {
-                        'job_id': self.job_id,
-                    },
-                    'placement': {
-                        'cluster_name': get_cluster_name(
-                            self.reference_genome,
-                            self.run_id,
-                        ),
-                    },
-                    'pyspark_job': {
-                        'main_python_file_uri': f'{SEQR_PIPELINE_RUNNER_BUILD}/bin/run_task.py',
-                        'args': [
-                            self.task.task_family,
-                            '--local-scheduler',
-                            *to_kebab_str_args(self),
-                        ],
-                        'python_file_uris': [
-                            f'{SEQR_PIPELINE_RUNNER_BUILD}/pyscripts.zip',
-                        ],
+        job = self.safely_get_job()
+        if not job:
+            self.client.submit_job_as_operation(
+                request={
+                    'project_id': Env.GCLOUD_PROJECT,
+                    'region': Env.GCLOUD_REGION,
+                    'job': {
+                        'reference': {
+                            'job_id': self.job_id,
+                        },
+                        'placement': {
+                            'cluster_name': get_cluster_name(
+                                self.reference_genome,
+                                self.run_id,
+                            ),
+                        },
+                        'pyspark_job': {
+                            'main_python_file_uri': f'{SEQR_PIPELINE_RUNNER_BUILD}/bin/run_task.py',
+                            'args': [
+                                self.task.task_family,
+                                '--local-scheduler',
+                                *to_kebab_str_args(self),
+                            ],
+                            'python_file_uris': [
+                                f'{SEQR_PIPELINE_RUNNER_BUILD}/pyscripts.zip',
+                            ],
+                        },
                     },
                 },
-            },
-        )
+            )
         wait_s = 0
         while wait_s < TIMEOUT_S:
-            if operation.done():
-                operation.result()  # Will throw on failure!
-                msg = f'Finished {self.job_id}'
+            job = self.safely_get_job()
+            if (
+                job.status.state
+                == google.cloud.dataproc_v1.types.jobs.JobStatus.State.DONE
+            ):
+                msg = f'Job {self.job_id} is complete'
                 logger.info(msg)
                 break
+            if job.status.state in FAILURE_STATUSES:
+                msg = f'Job {self.job_id} entered {job.status.state.name} state'
+                logger.error(msg)
+                raise RuntimeError(msg)
             logger.info(
-                f'Waiting for job completion {self.job_id}',
+                f'Waiting for Job completion {self.job_id}',
             )
             time.sleep(3)
             wait_s += 3
```