4 changes: 1 addition & 3 deletions v03_pipeline/api/app.py
@@ -35,9 +35,7 @@ async def loading_pipeline_enqueue(request: web.Request) -> web.Response:

if is_queue_full():
return web.json_response(
{
f'Loading pipeline queue is full. Please try again later. (limit={Env.LOADING_QUEUE_LIMIT})',
},
f'Loading pipeline queue is full. Please try again later. (limit={Env.LOADING_QUEUE_LIMIT})',
status=web_exceptions.HTTPConflict.status_code,
)
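
Reviewer note: the braces removed in this hunk made the payload a one-element Python set, which the default `json.dumps` serializer inside `web.json_response` cannot encode; passing the bare f-string fixes that. A minimal standalone sketch of the failure mode (`limit` stands in for `Env.LOADING_QUEUE_LIMIT`):

```python
import json

limit = 10  # stand-in for Env.LOADING_QUEUE_LIMIT
payload = {f'Loading pipeline queue is full. (limit={limit})'}  # braces build a set, not a dict
try:
    json.dumps(payload)
except TypeError as e:
    print(e)  # "Object of type set is not JSON serializable"

# The fixed form passes a plain string, which serializes cleanly:
json.dumps(f'Loading pipeline queue is full. (limit={limit})')
```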

1 change: 1 addition & 0 deletions v03_pipeline/api/app_test.py
@@ -95,6 +95,7 @@ async def test_loading_pipeline_enqueue(self):
'skip_check_sex_and_relatedness': False,
'skip_validation': False,
'skip_expect_tdr_metrics': False,
'validations_to_skip': [],
},
},
)
53 changes: 38 additions & 15 deletions v03_pipeline/api/model.py
@@ -1,8 +1,13 @@
import json
from typing import Literal

import hailtop.fs as hfs
from pydantic import AliasChoices, BaseModel, Field, field_validator
from pydantic import AliasChoices, BaseModel, Field, field_validator, root_validator

from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS, SKIPPABLE_VALIDATIONS
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType

STRINGIFIED_SKIPPABLE_VALIDATIONS = [f.__name__ for f in SKIPPABLE_VALIDATIONS]
VALID_FILE_TYPES = ['vcf', 'vcf.gz', 'vcf.bgz', 'mt']


@@ -16,10 +21,23 @@ class LoadingPipelineRequest(BaseModel):
sample_type: SampleType
reference_genome: ReferenceGenome
dataset_type: DatasetType
skip_validation: bool = False
skip_check_sex_and_relatedness: bool = False
skip_expect_tdr_metrics: bool = False

# New-style list
validations_to_skip: list[Literal[*STRINGIFIED_SKIPPABLE_VALIDATIONS]] = []
# Old-style boolean for backwards compatibility
skip_validation: bool = Field(False, alias='skip_validation')

@field_validator('validations_to_skip')
@classmethod
def must_be_known_validator(cls, validations_to_skip: list[str]) -> list[str]:
    # Build the known-name set once rather than on every iteration.
    known = set(STRINGIFIED_SKIPPABLE_VALIDATIONS)
    for v in validations_to_skip:
        if v not in known:
            msg = f'{v} is not a valid validator'
            raise ValueError(msg)
    return validations_to_skip

@field_validator('callset_path')
@classmethod
def check_valid_callset_path(cls, callset_path: str) -> str:
@@ -31,16 +49,21 @@ def check_valid_callset_path(cls, callset_path: str) -> str:
raise ValueError(msg)
return callset_path

def __str__(self) -> str:
return '\n'.join(
[
f'Callset Path: {self.callset_path}',
f'Project Guids: {",".join(self.project_guids)}',
f'Reference Genome: {self.reference_genome.value}',
f'Dataset Type: {self.dataset_type.value}',
f'Sample Type: {self.sample_type.value}',
f'Skip Validation: {self.skip_validation}',
f'Skip Sex & Relatedness: {self.skip_check_sex_and_relatedness}',
f'Skip Expect TDR Metrics: {self.skip_expect_tdr_metrics}',
],
)
# Runs before Pydantic parses or coerces field values, so both the old-style
# boolean and the new-style list can be normalized here.
@root_validator(pre=True)
@classmethod
def backwards_compatible_skip_validation(cls, values):
    if values.get('skip_validation') or values.get('validations_to_skip') == [
        ALL_VALIDATIONS,
    ]:
        values['validations_to_skip'] = STRINGIFIED_SKIPPABLE_VALIDATIONS
    return values

def dict(self, *args, **kwargs):
    # validations_to_skip already holds validator-name strings (per the
    # Literal field type), so model_dump serializes it as-is; calling
    # f.__name__ on those strings would raise AttributeError.
    return self.model_dump(*args, **kwargs)

def json(self, *args, **kwargs):
    # Formatting kwargs such as indent belong to json.dumps alone;
    # model_dump does not accept them.
    return json.dumps(self.dict(), *args, **kwargs)
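
Reviewer note: to make the two accepted request shapes concrete, here is a hedged usage sketch. The fixture path mirrors the one used in the tests below, and the equivalence of the two spellings follows from the pre-parse root validator (note that `check_valid_callset_path` also requires the callset to exist):

```python
from v03_pipeline.api.model import (
    STRINGIFIED_SKIPPABLE_VALIDATIONS,
    LoadingPipelineRequest,
)

shared = {
    # Fixture path taken from the repo tests; the field validator rejects
    # paths that are missing or not one of the VALID_FILE_TYPES.
    'callset_path': 'v03_pipeline/var/test/callsets/1kg_30variants.vcf',
    'projects_to_run': ['project_a'],
    'sample_type': 'WGS',
    'reference_genome': 'GRCh38',
    'dataset_type': 'SNV_INDEL',
}

old_style = LoadingPipelineRequest.model_validate({**shared, 'skip_validation': True})
new_style = LoadingPipelineRequest.model_validate({**shared, 'validations_to_skip': ['all']})

# backwards_compatible_skip_validation expands both spellings to the full list:
assert old_style.validations_to_skip == STRINGIFIED_SKIPPABLE_VALIDATIONS
assert new_style.validations_to_skip == old_style.validations_to_skip
```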
36 changes: 36 additions & 0 deletions v03_pipeline/api/model_test.py
@@ -34,3 +34,39 @@ def test_invalid_loading_pipeline_requests(self) -> None:
'3 validation errors for LoadingPipelineRequest',
),
)

def test_validations_to_skip(self) -> None:
shared_params = {
'callset_path': TEST_VCF,
'projects_to_run': ['project_a'],
'sample_type': SampleType.WGS.value,
'reference_genome': ReferenceGenome.GRCh38.value,
'dataset_type': DatasetType.SNV_INDEL.value,
}
raw_request = {
**shared_params,
'skip_validation': True,
}
lpr = LoadingPipelineRequest.model_validate(raw_request)
self.assertGreater(len(lpr.validations_to_skip), 1)

raw_request = {
**shared_params,
'validations_to_skip': ['all'],
}
lpr = LoadingPipelineRequest.model_validate(raw_request)
self.assertGreater(len(lpr.validations_to_skip), 1)

raw_request = {
**shared_params,
'validations_to_skip': ['validate_sample_type'],
}
lpr = LoadingPipelineRequest.model_validate(raw_request)
self.assertEqual(len(lpr.validations_to_skip), 1)

raw_request = {
**shared_params,
'validations_to_skip': ['validate_blended_exome'],
}
with self.assertRaises(ValueError):
LoadingPipelineRequest.model_validate(raw_request)
4 changes: 2 additions & 2 deletions v03_pipeline/bin/pipeline_worker_test.py
@@ -55,7 +55,7 @@ def test_process_queue(
json.dump(raw_request, f)
process_queue(local_scheduler=True)
mock_safe_post_to_slack.assert_called_once_with(
':white_check_mark: Pipeline Run Success! Kicking off ClickHouse Load! :white_check_mark:\nRun ID: 20250916-200704-123456\nCallset Path: v03_pipeline/var/test/callsets/1kg_30variants.vcf\nProject Guids: project_a\nReference Genome: GRCh38\nDataset Type: SNV_INDEL\nSample Type: WGS\nSkip Validation: False\nSkip Sex & Relatedness: False\nSkip Expect TDR Metrics: False',
':white_check_mark: Pipeline Run Success! Kicking off ClickHouse Load! :white_check_mark:\nRun ID: 20250916-200704-123456\n```{\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "project_guids": [\n "project_a"\n ],\n "sample_type": "WGS",\n "reference_genome": "GRCh38",\n "dataset_type": "SNV_INDEL",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "validations_to_skip": [],\n "skip_validation": false\n}```',
)

@patch('v03_pipeline.lib.paths.LOCAL_DISK_MOUNT_PATH', './var/seqr')
@@ -90,5 +90,5 @@ def test_process_failure(
json.dump(raw_request, f)
process_queue(local_scheduler=True)
mock_safe_post_to_slack.assert_called_once_with(
':failed: Pipeline Run Failed. :failed:\nRun ID: 20250918-200704-123456\nCallset Path: v03_pipeline/var/test/callsets/1kg_30variants.vcf\nProject Guids: project_a\nReference Genome: GRCh38\nDataset Type: SNV_INDEL\nSample Type: WGS\nSkip Validation: False\nSkip Sex & Relatedness: False\nSkip Expect TDR Metrics: False\nReason: there were failed tasks',
':failed: Pipeline Run Failed. :failed:\nRun ID: 20250918-200704-123456\n```{\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "project_guids": [\n "project_a"\n ],\n "sample_type": "WGS",\n "reference_genome": "GRCh38",\n "dataset_type": "SNV_INDEL",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "validations_to_skip": [],\n "skip_validation": false\n}```\nReason: there were failed tasks',
)
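
Reviewer note: the updated expectations imply the worker now embeds the request as fenced, indented JSON instead of the removed `__str__` lines. A hypothetical reconstruction of the formatting (pipeline_worker.py is not part of this diff, so the helper and variable names here are assumptions inferred from the mocks):

```python
# lpr: a LoadingPipelineRequest; run_id: taken from the test's expected string.
fence = '`' * 3  # the literal ``` wrapper seen in the expected messages
run_id = '20250916-200704-123456'
message = (
    ':white_check_mark: Pipeline Run Success! Kicking off ClickHouse Load! '
    f':white_check_mark:\nRun ID: {run_id}\n'
    f'{fence}{lpr.json(indent=2)}{fence}'
)
safe_post_to_slack(message)  # assumed helper, inferred from mock_safe_post_to_slack
```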
49 changes: 19 additions & 30 deletions v03_pipeline/lib/methods/sample_qc.py
@@ -10,7 +10,6 @@
from gnomad.utils.filtering import filter_to_autosomes
from gnomad_qc.v4.sample_qc.assign_ancestry import assign_pop_with_per_pop_probs

from v03_pipeline.lib.misc.io import split_multi_hts
from v03_pipeline.lib.model import SampleType

GNOMAD_FILTER_MIN_AF = 0.001
@@ -21,12 +20,6 @@
WES_COVERAGE_LOW_THRESHOLD = 85
WGS_CALLRATE_LOW_THRESHOLD = 30

POP_PCA_LOADINGS_PATH = (
'gs://gcp-public-data--gnomad/release/4.0/pca/gnomad.v4.0.pca_loadings.ht'
)
ANCESTRY_RF_MODEL_PATH = (
'gs://seqr-reference-data/v3.1/GRCh38/SNV_INDEL/ancestry_imputation_model.onnx'
)
NUM_PCS = 20
GNOMAD_POP_PROBABILITY_CUTOFFS = {
'afr': 0.93,
@@ -40,7 +33,6 @@
'sas': 0.92,
}
POPULATION_MISSING_LABEL = 'oth'

HAIL_QC_METRICS = [
'n_snp',
'r_ti_tv',
@@ -54,6 +46,8 @@
def call_sample_qc(
mt: hl.MatrixTable,
tdr_metrics_ht: hl.Table,
pop_pca_loadings_ht: hl.Table,
ancestry_rf_model: onnx.ModelProto,
sample_type: SampleType,
):
mt = mt.annotate_entries(
@@ -63,8 +57,10 @@ def call_sample_qc(
.default(hl.missing(hl.tcall)),
)
mt = annotate_filtered_callrate(mt)
mt = annotate_filter_flags(mt, tdr_metrics_ht, sample_type)
mt = annotate_qc_gen_anc(mt)
# Annotates contamination_rate, percent_bases_at_20x, and mean_coverage
mt = mt.annotate_cols(**tdr_metrics_ht[mt.col_key])
mt = annotate_filter_flags(mt, sample_type)
mt = annotate_qc_gen_anc(mt, pop_pca_loadings_ht, ancestry_rf_model)
return run_hail_sample_qc(mt, sample_type)


@@ -87,37 +83,37 @@ def annotate_filtered_callrate(mt: hl.MatrixTable) -> hl.MatrixTable:

def annotate_filter_flags(
mt: hl.MatrixTable,
tdr_metrics_ht: hl.Table,
sample_type: SampleType,
) -> hl.MatrixTable:
mt = mt.annotate_cols(**tdr_metrics_ht[mt.col_key])
flags = {
'callrate': mt.filtered_callrate < CALLRATE_LOW_THRESHOLD,
'contamination': mt.contamination_rate > CONTAMINATION_UPPER_THRESHOLD,
'coverage': (
mt.percent_bases_at_20x < WES_COVERAGE_LOW_THRESHOLD
if sample_type == SampleType.WES
else mt.mean_coverage < WGS_CALLRATE_LOW_THRESHOLD
),
}
if sample_type == SampleType.WES:
flags['coverage'] = mt.percent_bases_at_20x < WES_COVERAGE_LOW_THRESHOLD
else:
flags['coverage'] = mt.mean_coverage < WGS_CALLRATE_LOW_THRESHOLD

return mt.annotate_cols(
filter_flags=hl.array(
[hl.or_missing(filter_cond, name) for name, filter_cond in flags.items()],
).filter(hl.is_defined),
)


def annotate_qc_gen_anc(mt: hl.MatrixTable) -> hl.MatrixTable:
def annotate_qc_gen_anc(
mt: hl.MatrixTable,
pop_pca_loadings_ht: hl.Table,
ancestry_rf_model: onnx.ModelProto,
) -> hl.MatrixTable:
mt = mt.select_entries('GT')
scores = _get_pop_pca_scores(mt)
with hl.hadoop_open(ANCESTRY_RF_MODEL_PATH, 'rb') as f:
fit = onnx.load(f)

scores = pc_project(mt, pop_pca_loadings_ht)
scores = scores.annotate(scores=scores.scores[:NUM_PCS], known_pop='Unknown')
pop_pca_ht, _ = assign_population_pcs(
scores,
pc_cols=scores.scores,
output_col='qc_pop',
fit=fit,
fit=ancestry_rf_model,
apply_model_func=apply_onnx_classification_model,
)
pop_pca_ht = pop_pca_ht.key_by('s')
@@ -134,15 +130,8 @@ def annotate_qc_gen_anc(mt: hl.MatrixTable) -> hl.MatrixTable:
return mt.annotate_cols(**pop_pca_ht[mt.col_key])


def _get_pop_pca_scores(mt: hl.MatrixTable) -> hl.Table:
loadings = hl.read_table(POP_PCA_LOADINGS_PATH)
scores = pc_project(mt, loadings)
return scores.annotate(scores=scores.scores[:NUM_PCS], known_pop='Unknown')


def run_hail_sample_qc(mt: hl.MatrixTable, sample_type: SampleType) -> hl.MatrixTable:
mt = filter_to_autosomes(mt)
mt = split_multi_hts(mt, True)
mt = hl.sample_qc(mt)
mt = mt.annotate_cols(
sample_qc=mt.sample_qc.annotate(
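
Reviewer note: with `POP_PCA_LOADINGS_PATH` and `ANCESTRY_RF_MODEL_PATH` deleted from this module, the loadings table and ONNX model now arrive as arguments. A hedged sketch of a call site, reusing the path constants removed above (where the production caller actually loads these from is not shown in this diff):

```python
import hail as hl
import onnx

from v03_pipeline.lib.methods.sample_qc import call_sample_qc
from v03_pipeline.lib.model import SampleType

# Values copied from the constants deleted in this hunk.
pop_pca_loadings_ht = hl.read_table(
    'gs://gcp-public-data--gnomad/release/4.0/pca/gnomad.v4.0.pca_loadings.ht',
)
with hl.hadoop_open(
    'gs://seqr-reference-data/v3.1/GRCh38/SNV_INDEL/ancestry_imputation_model.onnx',
    'rb',
) as f:
    ancestry_rf_model = onnx.load(f)

# mt and tdr_metrics_ht are assumed in scope: the callset MatrixTable and the
# per-sample TDR metrics table keyed by sample id.
mt = call_sample_qc(
    mt,
    tdr_metrics_ht,
    pop_pca_loadings_ht,
    ancestry_rf_model,
    SampleType.WES,
)
```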
93 changes: 1 addition & 92 deletions v03_pipeline/lib/misc/family_entries.py
@@ -1,30 +1,6 @@
import hail as hl

from v03_pipeline.lib.model import DatasetType, ReferenceGenome


def initialize_project_table(
reference_genome: ReferenceGenome,
dataset_type: DatasetType,
):
key_type = dataset_type.table_key_type(reference_genome)
return hl.Table.parallelize(
[],
hl.tstruct(
**key_type,
filters=hl.tset(hl.tstr),
# NB: entries is missing here because it is untyped
# until we read the type off of the first callset aggregation.
),
key=key_type.fields,
globals=hl.Struct(
family_guids=hl.empty_array(hl.tstr),
family_samples=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
updates=hl.empty_set(
hl.tstruct(callset=hl.tstr, remap_pedigree_hash=hl.tint32),
),
),
)
from v03_pipeline.lib.model import DatasetType


def compute_callset_family_entries_ht(
@@ -117,73 +93,6 @@ def deglobalize_ids(ht: hl.Table) -> hl.Table:
return ht.drop('family_guids', 'family_samples')


def remove_family_guids(
ht: hl.Table,
family_guids: hl.SetExpression,
) -> hl.Table:
# Remove families from the existing project table structure (both the entries arrays and the globals are mutated)
family_indexes_to_keep = hl.eval(
hl.array(
hl.enumerate(ht.globals.family_guids)
.filter(lambda item: ~family_guids.contains(item[1]))
.map(lambda item: item[0]),
),
)
ht = ht.annotate(
# NB: this "should" work without the extra if statement (and does in the tests)
# however, experiments on dataproc showed this statement hanging with an empty
# unevaluated indexes array.
family_entries=hl.array(family_indexes_to_keep).map(
lambda i: ht.family_entries[i],
)
if len(family_indexes_to_keep) > 0
else hl.empty_array(ht.family_entries.dtype.element_type),
)
ht = ht.filter(hl.any(ht.family_entries.map(hl.is_defined)))
return ht.annotate_globals(
family_guids=ht.family_guids.filter(
lambda f: ~family_guids.contains(f),
),
family_samples=hl.dict(
ht.family_samples.items().filter(
lambda item: ~family_guids.contains(item[0]),
),
),
)


def join_family_entries_hts(ht: hl.Table, callset_ht: hl.Table) -> hl.Table:
ht = ht.join(callset_ht, 'outer')
ht_empty_family_entries = ht.family_guids.map(
lambda _: hl.missing(ht.family_entries_1.dtype.element_type),
)
callset_ht_empty_family_entries = ht.family_guids_1.map(
lambda _: hl.missing(ht.family_entries_1.dtype.element_type),
)
ht = ht.select(
filters=hl.or_else(ht.filters_1, ht.filters),
family_entries=(
hl.case()
.when(
hl.is_missing(ht.family_entries),
ht_empty_family_entries.extend(ht.family_entries_1),
)
.when(
hl.is_missing(ht.family_entries_1),
ht.family_entries.extend(callset_ht_empty_family_entries),
)
.default(ht.family_entries.extend(ht.family_entries_1))
),
)
# NB: transmute because we want to drop the *_1 fields, but preserve other globals
return ht.transmute_globals(
family_guids=ht.family_guids.extend(ht.family_guids_1),
family_samples=hl.dict(
ht.family_samples.items().extend(ht.family_samples_1.items()),
),
)


def deduplicate_by_most_non_ref_calls(ht: hl.Table) -> hl.Table:
ht = ht.annotate(
non_ref_count=hl.len(