diff --git a/v03_pipeline/api/app.py b/v03_pipeline/api/app.py index 9080daad7..ec4d92aeb 100644 --- a/v03_pipeline/api/app.py +++ b/v03_pipeline/api/app.py @@ -35,9 +35,7 @@ async def loading_pipeline_enqueue(request: web.Request) -> web.Response: if is_queue_full(): return web.json_response( - { - f'Loading pipeline queue is full. Please try again later. (limit={Env.LOADING_QUEUE_LIMIT})', - }, + f'Loading pipeline queue is full. Please try again later. (limit={Env.LOADING_QUEUE_LIMIT})', status=web_exceptions.HTTPConflict.status_code, ) diff --git a/v03_pipeline/api/app_test.py b/v03_pipeline/api/app_test.py index c119745c7..0f6859860 100644 --- a/v03_pipeline/api/app_test.py +++ b/v03_pipeline/api/app_test.py @@ -95,6 +95,7 @@ async def test_loading_pipeline_enqueue(self): 'skip_check_sex_and_relatedness': False, 'skip_validation': False, 'skip_expect_tdr_metrics': False, + 'validations_to_skip': [], }, }, ) diff --git a/v03_pipeline/api/model.py b/v03_pipeline/api/model.py index d4ebc1532..be7aa0e18 100644 --- a/v03_pipeline/api/model.py +++ b/v03_pipeline/api/model.py @@ -1,8 +1,13 @@ +import json +from typing import Literal + import hailtop.fs as hfs -from pydantic import AliasChoices, BaseModel, Field, field_validator +from pydantic import AliasChoices, BaseModel, Field, field_validator, root_validator +from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS, SKIPPABLE_VALIDATIONS from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType +STRINGIFIED_SKIPPABLE_VALIDATIONS = [f.__name__ for f in SKIPPABLE_VALIDATIONS] VALID_FILE_TYPES = ['vcf', 'vcf.gz', 'vcf.bgz', 'mt'] @@ -16,10 +21,23 @@ class LoadingPipelineRequest(BaseModel): sample_type: SampleType reference_genome: ReferenceGenome dataset_type: DatasetType - skip_validation: bool = False skip_check_sex_and_relatedness: bool = False skip_expect_tdr_metrics: bool = False + # New-style list + validations_to_skip: list[Literal[*STRINGIFIED_SKIPPABLE_VALIDATIONS]] = [] + # Old-style boolean for backwards compatibility + skip_validation: bool = Field(False, alias='skip_validation') + + @field_validator('validations_to_skip') + @classmethod + def must_be_known_validator(cls, validations_to_skip): + for v in validations_to_skip: + if v not in set(STRINGIFIED_SKIPPABLE_VALIDATIONS): + msg = f'{v} is not a valid validator' + raise ValueError(msg) + return validations_to_skip + @field_validator('callset_path') @classmethod def check_valid_callset_path(cls, callset_path: str) -> str: @@ -31,16 +49,21 @@ def check_valid_callset_path(cls, callset_path: str) -> str: raise ValueError(msg) return callset_path - def __str__(self) -> str: - return '\n'.join( - [ - f'Callset Path: {self.callset_path}', - f'Project Guids: {",".join(self.project_guids)}', - f'Reference Genome: {self.reference_genome.value}', - f'Dataset Type: {self.dataset_type.value}', - f'Sample Type: {self.sample_type.value}', - f'Skip Validation: {self.skip_validation}', - f'Skip Sex & Relatedness: {self.skip_check_sex_and_relatedness}', - f'Skip Expect TDR Metrics: {self.skip_expect_tdr_metrics}', - ], - ) + @root_validator( + pre=True, + ) # the root validator runs before Pydantic parses or coerces field values. 
+    @classmethod
+    def backwards_compatible_skip_validation(cls, values):
+        # Expand either the legacy boolean or the 'all' shorthand into the
+        # full list of skippable validator names.
+        if values.get('skip_validation') or ALL_VALIDATIONS in (
+            values.get('validations_to_skip') or []
+        ):
+            values['validations_to_skip'] = STRINGIFIED_SKIPPABLE_VALIDATIONS
+        return values
+
+    def dict(self, *args, **kwargs):
+        # validations_to_skip already holds validator names (strings),
+        # so the default dump needs no post-processing.
+        return self.model_dump(*args, **kwargs)
+
+    def json(self, *args, **kwargs):
+        # Forward args only to json.dumps; model_dump does not accept them.
+        return json.dumps(self.dict(), *args, **kwargs)
diff --git a/v03_pipeline/api/model_test.py b/v03_pipeline/api/model_test.py
index f135f7bbc..54f1f674a 100644
--- a/v03_pipeline/api/model_test.py
+++ b/v03_pipeline/api/model_test.py
@@ -34,3 +34,39 @@ def test_invalid_loading_pipeline_requests(self) -> None:
                 '3 validation errors for LoadingPipelineRequest',
             ),
         )
+
+    def test_validations_to_skip(self) -> None:
+        shared_params = {
+            'callset_path': TEST_VCF,
+            'projects_to_run': ['project_a'],
+            'sample_type': SampleType.WGS.value,
+            'reference_genome': ReferenceGenome.GRCh38.value,
+            'dataset_type': DatasetType.SNV_INDEL.value,
+        }
+        raw_request = {
+            **shared_params,
+            'skip_validation': True,
+        }
+        lpr = LoadingPipelineRequest.model_validate(raw_request)
+        self.assertGreater(len(lpr.validations_to_skip), 1)
+
+        raw_request = {
+            **shared_params,
+            'validations_to_skip': ['all'],
+        }
+        lpr = LoadingPipelineRequest.model_validate(raw_request)
+        self.assertGreater(len(lpr.validations_to_skip), 1)
+
+        raw_request = {
+            **shared_params,
+            'validations_to_skip': ['validate_sample_type'],
+        }
+        lpr = LoadingPipelineRequest.model_validate(raw_request)
+        self.assertEqual(len(lpr.validations_to_skip), 1)
+
+        raw_request = {
+            **shared_params,
+            'validations_to_skip': ['validate_blended_exome'],
+        }
+        with self.assertRaises(ValueError):
+            LoadingPipelineRequest.model_validate(raw_request)
diff --git a/v03_pipeline/bin/pipeline_worker_test.py b/v03_pipeline/bin/pipeline_worker_test.py
index 6be40d4b5..e5afccfeb 100644
--- a/v03_pipeline/bin/pipeline_worker_test.py
+++ b/v03_pipeline/bin/pipeline_worker_test.py
@@ -55,7 +55,7 @@ def test_process_queue(
             json.dump(raw_request, f)
         process_queue(local_scheduler=True)
         mock_safe_post_to_slack.assert_called_once_with(
-            ':white_check_mark: Pipeline Run Success! Kicking off ClickHouse Load! :white_check_mark:\nRun ID: 20250916-200704-123456\nCallset Path: v03_pipeline/var/test/callsets/1kg_30variants.vcf\nProject Guids: project_a\nReference Genome: GRCh38\nDataset Type: SNV_INDEL\nSample Type: WGS\nSkip Validation: False\nSkip Sex & Relatedness: False\nSkip Expect TDR Metrics: False',
+            ':white_check_mark: Pipeline Run Success! Kicking off ClickHouse Load! :white_check_mark:\nRun ID: 20250916-200704-123456\n```{\n    "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n    "project_guids": [\n        "project_a"\n    ],\n    "sample_type": "WGS",\n    "reference_genome": "GRCh38",\n    "dataset_type": "SNV_INDEL",\n    "skip_check_sex_and_relatedness": false,\n    "skip_expect_tdr_metrics": false,\n    "validations_to_skip": [],\n    "skip_validation": false\n}```',
         )

     @patch('v03_pipeline.lib.paths.LOCAL_DISK_MOUNT_PATH', './var/seqr')
@@ -90,5 +90,5 @@ def test_process_failure(
             json.dump(raw_request, f)
         process_queue(local_scheduler=True)
         mock_safe_post_to_slack.assert_called_once_with(
-            ':failed: Pipeline Run Failed. :failed:\nRun ID: 20250918-200704-123456\nCallset Path: v03_pipeline/var/test/callsets/1kg_30variants.vcf\nProject Guids: project_a\nReference Genome: GRCh38\nDataset Type: SNV_INDEL\nSample Type: WGS\nSkip Validation: False\nSkip Sex & Relatedness: False\nSkip Expect TDR Metrics: False\nReason: there were failed tasks',
+            ':failed: Pipeline Run Failed. :failed:\nRun ID: 20250918-200704-123456\n```{\n    "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n    "project_guids": [\n        "project_a"\n    ],\n    "sample_type": "WGS",\n    "reference_genome": "GRCh38",\n    "dataset_type": "SNV_INDEL",\n    "skip_check_sex_and_relatedness": false,\n    "skip_expect_tdr_metrics": false,\n    "validations_to_skip": [],\n    "skip_validation": false\n}```\nReason: there were failed tasks',
         )
diff --git a/v03_pipeline/lib/methods/sample_qc.py b/v03_pipeline/lib/methods/sample_qc.py
index bab7f074d..0dbf782b2 100644
--- a/v03_pipeline/lib/methods/sample_qc.py
+++ b/v03_pipeline/lib/methods/sample_qc.py
@@ -10,7 +10,6 @@
 from gnomad.utils.filtering import filter_to_autosomes
 from gnomad_qc.v4.sample_qc.assign_ancestry import assign_pop_with_per_pop_probs

-from v03_pipeline.lib.misc.io import split_multi_hts
 from v03_pipeline.lib.model import SampleType

 GNOMAD_FILTER_MIN_AF = 0.001
@@ -21,12 +20,6 @@
 WES_COVERAGE_LOW_THRESHOLD = 85
 WGS_CALLRATE_LOW_THRESHOLD = 30

-POP_PCA_LOADINGS_PATH = (
-    'gs://gcp-public-data--gnomad/release/4.0/pca/gnomad.v4.0.pca_loadings.ht'
-)
-ANCESTRY_RF_MODEL_PATH = (
-    'gs://seqr-reference-data/v3.1/GRCh38/SNV_INDEL/ancestry_imputation_model.onnx'
-)
 NUM_PCS = 20
 GNOMAD_POP_PROBABILITY_CUTOFFS = {
     'afr': 0.93,
@@ -40,7 +33,6 @@
     'sas': 0.92,
 }
 POPULATION_MISSING_LABEL = 'oth'
-
 HAIL_QC_METRICS = [
     'n_snp',
     'r_ti_tv',
@@ -54,6 +46,8 @@
 def call_sample_qc(
     mt: hl.MatrixTable,
     tdr_metrics_ht: hl.Table,
+    pop_pca_loadings_ht: hl.Table,
+    ancestry_rf_model: onnx.ModelProto,
     sample_type: SampleType,
 ):
     mt = mt.annotate_entries(
@@ -63,8 +57,10 @@
         .default(hl.missing(hl.tcall)),
     )
     mt = annotate_filtered_callrate(mt)
-    mt = annotate_filter_flags(mt, tdr_metrics_ht, sample_type)
-    mt = annotate_qc_gen_anc(mt)
+    # Annotates contamination_rate, percent_bases_at_20x, and mean_coverage
+    mt = mt.annotate_cols(**tdr_metrics_ht[mt.col_key])
+    mt = annotate_filter_flags(mt, sample_type)
+    mt = annotate_qc_gen_anc(mt, pop_pca_loadings_ht, ancestry_rf_model)
     return run_hail_sample_qc(mt, sample_type)
@@ -87,19 +83,17 @@ def annotate_filtered_callrate(mt: hl.MatrixTable) -> hl.MatrixTable:

 def annotate_filter_flags(
     mt: hl.MatrixTable,
-    tdr_metrics_ht: hl.Table,
     sample_type: SampleType,
 ) -> hl.MatrixTable:
-    mt = mt.annotate_cols(**tdr_metrics_ht[mt.col_key])
     flags = {
         'callrate': mt.filtered_callrate < CALLRATE_LOW_THRESHOLD,
         'contamination': mt.contamination_rate > CONTAMINATION_UPPER_THRESHOLD,
+        'coverage': (
+            mt.percent_bases_at_20x < WES_COVERAGE_LOW_THRESHOLD
+            if sample_type == SampleType.WES
+            else mt.mean_coverage < WGS_CALLRATE_LOW_THRESHOLD
+        ),
     }
-    if sample_type == SampleType.WES:
-        flags['coverage'] = mt.percent_bases_at_20x < WES_COVERAGE_LOW_THRESHOLD
-    else:
-        flags['coverage'] = mt.mean_coverage < WGS_CALLRATE_LOW_THRESHOLD
-
     return mt.annotate_cols(
         filter_flags=hl.array(
             [hl.or_missing(filter_cond, name) for name, filter_cond in flags.items()],
@@ -107,17 +101,19 @@
 )


-def annotate_qc_gen_anc(mt: hl.MatrixTable) -> hl.MatrixTable:
+def annotate_qc_gen_anc(
+    mt: hl.MatrixTable,
+    pop_pca_loadings_ht: hl.Table,
+    ancestry_rf_model: onnx.ModelProto,
+) -> hl.MatrixTable:
     mt = mt.select_entries('GT')
-    scores = _get_pop_pca_scores(mt)
-    with hl.hadoop_open(ANCESTRY_RF_MODEL_PATH, 'rb') as f:
-        fit = onnx.load(f)
-
+    scores = pc_project(mt, pop_pca_loadings_ht)
+    scores = scores.annotate(scores=scores.scores[:NUM_PCS], known_pop='Unknown')
     pop_pca_ht, _ = assign_population_pcs(
         scores,
         pc_cols=scores.scores,
         output_col='qc_pop',
-        fit=fit,
+        fit=ancestry_rf_model,
         apply_model_func=apply_onnx_classification_model,
     )
     pop_pca_ht = pop_pca_ht.key_by('s')
@@ -134,15 +130,8 @@
     return mt.annotate_cols(**pop_pca_ht[mt.col_key])


-def _get_pop_pca_scores(mt: hl.MatrixTable) -> hl.Table:
-    loadings = hl.read_table(POP_PCA_LOADINGS_PATH)
-    scores = pc_project(mt, loadings)
-    return scores.annotate(scores=scores.scores[:NUM_PCS], known_pop='Unknown')
-
-
 def run_hail_sample_qc(mt: hl.MatrixTable, sample_type: SampleType) -> hl.MatrixTable:
     mt = filter_to_autosomes(mt)
-    mt = split_multi_hts(mt, True)
     mt = hl.sample_qc(mt)
     mt = mt.annotate_cols(
         sample_qc=mt.sample_qc.annotate(
diff --git a/v03_pipeline/lib/misc/family_entries.py b/v03_pipeline/lib/misc/family_entries.py
index fb0a97722..21b2d6901 100644
--- a/v03_pipeline/lib/misc/family_entries.py
+++ b/v03_pipeline/lib/misc/family_entries.py
@@ -1,30 +1,6 @@
 import hail as hl

-from v03_pipeline.lib.model import DatasetType, ReferenceGenome
-
-
-def initialize_project_table(
-    reference_genome: ReferenceGenome,
-    dataset_type: DatasetType,
-):
-    key_type = dataset_type.table_key_type(reference_genome)
-    return hl.Table.parallelize(
-        [],
-        hl.tstruct(
-            **key_type,
-            filters=hl.tset(hl.tstr),
-            # NB: entries is missing here because it is untyped
-            # until we read the type off of the first callset aggregation.
-        ),
-        key=key_type.fields,
-        globals=hl.Struct(
-            family_guids=hl.empty_array(hl.tstr),
-            family_samples=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
-            updates=hl.empty_set(
-                hl.tstruct(callset=hl.tstr, remap_pedigree_hash=hl.tint32),
-            ),
-        ),
-    )
+from v03_pipeline.lib.model import DatasetType


 def compute_callset_family_entries_ht(
@@ -117,73 +93,6 @@ def deglobalize_ids(ht: hl.Table) -> hl.Table:
     return ht.drop('family_guids', 'family_samples')


-def remove_family_guids(
-    ht: hl.Table,
-    family_guids: hl.SetExpression,
-) -> hl.Table:
-    # Remove families from the existing project table structure (both the entries arrays and the globals are mutated)
-    family_indexes_to_keep = hl.eval(
-        hl.array(
-            hl.enumerate(ht.globals.family_guids)
-            .filter(lambda item: ~family_guids.contains(item[1]))
-            .map(lambda item: item[0]),
-        ),
-    )
-    ht = ht.annotate(
-        # NB: this "should" work without the extra if statement (and does in the tests)
-        # however, experiments on dataproc showed this statement hanging with an empty
-        # unevaluated indexes array.
- family_entries=hl.array(family_indexes_to_keep).map( - lambda i: ht.family_entries[i], - ) - if len(family_indexes_to_keep) > 0 - else hl.empty_array(ht.family_entries.dtype.element_type), - ) - ht = ht.filter(hl.any(ht.family_entries.map(hl.is_defined))) - return ht.annotate_globals( - family_guids=ht.family_guids.filter( - lambda f: ~family_guids.contains(f), - ), - family_samples=hl.dict( - ht.family_samples.items().filter( - lambda item: ~family_guids.contains(item[0]), - ), - ), - ) - - -def join_family_entries_hts(ht: hl.Table, callset_ht: hl.Table) -> hl.Table: - ht = ht.join(callset_ht, 'outer') - ht_empty_family_entries = ht.family_guids.map( - lambda _: hl.missing(ht.family_entries_1.dtype.element_type), - ) - callset_ht_empty_family_entries = ht.family_guids_1.map( - lambda _: hl.missing(ht.family_entries_1.dtype.element_type), - ) - ht = ht.select( - filters=hl.or_else(ht.filters_1, ht.filters), - family_entries=( - hl.case() - .when( - hl.is_missing(ht.family_entries), - ht_empty_family_entries.extend(ht.family_entries_1), - ) - .when( - hl.is_missing(ht.family_entries_1), - ht.family_entries.extend(callset_ht_empty_family_entries), - ) - .default(ht.family_entries.extend(ht.family_entries_1)) - ), - ) - # NB: transmute because we want to drop the *_1 fields, but preserve other globals - return ht.transmute_globals( - family_guids=ht.family_guids.extend(ht.family_guids_1), - family_samples=hl.dict( - ht.family_samples.items().extend(ht.family_samples_1.items()), - ), - ) - - def deduplicate_by_most_non_ref_calls(ht: hl.Table) -> hl.Table: ht = ht.annotate( non_ref_count=hl.len( diff --git a/v03_pipeline/lib/misc/family_entries_test.py b/v03_pipeline/lib/misc/family_entries_test.py index 1802a7753..1e6e65419 100644 --- a/v03_pipeline/lib/misc/family_entries_test.py +++ b/v03_pipeline/lib/misc/family_entries_test.py @@ -7,8 +7,6 @@ deduplicate_by_most_non_ref_calls, deglobalize_ids, globalize_ids, - join_family_entries_hts, - remove_family_guids, ) from v03_pipeline.lib.model import DatasetType @@ -203,336 +201,6 @@ def test_globalize_and_deglobalize(self) -> None: ], ) - def test_remove_family_guids(self) -> None: - family_entries_ht = hl.Table.parallelize( - [ - { - 'id': 0, - 'filters': {'HIGH_SR_BACKGROUND'}, - 'family_entries': [ - [ - hl.Struct(a=1), - ], - [ - hl.Struct(a=2), - hl.Struct(a=1), - hl.Struct(a=2), - ], - ], - }, - { - 'id': 1, - 'filters': {'HIGH_SR_BACKGROUND'}, - 'family_entries': [ - [ - hl.Struct(a=2), - ], - None, - ], - }, - ], - hl.tstruct( - id=hl.tint32, - filters=hl.tset(hl.tstr), - family_entries=hl.tarray(hl.tarray(hl.tstruct(a=hl.tint32))), - ), - key='id', - globals=hl.Struct( - family_guids=['012', '123'], - family_samples={ - '012': ['a'], - '123': ['c', 'e', 'f'], - }, - ), - ) - family_entries_ht = remove_family_guids( - family_entries_ht, - hl.set(['012']), - ) - self.assertCountEqual( - family_entries_ht.globals.collect(), - [ - hl.Struct( - family_guids=['123'], - family_samples={ - '123': ['c', 'e', 'f'], - }, - ), - ], - ) - self.assertCountEqual( - family_entries_ht.collect(), - [ - hl.Struct( - id=0, - filters={'HIGH_SR_BACKGROUND'}, - family_entries=[ - [ - hl.Struct(a=2), - hl.Struct(a=1), - hl.Struct(a=2), - ], - ], - ), - ], - ) - - def test_remove_new_callset_family_guids_all_families(self) -> None: - family_entries_ht = hl.Table.parallelize( - [ - { - 'id': 0, - 'filters': {'HIGH_SR_BACKGROUND'}, - 'family_entries': [ - [ - hl.Struct(a=1), - ], - [ - hl.Struct(a=2), - hl.Struct(a=1), - hl.Struct(a=2), - ], - ], - }, - { - 
'id': 1, - 'filters': {'HIGH_SR_BACKGROUND'}, - 'family_entries': [ - [ - hl.Struct(a=2), - ], - [ - hl.Struct(a=3), - hl.Struct(a=4), - hl.Struct(a=5), - ], - ], - }, - ], - hl.tstruct( - id=hl.tint32, - filters=hl.tset(hl.tstr), - family_entries=hl.tarray(hl.tarray(hl.tstruct(a=hl.tint32))), - ), - key='id', - globals=hl.Struct( - family_guids=['012', '123'], - family_samples={ - '012': ['a'], - '123': ['c', 'e', 'f'], - }, - ), - ) - ht = remove_family_guids(family_entries_ht, hl.set(['012', '123'])) - self.assertCountEqual( - ht.globals.collect(), - [hl.Struct(family_guids=[], family_samples={})], - ) - - def test_join_family_entries_hts_empty_current_table(self) -> None: - family_entries_ht = hl.Table.parallelize( - [], - hl.tstruct( - id=hl.tint32, - filters=hl.tset(hl.tstr), - family_entries=hl.tarray(hl.tarray(hl.tstruct(a=hl.tint32))), - ), - key='id', - globals=hl.Struct( - family_guids=hl.empty_array(hl.tstr), - family_samples=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)), - ), - ) - callset_ht = hl.Table.parallelize( - [ - { - 'id': 0, - 'filters': {'HIGH_SR_BACKGROUND', 'UNRESOLVED'}, - 'family_entries': [ - [ - hl.Struct(a=9), - hl.Struct(a=10), - ], - ], - }, - { - 'id': 2, - 'filters': {'HIGH_SR_BACKGROUND'}, - 'family_entries': [ - [ - hl.Struct(a=11), - hl.Struct(a=12), - ], - ], - }, - ], - hl.tstruct( - id=hl.tint32, - filters=hl.tset(hl.tstr), - family_entries=hl.tarray(hl.tarray(hl.tstruct(a=hl.tint32))), - ), - key='id', - globals=hl.Struct( - family_guids=['1', '2'], - family_samples={'1': ['b'], '2': ['g']}, - ), - ) - ht = join_family_entries_hts(family_entries_ht, callset_ht) - self.assertCountEqual( - ht.family_entries.collect(), - [ - [ - [ - hl.Struct(a=9), - hl.Struct(a=10), - ], - ], - [ - [ - hl.Struct(a=11), - hl.Struct(a=12), - ], - ], - ], - ) - self.assertCountEqual( - ht.globals.collect(), - [ - hl.Struct( - family_guids=['1', '2'], - family_samples={'1': ['b'], '2': ['g']}, - ), - ], - ) - - def test_join_family_entries_hts(self) -> None: - family_entries_ht = hl.Table.parallelize( - [ - { - 'id': 0, - 'filters': {'HIGH_SR_BACKGROUND'}, - 'family_entries': [ - [ - hl.Struct(a=1), - hl.Struct(a=2), - ], - None, - ], - }, - { - 'id': 1, - 'filters': {'HIGH_SR_BACKGROUND'}, - 'family_entries': [ - None, - [ - hl.Struct(a=4), - hl.Struct(a=5), - ], - ], - }, - ], - hl.tstruct( - id=hl.tint32, - filters=hl.tset(hl.tstr), - family_entries=hl.tarray( - hl.tarray(hl.tstruct(a=hl.tint32)), - ), - ), - key='id', - globals=hl.Struct( - family_guids=['1', '2'], - family_samples={'1': ['a', 'b'], '2': ['c', 'd']}, - ), - ) - callset_ht = hl.Table.parallelize( - [ - { - 'id': 0, - 'filters': {'PASS'}, - 'family_entries': [ - [ - hl.Struct(a=9), - hl.Struct(a=10), - ], - ], - }, - { - 'id': 2, - 'filters': {'HIGH_SR_BACKGROUND', 'PASS'}, - 'family_entries': [ - [ - hl.Struct(a=11), - hl.Struct(a=12), - ], - ], - }, - ], - hl.tstruct( - id=hl.tint32, - filters=hl.tset(hl.tstr), - family_entries=hl.tarray(hl.tarray(hl.tstruct(a=hl.tint32))), - ), - key='id', - globals=hl.Struct( - family_guids=['3'], - family_samples={'3': ['e', 'f']}, - ), - ) - ht = join_family_entries_hts(family_entries_ht, callset_ht) - self.assertCountEqual( - ht.globals.collect(), - [ - hl.Struct( - family_guids=['1', '2', '3'], - family_samples={'1': ['a', 'b'], '2': ['c', 'd'], '3': ['e', 'f']}, - ), - ], - ) - self.assertCountEqual( - ht.collect(), - [ - hl.Struct( - id=0, - filters={'PASS'}, - family_entries=[ - [ - hl.Struct(a=1), - hl.Struct(a=2), - ], - None, - [ - hl.Struct(a=9), - 
-                            hl.Struct(a=10),
-                        ],
-                    ],
-                ),
-                hl.Struct(
-                    id=1,
-                    filters={'HIGH_SR_BACKGROUND'},
-                    family_entries=[
-                        None,
-                        [
-                            hl.Struct(a=4),
-                            hl.Struct(a=5),
-                        ],
-                        None,
-                    ],
-                ),
-                hl.Struct(
-                    id=2,
-                    filters={'PASS', 'HIGH_SR_BACKGROUND'},
-                    family_entries=[
-                        None,
-                        None,
-                        [
-                            hl.Struct(a=11),
-                            hl.Struct(a=12),
-                        ],
-                    ],
-                ),
-            ],
-        )
-
     def test_deduplicate_by_most_non_ref_calls(self) -> None:
         project_ht = hl.Table.parallelize(
             [
diff --git a/v03_pipeline/lib/misc/io.py b/v03_pipeline/lib/misc/io.py
index 29bc275a5..7a2eb97f0 100644
--- a/v03_pipeline/lib/misc/io.py
+++ b/v03_pipeline/lib/misc/io.py
@@ -81,7 +81,7 @@ def compute_hail_n_partitions(file_size_b: int) -> int:
 )
 def split_multi_hts(
     mt: hl.MatrixTable,
-    skip_validation: bool,
+    skip_validate_no_duplicate_variants: bool,
     max_samples_split_multi_shuffle=MAX_SAMPLES_SPLIT_MULTI_SHUFFLE,
 ) -> hl.MatrixTable:
     bi = mt.filter_rows(hl.len(mt.alleles) == BIALLELIC)
@@ -98,7 +98,7 @@ def split_multi_hts(
     mt = split.union_rows(bi)
     # If we've disabled the duplicate-variant validation (which is expected to
     # throw an exception for duplicate variants), de-duplicate the rows instead.
-    if skip_validation:
+    if skip_validate_no_duplicate_variants:
         return mt.distinct_by_row()
     return mt
diff --git a/v03_pipeline/lib/misc/slack.py b/v03_pipeline/lib/misc/slack.py
index f8831726b..f7838cdca 100644
--- a/v03_pipeline/lib/misc/slack.py
+++ b/v03_pipeline/lib/misc/slack.py
@@ -39,7 +39,7 @@ def safe_post_to_slack_failure(
     message = [
         SLACK_FAILURE_MESSAGE_PREFIX,
         f'Run ID: {run_id}',
-        str(lpr),
+        f'```{lpr.model_dump_json(indent=4)}```',
         f'Reason: {e!s}',
     ]
     if FeatureFlag.RUN_PIPELINE_ON_DATAPROC:
@@ -55,7 +55,7 @@ def safe_post_to_slack_success(run_id: str, lpr: LoadingPipelineRequest) -> None
         [
             SLACK_SUCCESS_MESSAGE_PREFIX,
             f'Run ID: {run_id}',
-            str(lpr),
+            f'```{lpr.model_dump_json(indent=4)}```',
         ],
     )
     _safe_post_to_slack(message)
diff --git a/v03_pipeline/lib/misc/validation.py b/v03_pipeline/lib/misc/validation.py
index 671435e43..455abbb2f 100644
--- a/v03_pipeline/lib/misc/validation.py
+++ b/v03_pipeline/lib/misc/validation.py
@@ -12,6 +12,7 @@
 AMBIGUOUS_THRESHOLD_PERC: float = 0.01  # Fraction of samples identified as "ambiguous_sex" above which an error will be thrown.
MIN_ROWS_PER_CONTIG = 100 SAMPLE_TYPE_MATCH_THRESHOLD = 0.3 +ALL_VALIDATIONS = 'all' class SeqrValidationError(Exception): @@ -191,3 +192,12 @@ def validate_sample_type( if has_noncoding and has_coding and sample_type != SampleType.WGS: msg = 'Sample type validation error: dataset sample-type is specified as WES but appears to be WGS because it contains many common non-coding variants' raise SeqrValidationError(msg) + + +SKIPPABLE_VALIDATIONS = [ + validate_allele_depth_length, + validate_allele_type, + validate_expected_contig_frequency, + validate_no_duplicate_variants, + validate_sample_type, +] diff --git a/v03_pipeline/lib/paths.py b/v03_pipeline/lib/paths.py index 57827c68b..b1b857550 100644 --- a/v03_pipeline/lib/paths.py +++ b/v03_pipeline/lib/paths.py @@ -284,6 +284,18 @@ def valid_reference_dataset_path( ) +def ancestry_model_rf_path() -> str: + return os.path.join( + _v03_reference_dataset_prefix( + Env.REFERENCE_DATASETS_DIR, + AccessControl.PUBLIC, + ReferenceGenome.GRCh38, + ), + DatasetType.SNV_INDEL, + 'ancestry_imputation_model.onnx', + ) + + def variant_annotations_table_path( reference_genome: ReferenceGenome, dataset_type: DatasetType, diff --git a/v03_pipeline/lib/tasks/__init__.py b/v03_pipeline/lib/tasks/__init__.py index 5f49f04b7..5526ab392 100644 --- a/v03_pipeline/lib/tasks/__init__.py +++ b/v03_pipeline/lib/tasks/__init__.py @@ -5,7 +5,6 @@ MigrateAllProjectsToClickHouseOnDataprocTask, ) from v03_pipeline.lib.tasks.run_pipeline import RunPipelineTask -from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask from v03_pipeline.lib.tasks.update_variant_annotations_table_with_new_samples import ( UpdateVariantAnnotationsTableWithNewSamplesTask, ) @@ -16,7 +15,6 @@ 'MigrateAllProjectsToClickHouseOnDataprocTask', 'MigrateAllProjectsToClickHouseTask', 'RunPipelineTask', - 'UpdateProjectTableTask', 'UpdateVariantAnnotationsTableWithNewSamplesTask', 'WriteMetadataForRunTask', 'WriteSuccessFileTask', diff --git a/v03_pipeline/lib/tasks/base/base_loading_run_params.py b/v03_pipeline/lib/tasks/base/base_loading_run_params.py index f37c8401f..1327f3e86 100644 --- a/v03_pipeline/lib/tasks/base/base_loading_run_params.py +++ b/v03_pipeline/lib/tasks/base/base_loading_run_params.py @@ -28,10 +28,7 @@ class BaseLoadingRunParams(luigi.Task): default=False, parsing=luigi.BoolParameter.EXPLICIT_PARSING, ) - skip_validation = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) + validations_to_skip = luigi.ListParameter(default=[]) is_new_gcnv_joint_call = luigi.BoolParameter( default=False, parsing=luigi.BoolParameter.EXPLICIT_PARSING, diff --git a/v03_pipeline/lib/tasks/dataproc/misc_test.py b/v03_pipeline/lib/tasks/dataproc/misc_test.py index 62a0691aa..f505439fb 100644 --- a/v03_pipeline/lib/tasks/dataproc/misc_test.py +++ b/v03_pipeline/lib/tasks/dataproc/misc_test.py @@ -41,8 +41,8 @@ def test_to_kebab_str_args(self, _: Mock): 'False', '--skip-expect-tdr-metrics', 'False', - '--skip-validation', - 'False', + '--validations-to-skip', + '[]', '--is-new-gcnv-joint-call', 'False', '--attempt-id', diff --git a/v03_pipeline/lib/tasks/exports/write_new_entries_parquet_test.py b/v03_pipeline/lib/tasks/exports/write_new_entries_parquet_test.py index e85160a87..397cdf36e 100644 --- a/v03_pipeline/lib/tasks/exports/write_new_entries_parquet_test.py +++ b/v03_pipeline/lib/tasks/exports/write_new_entries_parquet_test.py @@ -5,6 +5,7 @@ import luigi.worker import pandas as pd +from v03_pipeline.lib.misc.validation import 
ALL_VALIDATIONS from v03_pipeline.lib.model import ( DatasetType, ReferenceGenome, @@ -114,7 +115,7 @@ def test_write_new_entries_parquet(self): sample_type=SampleType.WGS, callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project', 'R0114_project4'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -214,7 +215,7 @@ def test_mito_write_new_entries_parquet(self): sample_type=SampleType.WGS, callset_path=TEST_MITO_CALLSET, project_guids=['R0116_test_project3'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -269,7 +270,7 @@ def test_sv_write_new_entries_parquet(self): sample_type=SampleType.WGS, callset_path=TEST_SV_VCF_2, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -352,7 +353,7 @@ def test_gcnv_write_new_entries_parquet(self): sample_type=SampleType.WES, callset_path=TEST_GCNV_BED_FILE, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) diff --git a/v03_pipeline/lib/tasks/exports/write_new_transcripts_parquet_test.py b/v03_pipeline/lib/tasks/exports/write_new_transcripts_parquet_test.py index 71239a4c3..af5a5cce8 100644 --- a/v03_pipeline/lib/tasks/exports/write_new_transcripts_parquet_test.py +++ b/v03_pipeline/lib/tasks/exports/write_new_transcripts_parquet_test.py @@ -4,6 +4,7 @@ import luigi.worker import pandas as pd +from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS from v03_pipeline.lib.model import ( DatasetType, ReferenceGenome, @@ -87,7 +88,7 @@ def test_write_new_transcripts_parquet_test( project_guids=[ 'fake_project', ], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -187,7 +188,7 @@ def test_grch37_write_new_transcripts_parquet_test( project_guids=[ 'fake_project', ], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) diff --git a/v03_pipeline/lib/tasks/exports/write_new_variants_parquet_test.py b/v03_pipeline/lib/tasks/exports/write_new_variants_parquet_test.py index 969d255c7..b0b8c819c 100644 --- a/v03_pipeline/lib/tasks/exports/write_new_variants_parquet_test.py +++ b/v03_pipeline/lib/tasks/exports/write_new_variants_parquet_test.py @@ -5,6 +5,7 @@ import luigi.worker import pandas as pd +from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS from v03_pipeline.lib.model import ( DatasetType, ReferenceGenome, @@ -107,7 +108,7 @@ def test_write_new_variants_parquet_test( project_guids=[ 'fake_project', ], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -231,7 +232,7 @@ def test_grch37_write_new_variants_parquet_test( project_guids=[ 'fake_project', ], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -342,7 +343,7 @@ def test_mito_write_new_variants_parquet_test( project_guids=[ 'fake_project', ], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -436,7 +437,7 @@ def test_sv_write_new_variants_parquet_test( project_guids=[ 'fake_project', ], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) @@ -513,7 +514,7 @@ def test_gcnv_write_new_variants_parquet_test( project_guids=[ 'fake_project', ], - 
skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(task) diff --git a/v03_pipeline/lib/tasks/update_project_table.py b/v03_pipeline/lib/tasks/update_project_table.py deleted file mode 100644 index 96fbd1229..000000000 --- a/v03_pipeline/lib/tasks/update_project_table.py +++ /dev/null @@ -1,103 +0,0 @@ -import hail as hl -import luigi -import luigi.util - -from v03_pipeline.lib.annotations.fields import get_fields -from v03_pipeline.lib.misc.family_entries import ( - compute_callset_family_entries_ht, - initialize_project_table, - join_family_entries_hts, - remove_family_guids, -) -from v03_pipeline.lib.misc.io import remap_pedigree_hash -from v03_pipeline.lib.paths import project_pedigree_path, project_table_path -from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams -from v03_pipeline.lib.tasks.base.base_update import ( - BaseUpdateTask, -) -from v03_pipeline.lib.tasks.files import GCSorLocalTarget -from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import ( - WriteRemappedAndSubsettedCallsetTask, -) - - -@luigi.util.inherits(BaseLoadingRunParams) -class UpdateProjectTableTask(BaseUpdateTask): - project_i = luigi.IntParameter() - - def output(self) -> luigi.Target: - return GCSorLocalTarget( - project_table_path( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.project_guids[self.project_i], - ), - ) - - def complete(self) -> bool: - return super().complete() and hl.eval( - hl.read_table(self.output().path).updates.contains( - hl.Struct( - callset=self.callset_path, - remap_pedigree_hash=remap_pedigree_hash( - project_pedigree_path( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.project_guids[self.project_i], - ), - ), - ), - ), - ) - - def requires(self) -> luigi.Task: - return self.clone(WriteRemappedAndSubsettedCallsetTask) - - def initialize_table(self) -> hl.Table: - return initialize_project_table( - self.reference_genome, - self.dataset_type, - ) - - def update_table(self, ht: hl.Table) -> hl.Table: - callset_mt = hl.read_matrix_table(self.input().path) - callset_ht = compute_callset_family_entries_ht( - self.dataset_type, - callset_mt, - get_fields( - callset_mt, - self.dataset_type.genotype_entry_annotation_fns, - **self.param_kwargs, - ), - ) - # HACK: steal the type from callset_ht when ht is empty. 
- # This was the least gross way - if 'family_entries' not in ht.row_value: - ht = ht.annotate( - family_entries=hl.missing(callset_ht.family_entries.dtype), - ) - ht = remove_family_guids( - ht, - callset_mt.index_globals().family_samples.key_set(), - ) - ht = join_family_entries_hts(ht, callset_ht) - return ht.select_globals( - family_guids=ht.family_guids, - family_samples=ht.family_samples, - sample_type=self.sample_type.value, - updates=ht.updates.add( - hl.Struct( - callset=self.callset_path, - remap_pedigree_hash=remap_pedigree_hash( - project_pedigree_path( - self.reference_genome, - self.dataset_type, - self.sample_type, - self.project_guids[self.project_i], - ), - ), - ), - ), - ) diff --git a/v03_pipeline/lib/tasks/update_project_table_test.py b/v03_pipeline/lib/tasks/update_project_table_test.py deleted file mode 100644 index 7ddf94a7e..000000000 --- a/v03_pipeline/lib/tasks/update_project_table_test.py +++ /dev/null @@ -1,233 +0,0 @@ -import hail as hl -import luigi.worker - -from v03_pipeline.lib.misc.io import remap_pedigree_hash -from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType -from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask -from v03_pipeline.lib.test.misc import copy_project_pedigree_to_mocked_dir -from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase - -TEST_VCF = 'v03_pipeline/var/test/callsets/1kg_30variants.vcf' -TEST_PEDIGREE_3_REMAP = 'v03_pipeline/var/test/pedigrees/test_pedigree_3_remap.tsv' -TEST_PEDIGREE_3_DIFFERENT_FAMILIES = ( - 'v03_pipeline/var/test/pedigrees/test_pedigree_3_different_families.tsv' -) - -TEST_RUN_ID = 'manual__2024-04-03' - - -class UpdateProjectTableTaskTest(MockedDatarootTestCase): - def setUp(self): - super().setUp() - copy_project_pedigree_to_mocked_dir( - TEST_PEDIGREE_3_REMAP, - ReferenceGenome.GRCh38, - DatasetType.SNV_INDEL, - SampleType.WGS, - 'R0113_test_project', - ) - - def test_update_project_table_task(self) -> None: - worker = luigi.worker.Worker() - upt_task = UpdateProjectTableTask( - reference_genome=ReferenceGenome.GRCh38, - dataset_type=DatasetType.SNV_INDEL, - run_id=TEST_RUN_ID, - sample_type=SampleType.WGS, - callset_path=TEST_VCF, - project_guids=['R0113_test_project'], - project_i=0, - skip_validation=True, - ) - worker.add(upt_task) - worker.run() - self.assertTrue(upt_task.complete()) - ht = hl.read_table(upt_task.output().path) - self.assertCountEqual( - ht.globals.collect(), - [ - hl.Struct( - family_guids=['abc_1'], - family_samples={ - 'abc_1': ['HG00731_1', 'HG00732_1', 'HG00733_1'], - }, - sample_type=SampleType.WGS.value, - updates={ - hl.Struct( - callset='v03_pipeline/var/test/callsets/1kg_30variants.vcf', - remap_pedigree_hash=hl.eval( - remap_pedigree_hash( - TEST_PEDIGREE_3_REMAP, - ), - ), - ), - }, - ), - ], - ) - - self.assertCountEqual( - ht.collect()[:2], - [ - hl.Struct( - locus=hl.Locus( - contig='chr1', - position=876499, - reference_genome='GRCh38', - ), - alleles=['A', 'G'], - filters=set(), - family_entries=[ - [ - hl.Struct( - GQ=21, - AB=1.0, - DP=7, - GT=hl.Call(alleles=[1, 1], phased=False), - ), - hl.Struct( - GQ=24, - AB=1.0, - DP=8, - GT=hl.Call(alleles=[1, 1], phased=False), - ), - hl.Struct( - GQ=12, - AB=1.0, - DP=4, - GT=hl.Call(alleles=[1, 1], phased=False), - ), - ], - ], - ), - hl.Struct( - locus=hl.Locus( - contig='chr1', - position=878314, - reference_genome='GRCh38', - ), - alleles=['G', 'C'], - filters={'VQSRTrancheSNP99.00to99.90'}, - family_entries=[ - [ - hl.Struct( - GQ=30, - 
AB=hl.eval(hl.float32(0.3333333333333333)), - DP=3, - GT=hl.Call(alleles=[0, 1], phased=False), - ), - hl.Struct( - GQ=6, - AB=0.0, - DP=2, - GT=hl.Call(alleles=[0, 0], phased=False), - ), - hl.Struct( - GQ=61, - AB=hl.eval(hl.float32(0.6)), - DP=5, - GT=hl.Call(alleles=[0, 1], phased=False), - ), - ], - ], - ), - ], - ) - - def test_update_project_table_task_different_pedigree(self) -> None: - worker = luigi.worker.Worker() - upt_task = UpdateProjectTableTask( - reference_genome=ReferenceGenome.GRCh38, - dataset_type=DatasetType.SNV_INDEL, - run_id=TEST_RUN_ID, - sample_type=SampleType.WGS, - callset_path=TEST_VCF, - project_guids=['R0113_test_project'], - project_i=0, - skip_validation=True, - ) - worker.add(upt_task) - worker.run() - copy_project_pedigree_to_mocked_dir( - TEST_PEDIGREE_3_DIFFERENT_FAMILIES, - ReferenceGenome.GRCh38, - DatasetType.SNV_INDEL, - SampleType.WGS, - 'R0113_test_project', - ) - - upt_task = UpdateProjectTableTask( - reference_genome=ReferenceGenome.GRCh38, - dataset_type=DatasetType.SNV_INDEL, - run_id=TEST_RUN_ID + '-a', # cache bust - sample_type=SampleType.WGS, - callset_path=TEST_VCF, - project_guids=['R0113_test_project'], - project_i=0, - skip_validation=True, - ) - worker.add(upt_task) - worker.run() - self.assertTrue(upt_task.complete()) - worker.add(upt_task) - worker.run() - ht = hl.read_table(upt_task.output().path) - self.assertCountEqual( - ht.globals.collect(), - [ - hl.Struct( - family_guids=['abc_1'], - family_samples={ - 'abc_1': ['HG00731_1', 'HG00733_1'], - }, - sample_type=SampleType.WGS.value, - updates={ - hl.Struct( - callset='v03_pipeline/var/test/callsets/1kg_30variants.vcf', - remap_pedigree_hash=hl.eval( - remap_pedigree_hash( - TEST_PEDIGREE_3_REMAP, - ), - ), - ), - hl.Struct( - callset='v03_pipeline/var/test/callsets/1kg_30variants.vcf', - remap_pedigree_hash=hl.eval( - remap_pedigree_hash( - TEST_PEDIGREE_3_DIFFERENT_FAMILIES, - ), - ), - ), - }, - ), - ], - ) - - self.assertCountEqual( - ht.collect()[0], - hl.Struct( - locus=hl.Locus( - contig='chr1', - position=876499, - reference_genome='GRCh38', - ), - alleles=['A', 'G'], - filters=set(), - family_entries=[ - [ - hl.Struct( - GQ=21, - AB=1.0, - DP=7, - GT=hl.Call(alleles=[1, 1], phased=False), - ), - hl.Struct( - GQ=12, - AB=1.0, - DP=4, - GT=hl.Call(alleles=[1, 1], phased=False), - ), - ], - ], - ), - ) diff --git a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py index a7ca987c0..8229e872b 100644 --- a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py +++ b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples_test.py @@ -19,7 +19,12 @@ TRANSCRIPT_CONSEQUENCE_TERMS, ) from v03_pipeline.lib.misc.io import remap_pedigree_hash -from v03_pipeline.lib.misc.validation import validate_expected_contig_frequency +from v03_pipeline.lib.misc.validation import ( + ALL_VALIDATIONS, + validate_allele_type, + validate_expected_contig_frequency, + validate_no_duplicate_variants, +) from v03_pipeline.lib.model import ( DatasetType, ReferenceGenome, @@ -86,7 +91,7 @@ def test_missing_pedigree( sample_type=SampleType.WGS, callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker = luigi.worker.Worker() @@ -116,7 +121,7 @@ def test_missing_interval_reference_dataset( sample_type=SampleType.WGS, callset_path=TEST_SNV_INDEL_VCF, 
project_guids=['R0113_test_project'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker = luigi.worker.Worker() @@ -129,8 +134,12 @@ def test_missing_interval_reference_dataset( ) @patch('v03_pipeline.lib.tasks.update_new_variants_with_caids.Env') @patch( - 'v03_pipeline.lib.tasks.validate_callset.validate_expected_contig_frequency', - partial(validate_expected_contig_frequency, min_rows_per_contig=25), + 'v03_pipeline.lib.tasks.validate_callset.SKIPPABLE_VALIDATIONS', + [ + validate_allele_type, + validate_no_duplicate_variants, + partial(validate_expected_contig_frequency, min_rows_per_contig=25), + ], ) @patch.object(ReferenceGenome, 'standard_contigs', new_callable=PropertyMock) @patch('v03_pipeline.lib.vep.hl.vep') @@ -270,7 +279,7 @@ def test_multiple_update_vat( sample_type=SampleType.WGS, callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], - skip_validation=False, + validations_to_skip=[], run_id=TEST_RUN_ID, ) worker.add(uvatwns_task_3) @@ -331,7 +340,7 @@ def test_multiple_update_vat( sample_type=SampleType.WGS, callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0114_project4'], - skip_validation=False, + validations_to_skip=[], run_id=TEST_RUN_ID + '-another-run', ) worker.add(uvatwns_task_4) @@ -529,7 +538,7 @@ def test_update_vat_grch37( sample_type=SampleType.WGS, callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(uvatwns_task) @@ -685,7 +694,7 @@ def test_update_vat_without_accessing_private_datasets( sample_type=SampleType.WGS, callset_path=TEST_SNV_INDEL_VCF, project_guids=['R0113_test_project'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) worker.add(uvatwns_task) @@ -733,7 +742,7 @@ def test_mito_update_vat( sample_type=SampleType.WGS, callset_path=TEST_MITO_MT, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) ) @@ -841,7 +850,7 @@ def test_sv_multiple_vcf_update_vat( sample_type=SampleType.WGS, callset_path=TEST_SV_VCF, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) ) @@ -1301,7 +1310,7 @@ def test_sv_multiple_vcf_update_vat( sample_type=SampleType.WGS, callset_path=TEST_SV_VCF_2, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id='second_run_id', ) ) @@ -1421,7 +1430,7 @@ def test_sv_multiple_project_single_vcf( sample_type=SampleType.WGS, callset_path=TEST_SV_VCF_2, project_guids=['R0116_test_project3', 'R0117_test_project4'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id='run_id', ) ) @@ -1453,7 +1462,7 @@ def test_gcnv_update_vat_multiple( sample_type=SampleType.WES, callset_path=TEST_GCNV_BED_FILE, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) ) @@ -1592,7 +1601,7 @@ def test_gcnv_update_vat_multiple( sample_type=SampleType.WES, callset_path=TEST_GCNV_BED_FILE_2, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id='second_run_id', ) ) diff --git a/v03_pipeline/lib/tasks/validate_callset.py b/v03_pipeline/lib/tasks/validate_callset.py index e8f5d6f34..197959512 100644 --- a/v03_pipeline/lib/tasks/validate_callset.py +++ b/v03_pipeline/lib/tasks/validate_callset.py @@ -3,12 
+3,9 @@
 import luigi.util

 from v03_pipeline.lib.misc.validation import (
+    ALL_VALIDATIONS,
+    SKIPPABLE_VALIDATIONS,
     SeqrValidationError,
-    validate_allele_depth_length,
-    validate_allele_type,
-    validate_expected_contig_frequency,
-    validate_no_duplicate_variants,
-    validate_sample_type,
 )
 from v03_pipeline.lib.paths import (
     imported_callset_path,
@@ -31,6 +28,22 @@
 @luigi.util.inherits(BaseLoadingRunParams)
 class ValidateCallsetTask(BaseUpdateTask):
+    @property
+    def validation_dependencies(self) -> dict[str, hl.Table]:
+        deps = {}
+        if (
+            ALL_VALIDATIONS not in self.validations_to_skip
+            and 'validate_sample_type' not in self.validations_to_skip
+            and self.dataset_type.can_run_validation
+        ):
+            deps['coding_and_noncoding_variants_ht'] = hl.read_table(
+                valid_reference_dataset_path(
+                    self.reference_genome,
+                    ReferenceDataset.gnomad_coding_and_noncoding,
+                ),
+            )
+        return deps
+
     def complete(self) -> luigi.Target:
         if super().complete():
             mt = hl.read_matrix_table(self.output().path)
@@ -50,7 +63,11 @@ def output(self) -> luigi.Target:

     def requires(self) -> list[luigi.Task]:
         requirements = [self.clone(WriteImportedCallsetTask)]
-        if not self.skip_validation and self.dataset_type.can_run_validation:
+        if (
+            ALL_VALIDATIONS not in self.validations_to_skip
+            and 'validate_sample_type' not in self.validations_to_skip
+            and self.dataset_type.can_run_validation
+        ):
             requirements = [
                 *requirements,
                 (
@@ -86,30 +103,24 @@ def update_table(self, mt: hl.MatrixTable) -> hl.MatrixTable:
                     & (hl.len(mt.alleles[1]) < MAX_SNV_INDEL_ALLELE_LENGTH)
                 ),
             )
-
-        validation_exceptions = []
-        if self.skip_validation or not self.dataset_type.can_run_validation:
+        if (
+            ALL_VALIDATIONS in self.validations_to_skip
+            or not self.dataset_type.can_run_validation
+        ):
             return mt.select_globals(
                 callset_path=self.callset_path,
                 validated_sample_type=self.sample_type.value,
             )
-        coding_and_noncoding_variants_ht = hl.read_table(
-            valid_reference_dataset_path(
-                self.reference_genome,
-                ReferenceDataset.gnomad_coding_and_noncoding,
-            ),
-        )
-        for validation_f in [
-            validate_allele_depth_length,
-            validate_allele_type,
-            validate_no_duplicate_variants,
-            validate_expected_contig_frequency,
-            validate_sample_type,
-        ]:
+        validation_exceptions = []
+        for validation_f in SKIPPABLE_VALIDATIONS:
             try:
+                # validations_to_skip holds validator *names* (strings), so
+                # compare by name; getattr tolerates functools.partial stubs
+                # that lack __name__.
+                if getattr(validation_f, '__name__', None) in self.validations_to_skip:
+                    continue
                 validation_f(
                     mt,
-                    coding_and_noncoding_variants_ht=coding_and_noncoding_variants_ht,
+                    **self.validation_dependencies,
                     **self.param_kwargs,
                 )
             except SeqrValidationError as e:
diff --git a/v03_pipeline/lib/tasks/validate_callset_test.py b/v03_pipeline/lib/tasks/validate_callset_test.py
index 8f3638376..2c5f7a490 100644
--- a/v03_pipeline/lib/tasks/validate_callset_test.py
+++ b/v03_pipeline/lib/tasks/validate_callset_test.py
@@ -55,7 +55,7 @@ def test_validate_callset_multiple_exceptions(
             # all contigs but chr1, and contains non-coding variants.
            callset_path=MULTIPLE_VALIDATION_EXCEPTIONS_VCF,
             project_guids=['project_a'],
-            skip_validation=False,
+            validations_to_skip=[],
             run_id=TEST_RUN_ID,
         )
         worker.add(validate_callset_task)
@@ -68,7 +68,7 @@
             sample_type=SampleType.WES,
             callset_path=MULTIPLE_VALIDATION_EXCEPTIONS_VCF,
             project_guids=['project_a'],
-            skip_validation=False,
+            validations_to_skip=[],
             run_id=TEST_RUN_ID,
         )
         self.assertTrue(write_validation_errors_task.complete())
@@ -79,8 +79,8 @@
                 'project_guids': ['project_a'],
                 'error_messages': [
                     'Alleles with invalid allele are present in the callset. This appears to be a GVCF containing records for sites with no variants.',
-                    "Variants are present multiple times in the callset: ['1-902088-G-A']",
                     'Missing the following expected contigs:chr10, chr11, chr12, chr13, chr14, chr15, chr16, chr17, chr18, chr19, chr2, chr20, chr21, chr22, chr3, chr4, chr5, chr6, chr7, chr8, chr9, chrX',
+                    "Variants are present multiple times in the callset: ['1-902088-G-A']",
                     'Sample type validation error: dataset sample-type is specified as WES but appears to be WGS because it contains many common non-coding variants',
                 ],
             },
diff --git a/v03_pipeline/lib/tasks/write_imported_callset.py b/v03_pipeline/lib/tasks/write_imported_callset.py
index e4df70899..6a3230197 100644
--- a/v03_pipeline/lib/tasks/write_imported_callset.py
+++ b/v03_pipeline/lib/tasks/write_imported_callset.py
@@ -85,7 +85,10 @@ def create_table(self) -> hl.MatrixTable:
             )
         if self.dataset_type.has_multi_allelic_variants:
             # NB: throws SeqrValidationError
-            mt = split_multi_hts(mt, self.skip_validation)
+            mt = split_multi_hts(
+                mt,
+                # 'all' (validation.ALL_VALIDATIONS) implies every skippable
+                # check, including the duplicate-variant one.
+                not {'all', 'validate_no_duplicate_variants'}.isdisjoint(
+                    self.validations_to_skip,
+                ),
+            )
         if self.dataset_type.re_key_by_seqr_internal_truth_vid and hasattr(
             mt,
             'info.SEQR_INTERNAL_TRUTH_VID',
diff --git a/v03_pipeline/lib/tasks/write_metadata_for_run_test.py b/v03_pipeline/lib/tasks/write_metadata_for_run_test.py
index 877239d08..88ff857dd 100644
--- a/v03_pipeline/lib/tasks/write_metadata_for_run_test.py
+++ b/v03_pipeline/lib/tasks/write_metadata_for_run_test.py
@@ -4,6 +4,7 @@

 import luigi.worker

+from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS
 from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
 from v03_pipeline.lib.paths import relatedness_check_tsv_path
 from v03_pipeline.lib.tasks.write_metadata_for_run import WriteMetadataForRunTask
@@ -54,7 +55,7 @@ def test_write_metadata_for_run_task(
             sample_type=SampleType.WGS,
             callset_path=TEST_VCF,
             project_guids=['R0113_test_project', 'R0114_project4'],
-            skip_validation=True,
+            validations_to_skip=[ALL_VALIDATIONS],
             run_id='run_123456',
         )
         worker.add(write_metadata_for_run_task)
diff --git a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py
index 9add7d3a2..ca6b32193 100644
--- a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py
+++ b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset_test.py
@@ -6,6 +6,7 @@
 import luigi.worker

 from v03_pipeline.lib.misc.io import remap_pedigree_hash
+from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS
 from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
 from v03_pipeline.lib.paths import (
     relatedness_check_table_path,
@@ -101,7 +102,7 @@ def test_write_remapped_and_subsetted_callset_task(
             callset_path=TEST_VCF,
             project_guids=['R0113_test_project'],
             project_i=0,
-
skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], skip_expect_tdr_metrics=True, ) worker.add(wrsc_task) @@ -151,7 +152,7 @@ def test_write_remapped_and_subsetted_callset_task_failed_some_family_checks( callset_path=TEST_VCF, project_guids=['R0114_project4'], project_i=0, - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], skip_expect_tdr_metrics=True, ) worker.add(wrsc_task) @@ -244,7 +245,7 @@ def test_write_remapped_and_subsetted_callset_task_all_families_failed( callset_path=TEST_VCF, project_guids=['R0114_project4'], project_i=0, - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], skip_expect_tdr_metrics=True, ) worker.add(wrsc_task) @@ -256,7 +257,7 @@ def test_write_remapped_and_subsetted_callset_task_all_families_failed( sample_type=SampleType.WES, callset_path=TEST_VCF, project_guids=['R0114_project4'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], run_id=TEST_RUN_ID, ) self.assertTrue(write_validation_errors_task.complete()) diff --git a/v03_pipeline/lib/tasks/write_sample_qc_json.py b/v03_pipeline/lib/tasks/write_sample_qc_json.py index e6a85c6a7..9fd030c72 100644 --- a/v03_pipeline/lib/tasks/write_sample_qc_json.py +++ b/v03_pipeline/lib/tasks/write_sample_qc_json.py @@ -5,12 +5,21 @@ import hailtop.fs as hfs import luigi import luigi.util +import onnx from v03_pipeline.lib.methods.sample_qc import call_sample_qc from v03_pipeline.lib.misc.io import import_tdr_qc_metrics -from v03_pipeline.lib.paths import sample_qc_json_path, tdr_metrics_dir +from v03_pipeline.lib.paths import ( + ancestry_model_rf_path, + sample_qc_json_path, + tdr_metrics_dir, +) +from v03_pipeline.lib.reference_datasets.reference_dataset import ReferenceDataset from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams -from v03_pipeline.lib.tasks.files import GCSorLocalTarget +from v03_pipeline.lib.tasks.files import GCSorLocalTarget, RawFileTask +from v03_pipeline.lib.tasks.reference_data.updated_reference_dataset import ( + UpdatedReferenceDatasetTask, +) from v03_pipeline.lib.tasks.validate_callset import ValidateCallsetTask from v03_pipeline.lib.tasks.write_tdr_metrics_files import WriteTDRMetricsFilesTask @@ -27,11 +36,18 @@ def output(self) -> luigi.Target: ) def requires(self): - return [self.clone(ValidateCallsetTask), self.clone(WriteTDRMetricsFilesTask)] + return [ + self.clone(ValidateCallsetTask), + self.clone(WriteTDRMetricsFilesTask), + self.clone( + UpdatedReferenceDatasetTask, + reference_dataset=ReferenceDataset.gnomad_qc, + ), + RawFileTask(ancestry_model_rf_path()), + ] def run(self): callset_mt = hl.read_matrix_table(self.input()[0].path) - tdr_metrics_ht = None for tdr_metrics_file in hfs.ls( tdr_metrics_dir(self.reference_genome, self.dataset_type), @@ -42,10 +58,14 @@ def run(self): tdr_metrics_ht = tdr_metrics_ht.union( import_tdr_qc_metrics(tdr_metrics_file.path), ) - + pop_pca_loadings_ht = hl.read_table(self.input()[2].path) + with hl.hadoop_open(self.input()[3].path, 'rb') as f: + ancestry_rf_model = onnx.load(f) callset_mt = call_sample_qc( callset_mt, tdr_metrics_ht, + pop_pca_loadings_ht, + ancestry_rf_model, self.sample_type, ) ht = callset_mt.cols() diff --git a/v03_pipeline/lib/tasks/write_sample_qc_json_test.py b/v03_pipeline/lib/tasks/write_sample_qc_json_test.py index 56f1a9f1a..e812fd8b0 100644 --- a/v03_pipeline/lib/tasks/write_sample_qc_json_test.py +++ b/v03_pipeline/lib/tasks/write_sample_qc_json_test.py @@ -1,14 +1,20 @@ import json +import os +import shutil from unittest.mock 
import Mock, patch import hail as hl import hailtop.fs as hfs import luigi.worker +from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType +from v03_pipeline.lib.paths import ancestry_model_rf_path from v03_pipeline.lib.tasks.write_sample_qc_json import WriteSampleQCJsonTask from v03_pipeline.lib.test.mock_complete_task import MockCompleteTask -from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase +from v03_pipeline.lib.test.mocked_reference_datasets_testcase import ( + MockedReferenceDatasetsTestCase, +) TEST_VCF = 'v03_pipeline/var/test/callsets/1kg_30variants.vcf' TEST_TDR_METRICS_FILE = 'v03_pipeline/var/test/tdr_metrics.tsv' @@ -31,22 +37,23 @@ ) -class WriteSampleQCJsonTaskTest(MockedDatarootTestCase): - @patch( - 'v03_pipeline.lib.methods.sample_qc.ANCESTRY_RF_MODEL_PATH', - TEST_ANCESTRY_IMPUTATION_MODEL_PATH, - ) +class WriteSampleQCJsonTaskTest(MockedReferenceDatasetsTestCase): @patch('v03_pipeline.lib.methods.sample_qc.assign_population_pcs') - @patch('v03_pipeline.lib.methods.sample_qc._get_pop_pca_scores') + @patch('v03_pipeline.lib.methods.sample_qc.pc_project') @patch('v03_pipeline.lib.tasks.write_sample_qc_json.WriteTDRMetricsFilesTask') @patch('v03_pipeline.lib.tasks.write_sample_qc_json.hfs.ls') def test_call_sample_qc( self, mock_ls_tdr_dir: Mock, mock_tdr_task: Mock, - mock_get_pop_pca_scores: Mock, + mock_pc_project: Mock, mock_assign_population_pcs: Mock, ) -> None: + os.makedirs( + os.path.dirname(ancestry_model_rf_path()), + exist_ok=True, + ) + shutil.copy2(TEST_ANCESTRY_IMPUTATION_MODEL_PATH, ancestry_model_rf_path()) mock_tdr_task.return_value = MockCompleteTask() mock_tdr_table = Mock() mock_tdr_table.path = TEST_TDR_METRICS_FILE @@ -55,19 +62,17 @@ def test_call_sample_qc( # object being used globally. mock_tdr_table.size = 10 mock_ls_tdr_dir.return_value = [mock_tdr_table] - mock_get_pop_pca_scores.return_value = hl.Table.parallelize( + mock_pc_project.return_value = hl.Table.parallelize( [ { 's': sample_id, 'scores': PCA_SCORES, - 'known_pop': 'Unknown', } for sample_id in ('HG00731', 'HG00732', 'HG00733', 'NA19675') ], hl.tstruct( s=hl.tstr, scores=hl.tarray(hl.tfloat64), - known_pop=hl.tstr, ), key='s', ) @@ -109,7 +114,7 @@ def test_call_sample_qc( sample_type=SampleType.WGS, callset_path=TEST_VCF, project_guids=['R0113_test_project'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], ) worker.add(task) worker.run() diff --git a/v03_pipeline/lib/tasks/write_variant_annotations_vcf_test.py b/v03_pipeline/lib/tasks/write_variant_annotations_vcf_test.py index 9036e788c..0133df1c7 100644 --- a/v03_pipeline/lib/tasks/write_variant_annotations_vcf_test.py +++ b/v03_pipeline/lib/tasks/write_variant_annotations_vcf_test.py @@ -4,6 +4,7 @@ import hailtop.fs as hfs import luigi.worker +from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType from v03_pipeline.lib.tasks.update_variant_annotations_table_with_new_samples import ( UpdateVariantAnnotationsTableWithNewSamplesTask, @@ -64,7 +65,7 @@ def test_sv_export_vcf( sample_type=SampleType.WGS, callset_path=TEST_SV_VCF, project_guids=['R0115_test_project2'], - skip_validation=True, + validations_to_skip=[ALL_VALIDATIONS], skip_expect_tdr_metrics=True, ) )
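
A minimal usage sketch of the new request shape (illustrative only; the field
values are borrowed from the tests above, and the asserted count assumes the
validators registered in SKIPPABLE_VALIDATIONS):

    from v03_pipeline.api.model import LoadingPipelineRequest
    from v03_pipeline.lib.misc.validation import SKIPPABLE_VALIDATIONS

    shared_params = {
        'callset_path': 'v03_pipeline/var/test/callsets/1kg_30variants.vcf',
        'projects_to_run': ['project_a'],
        'sample_type': 'WGS',
        'reference_genome': 'GRCh38',
        'dataset_type': 'SNV_INDEL',
    }

    # Old-style boolean: expanded to every skippable validator name by the
    # pre root validator.
    lpr = LoadingPipelineRequest.model_validate(
        {**shared_params, 'skip_validation': True},
    )
    assert len(lpr.validations_to_skip) == len(SKIPPABLE_VALIDATIONS)

    # New-style list: skip a single named validator.
    lpr = LoadingPipelineRequest.model_validate(
        {**shared_params, 'validations_to_skip': ['validate_sample_type']},
    )
    assert lpr.validations_to_skip == ['validate_sample_type']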