Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added v03_pipeline/ops/__init__.py
Empty file.
110 changes: 110 additions & 0 deletions v03_pipeline/ops/repartition_clickhouse_grch38_snv_indel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
####################################################
#
# This script is provided as means to repartition an existing `GRCh38/SNV_INDEL/entries` table
# from a project-only partitioning strategy to one that includes project subpartitions.
# Very large genome projects with thousands of families will require additional
# splitting to maintain reasonable loading performance and to keep partition size under
# the recommended ClickHouse maximum. Unfortunately, a ClickHouse table partition definition
# is static upon creation, necessitating the expensive table re-write process demonstrated below.
#
# Post seqr-platform version x.x.x, the subpartition-ing strategy is provided by default;
# earlier installations will continue to function as-is. This script is meant to
# be run under human supervision and with caution.
#
# At the end of this script, you should run the following SQL to finalize the migration:
# EXCHANGE TABLES seqr.'GRCh38/SNV_INDEL/entries' AND staging_grch38_snvindel_repartition.'GRCh38/SNV_INDEL/repartitioned_entries'
# DROP DATABASE `staging_grch38_snvindel_repartition`;
#
# Resource Requirements:
# - Free disk space equal to 2.5x the usage of your current `GRCh38/SNV_INDEL/entries` table.
#
####################################################
import argparse

from v03_pipeline.lib.core.environment import Env
from v03_pipeline.lib.misc.clickhouse import (
logged_query,
normalize_partition,
)

REPARTITION_DATABASE_NAME = 'staging_grch38_snvindel_repartition'


def get_partitions_for_project(project_guid: str):
rows = logged_query(
"""
SELECT DISTINCT partition
FROM system.parts
WHERE
database = %(database)s
AND table = %(table)s
AND partition like %(project_guid)s
""",
{
'database': REPARTITION_DATABASE_NAME,
'table': 'GRCh38/SNV_INDEL/repartitioned_entries',
'project_guid': f'%{project_guid}%',
},
)
return [normalize_partition(row[0]) for row in rows]


def main(max_insert_threads: int, project_guids: list[str]):
logged_query(
f"""
CREATE DATABASE IF NOT EXISTS {REPARTITION_DATABASE_NAME};
""",
)
logged_query(
f"""
CREATE TABLE IF NOT EXISTS {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
AS {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries` PARTITION BY (project_guid, partition_id)
""",
)
if not project_guids:
project_guids = [
x[0]
for x in logged_query(
f"""
SELECT DISTINCT project_guid from {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries`
""",
)
]
for project_guid in project_guids:
for partition in get_partitions_for_project(
project_guid,
):
logged_query(
f"""
ALTER TABLE {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
DROP PARTITION %(partition)s
""",
{'partition': partition},
)
logged_query(
f"""
INSERT INTO {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
SELECT * FROM {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries`
WHERE project_guid=%(project_guid)s
SETTINGS max_insert_threads=%(max_insert_threads)s
""",
{'project_guid': project_guid, 'max_insert_threads': max_insert_threads},
)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--max-insert-threads',
type=int,
default=4,
help='Maximum number of insert threads to use (default: 4).',
)
parser.add_argument(
'--project-guids',
nargs='+',
required=False,
help='Optionally provide an override list of project guids: --project-guids proj1 proj2 proj3',
)
args = parser.parse_args()
main(args.max_insert_threads, args.project_guids)
120 changes: 120 additions & 0 deletions v03_pipeline/ops/repartition_clickhouse_grch38_snv_indel_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import unittest

from v03_pipeline.lib.core.environment import Env
from v03_pipeline.lib.misc.clickhouse import get_clickhouse_client
from v03_pipeline.ops.repartition_clickhouse_grch38_snv_indel import (
REPARTITION_DATABASE_NAME,
main,
)


class RepartitionGRCh38SnvIndelTest(unittest.TestCase):
def setUp(self):
client = get_clickhouse_client()
client.execute(
f"""
DROP DATABASE IF EXISTS {Env.CLICKHOUSE_DATABASE}
PARALLEL WITH
DROP DATABASE IF EXISTS {REPARTITION_DATABASE_NAME};
""",
)
client.execute(
f"""
CREATE DATABASE {Env.CLICKHOUSE_DATABASE}
PARALLEL WITH
CREATE DATABASE {REPARTITION_DATABASE_NAME};
""",
)
client.execute(
f"""
CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries` (
`key` UInt32,
`project_guid` LowCardinality(String),
`family_guid` String,
`is_annotated_in_any_gene` Boolean,
`sign` Int8,
`n_partitions` UInt8 MATERIALIZED 2,
`partition_id` UInt8 MATERIALIZED farmHash64(family_guid) % n_partitions,
PROJECTION xpos_projection
(
SELECT *
ORDER BY is_annotated_in_any_gene
)
)
ENGINE = CollapsingMergeTree(sign)
PARTITION BY project_guid
ORDER BY (project_guid, family_guid, key)
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
""",
)
client.execute(
f"""
INSERT INTO {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries`
VALUES
(0, 'project_a', 'family_a1', 0, 1),
(1, 'project_a', 'family_a2', 0, 1),
(2, 'project_a', 'family_a3', 0, 1),
(0, 'project_b', 'family_b1', 0, 1),
(1, 'project_b', 'family_b2', 0, 1),
(2, 'project_b', 'family_b2', 0, 1),
(0, 'project_c', 'family_c1', 1, 1),
(3, 'project_c', 'family_c2', 1, 1),
""",
)
client.execute(
f"""
CREATE DICTIONARY {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/project_partitions_dict`
(
`project_guid` String,
`n_partitions` UInt8
)
PRIMARY KEY project_guid
SOURCE(
CLICKHOUSE(
USER '{Env.CLICKHOUSE_WRITER_USER}' PASSWORD '{Env.CLICKHOUSE_WRITER_PASSWORD}'
DB {Env.CLICKHOUSE_DATABASE} QUERY 'SELECT project_guid, 3 FROM `GRCh38/SNV_INDEL/entries`'
)
)
LIFETIME(0)
LAYOUT(FLAT(MAX_ARRAY_SIZE 10000))
""",
)

def test_main_all_projects(self):
client = get_clickhouse_client()
main(1, [])
res = client.execute(
f"""
SELECT *, n_partitions, partition_id FROM {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
""",
)
self.assertCountEqual(
res,
[
(3, 'project_c', 'family_c2', True, 1, 2, 1),
(0, 'project_b', 'family_b1', 0, 1, 2, 1),
(1, 'project_b', 'family_b2', 0, 1, 2, 1),
(2, 'project_b', 'family_b2', 0, 1, 2, 1),
(2, 'project_a', 'family_a3', 0, 1, 2, 0),
(0, 'project_a', 'family_a1', 0, 1, 2, 1),
(1, 'project_a', 'family_a2', 0, 1, 2, 1),
(0, 'project_c', 'family_c1', True, 1, 2, 0),
],
)

def test_main_one_project(self):
client = get_clickhouse_client()
main(1, ['project_a'])
res = client.execute(
f"""
SELECT *, n_partitions, partition_id FROM {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
""",
)
self.assertCountEqual(
res,
[
(2, 'project_a', 'family_a3', 0, 1, 2, 0),
(0, 'project_a', 'family_a1', 0, 1, 2, 1),
(1, 'project_a', 'family_a2', 0, 1, 2, 1),
],
)