Skip to content

Commit 994ff86

Browse files
committed
move clickhouse load to luigi
1 parent fbae03d commit 994ff86

File tree

4 files changed

+136
-65
lines changed

4 files changed

+136
-65
lines changed

v03_pipeline/api/request_handlers.py

Lines changed: 10 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
import json
21
from collections.abc import Callable
32
from typing import Any
43

5-
import hailtop.fs as hfs
64
import luigi
75
import luigi.execution_summary
86

@@ -12,63 +10,20 @@
1210
PipelineRunnerRequest,
1311
RebuildGtStatsRequest,
1412
)
15-
from v03_pipeline.lib.core import DatasetType, FeatureFlag, ReferenceGenome
13+
from v03_pipeline.lib.core import DatasetType, FeatureFlag
1614
from v03_pipeline.lib.logger import get_logger
1715
from v03_pipeline.lib.misc.clickhouse import (
1816
delete_family_guids,
19-
load_complete_run,
2017
rebuild_gt_stats,
2118
)
22-
from v03_pipeline.lib.misc.retry import retry
23-
from v03_pipeline.lib.paths import (
24-
clickhouse_load_success_file_path,
25-
metadata_for_run_path,
19+
from v03_pipeline.lib.tasks.write_clickhouse_load_success_file import (
20+
WriteClickhouseLoadSuccessFileTask,
2621
)
2722
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
2823

2924
logger = get_logger(__name__)
3025

3126

32-
@retry()
33-
def fetch_run_metadata(
34-
reference_genome: ReferenceGenome,
35-
dataset_type: DatasetType,
36-
run_id: str,
37-
) -> tuple[list[str], list[str]]:
38-
# Run metadata
39-
with hfs.open(
40-
metadata_for_run_path(
41-
reference_genome,
42-
dataset_type,
43-
run_id,
44-
),
45-
'r',
46-
) as f:
47-
metadata_json = json.load(f)
48-
project_guids = metadata_json['project_guids']
49-
family_guids = list(metadata_json['family_samples'].keys())
50-
return project_guids, family_guids
51-
52-
53-
@retry()
54-
def write_success_file(
55-
reference_genome: ReferenceGenome,
56-
dataset_type: DatasetType,
57-
run_id: str,
58-
):
59-
with hfs.open(
60-
clickhouse_load_success_file_path(
61-
reference_genome,
62-
dataset_type,
63-
run_id,
64-
),
65-
'w',
66-
) as f:
67-
f.write('')
68-
msg = f'Successfully loaded {reference_genome.value}/{dataset_type.value}/{run_id}'
69-
logger.info(msg)
70-
71-
7227
def run_loading_pipeline(
7328
lpr: LoadingPipelineRequest,
7429
run_id: str,
@@ -82,6 +37,13 @@ def run_loading_pipeline(
8237
run_id=run_id,
8338
attempt_id=attempt_id,
8439
**lpr.model_dump(exclude='request_type'),
40+
)
41+
if FeatureFlag.CLICKHOUSE_LOADER_DISABLED
42+
else (
43+
WriteClickhouseLoadSuccessFileTask(
44+
run_id=run_id,
45+
**lpr.model_dump(exclude='request_type'),
46+
)
8547
),
8648
],
8749
detailed_summary=True,
@@ -94,20 +56,6 @@ def run_loading_pipeline(
9456
break
9557
else:
9658
raise RuntimeError(luigi_task_result.status.value[1])
97-
if FeatureFlag.CLICKHOUSE_LOADER_DISABLED:
98-
project_guids, family_guids = fetch_run_metadata(
99-
lpr.reference_genome,
100-
lpr.dataset_type,
101-
run_id,
102-
)
103-
load_complete_run(
104-
lpr.reference_genome,
105-
lpr.dataset_type,
106-
run_id,
107-
project_guids,
108-
family_guids,
109-
)
110-
write_success_file(lpr.reference_genome, lpr.dataset_type, run_id)
11159

11260

11361
def run_delete_families(dpr: DeleteFamiliesRequest, run_id: str, *_: Any):

v03_pipeline/bin/clickhouse_loader.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#!/usr/bin/env python3
2+
import json
23
import signal
34
import sys
45
import time
56

67
import hailtop.fs as hfs
78

8-
from v03_pipeline.api.request_handlers import fetch_run_metadata, write_success_file
9-
from v03_pipeline.lib.core import FeatureFlag
9+
from v03_pipeline.lib.core import DatasetType, FeatureFlag, ReferenceGenome
1010
from v03_pipeline.lib.logger import get_logger
1111
from v03_pipeline.lib.misc.clickhouse import (
1212
drop_staging_db,
@@ -15,6 +15,8 @@
1515
from v03_pipeline.lib.misc.runs import get_run_ids
1616
from v03_pipeline.lib.paths import (
1717
clickhouse_load_fail_file_path,
18+
clickhouse_load_success_file_path,
19+
metadata_for_run_path,
1820
)
1921

2022
logger = get_logger(__name__)
@@ -31,6 +33,26 @@ def signal_handler(*_):
3133
signal.signal(signal.SIGTERM, signal_handler)
3234

3335

36+
def fetch_run_metadata(
37+
reference_genome: ReferenceGenome,
38+
dataset_type: DatasetType,
39+
run_id: str,
40+
) -> tuple[list[str], list[str]]:
41+
# Run metadata
42+
with hfs.open(
43+
metadata_for_run_path(
44+
reference_genome,
45+
dataset_type,
46+
run_id,
47+
),
48+
'r',
49+
) as f:
50+
metadata_json = json.load(f)
51+
project_guids = metadata_json['project_guids']
52+
family_guids = list(metadata_json['family_samples'].keys())
53+
return project_guids, family_guids
54+
55+
3456
def main():
3557
reference_genome, dataset_type, run_id = None, None, None
3658
while True:
@@ -69,7 +91,15 @@ def main():
6991
project_guids,
7092
family_guids,
7193
)
72-
write_success_file(reference_genome, dataset_type, run_id)
94+
with hfs.open(
95+
clickhouse_load_success_file_path(
96+
reference_genome,
97+
dataset_type,
98+
run_id,
99+
),
100+
'w',
101+
) as f:
102+
f.write('')
73103
except Exception:
74104
logger.exception('Unhandled Exception')
75105
if reference_genome and dataset_type and run_id:
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import json
2+
3+
import hailtop.fs as hfs
4+
import luigi
5+
import luigi.util
6+
7+
from v03_pipeline.lib.misc.clickhouse import (
8+
ClickHouseTable,
9+
TableNameBuilder,
10+
load_complete_run,
11+
logged_query,
12+
)
13+
from v03_pipeline.lib.paths import metadata_for_run_path
14+
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
15+
BaseLoadingRunParams,
16+
)
17+
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
18+
19+
20+
@luigi.util.inherits(BaseLoadingRunParams)
21+
class LoadCompleteRunToClickhouse(luigi.Task):
22+
def requires(self) -> luigi.Task:
23+
# Note retries happen within the ClickHouse Load
24+
return [self.clone(WriteSuccessFileTask, attempt_id=0)]
25+
26+
def complete(self):
27+
table_name_builder = TableNameBuilder(
28+
self.reference_genome,
29+
self.dataset_type,
30+
self.run_id,
31+
)
32+
max_key_src = logged_query(
33+
f"""
34+
SELECT max(key) FROM {table_name_builder.src_table(ClickHouseTable.ANNOTATIONS_MEMORY)}
35+
""",
36+
)[0][0]
37+
return logged_query(
38+
f"""
39+
SELECT EXISTS (
40+
SELECT 1
41+
FROM {table_name_builder.dst_table(ClickHouseTable.ANNOTATIONS_MEMORY)}
42+
WHERE key = %(max_key_src)s
43+
);
44+
""",
45+
{'max_key_src': max_key_src},
46+
)[0][0]
47+
48+
def run(self):
49+
with hfs.open(
50+
metadata_for_run_path(
51+
self.reference_genome,
52+
self.dataset_type,
53+
self.run_id,
54+
),
55+
) as f:
56+
family_guids = list(json.load(f)['family_samples'].keys())
57+
load_complete_run(
58+
self.reference_genome,
59+
self.dataset_type,
60+
self.run_id,
61+
self.project_guids,
62+
family_guids,
63+
)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import luigi
2+
import luigi.util
3+
4+
from v03_pipeline.lib.paths import clickhouse_load_success_file_path
5+
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
6+
BaseLoadingRunParams,
7+
)
8+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
9+
from v03_pipeline.lib.tasks.load_complete_run_to_clickhouse import (
10+
LoadCompleteRunToClickhouse,
11+
)
12+
13+
14+
@luigi.util.inherits(BaseLoadingRunParams)
15+
class WriteClickhouseLoadSuccessFileTask(luigi.Task):
16+
def output(self) -> luigi.Target:
17+
return GCSorLocalTarget(
18+
clickhouse_load_success_file_path(
19+
self.reference_genome,
20+
self.dataset_type,
21+
self.run_id,
22+
),
23+
)
24+
25+
def requires(self) -> luigi.Task:
26+
return self.clone(LoadCompleteRunToClickhouse)
27+
28+
def run(self):
29+
with self.output().open('w') as f:
30+
f.write('')

0 commit comments

Comments (0)