Skip to content

Commit aa5c691

Browse files
authored
chore: tooling for repartitioning (#1187)
* tooling for repartitioning * lint * test scaffold * ops * ruff * fix up * cleanup and tests passing * ruff * optionally!
1 parent 5cb46f0 commit aa5c691

File tree

3 files changed

+230
-0
lines changed

3 files changed

+230
-0
lines changed

v03_pipeline/ops/__init__.py

Whitespace-only changes.
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
####################################################
2+
#
3+
# This script is provided as a means to repartition an existing `GRCh38/SNV_INDEL/entries` table
4+
# from a project-only partitioning strategy to one that includes project subpartitions.
5+
# Very large genome projects with thousands of families will require additional
6+
# splitting to maintain reasonable loading performance and to keep partition size under
7+
# the recommended ClickHouse maximum. Unfortunately, a ClickHouse table partition definition
8+
# is static upon creation, necessitating the expensive table re-write process demonstrated below.
9+
#
10+
# Post seqr-platform version x.x.x, the subpartitioning strategy is provided by default;
11+
# earlier installations will continue to function as-is. This script is meant to
12+
# be run under human supervision and with caution.
13+
#
14+
# At the end of this script, you should run the following SQL to finalize the migration:
15+
# EXCHANGE TABLES seqr.`GRCh38/SNV_INDEL/entries` AND staging_grch38_snvindel_repartition.`GRCh38/SNV_INDEL/repartitioned_entries`;
16+
# DROP DATABASE `staging_grch38_snvindel_repartition`;
17+
#
18+
# Resource Requirements:
19+
# - Free disk space equal to 2.5x the usage of your current `GRCh38/SNV_INDEL/entries` table.
20+
#
21+
####################################################
22+
import argparse
23+
24+
from v03_pipeline.lib.core.environment import Env
25+
from v03_pipeline.lib.misc.clickhouse import (
26+
logged_query,
27+
normalize_partition,
28+
)
29+
30+
REPARTITION_DATABASE_NAME = 'staging_grch38_snvindel_repartition'
31+
32+
33+
def get_partitions_for_project(project_guid: str):
    """Return the staging-table partitions whose name contains ``project_guid``.

    Looks up ``system.parts`` for the
    ``GRCh38/SNV_INDEL/repartitioned_entries`` table in
    ``REPARTITION_DATABASE_NAME`` and returns every distinct partition value
    passed through ``normalize_partition``.

    Args:
        project_guid: Project identifier to search for. It is matched
            literally: LIKE metacharacters inside it are escaped before the
            pattern is built.

    Returns:
        A list with one ``normalize_partition`` result per matching partition.
    """
    # `%`, `_` and `\` are LIKE metacharacters in ClickHouse. Guids commonly
    # contain underscores, and an unescaped `_` matches any single character,
    # which could select (and later drop) another project's partitions.
    escaped_guid = (
        project_guid.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    )
    # NOTE(review): this is still a substring match, so a guid that is
    # contained in another project's guid also matches that project's
    # partitions -- confirm whether the pattern should be anchored.
    rows = logged_query(
        """
        SELECT DISTINCT partition
        FROM system.parts
        WHERE
            database = %(database)s
            AND table = %(table)s
            AND partition like %(project_guid)s
        """,
        {
            'database': REPARTITION_DATABASE_NAME,
            'table': 'GRCh38/SNV_INDEL/repartitioned_entries',
            'project_guid': f'%{escaped_guid}%',
        },
    )
    return [normalize_partition(row[0]) for row in rows]
50+
51+
52+
def main(max_insert_threads: int, project_guids: list[str]):
    """Copy the entries table into a staging table with subpartitions.

    Creates the staging database and a copy of the
    ``GRCh38/SNV_INDEL/entries`` table definition partitioned by
    ``(project_guid, partition_id)``, then fills it one project at a time.

    Args:
        max_insert_threads: Value passed as the ``max_insert_threads`` setting
            of each per-project INSERT.
        project_guids: Projects to copy. When empty or ``None``, every
            distinct ``project_guid`` found in the source table is copied.
    """
    logged_query(
        f"""
        CREATE DATABASE IF NOT EXISTS {REPARTITION_DATABASE_NAME};
        """,
    )
    logged_query(
        f"""
        CREATE TABLE IF NOT EXISTS {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
        AS {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries` PARTITION BY (project_guid, partition_id)
        """,
    )

    # No explicit override: fall back to every project in the source table.
    if not project_guids:
        distinct_rows = logged_query(
            f"""
            SELECT DISTINCT project_guid from {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries`
            """,
        )
        project_guids = [row[0] for row in distinct_rows]

    for guid in project_guids:
        # Drop whatever the staging table already holds for this project
        # before copying it again (presumably so a re-run does not
        # duplicate rows -- confirm).
        existing_partitions = get_partitions_for_project(guid)
        for existing in existing_partitions:
            drop_params = {'partition': existing}
            logged_query(
                f"""
                ALTER TABLE {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
                DROP PARTITION %(partition)s
                """,
                drop_params,
            )

        insert_params = {
            'project_guid': guid,
            'max_insert_threads': max_insert_threads,
        }
        logged_query(
            f"""
            INSERT INTO {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
            SELECT * FROM {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries`
            WHERE project_guid=%(project_guid)s
            SETTINGS max_insert_threads=%(max_insert_threads)s
            """,
            insert_params,
        )
93+
94+
95+
if __name__ == '__main__':
    # Command-line entry point: parse the two options and hand them to main().
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--max-insert-threads',
        default=4,
        type=int,
        help='Maximum number of insert threads to use (default: 4).',
    )
    # Left unset, this is None and main() copies every project it finds.
    cli.add_argument(
        '--project-guids',
        required=False,
        nargs='+',
        help='Optionally provide an override list of project guids: --project-guids proj1 proj2 proj3',
    )
    options = cli.parse_args()
    main(options.max_insert_threads, options.project_guids)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import unittest
2+
3+
from v03_pipeline.lib.core.environment import Env
4+
from v03_pipeline.lib.misc.clickhouse import get_clickhouse_client
5+
from v03_pipeline.ops.repartition_clickhouse_grch38_snv_indel import (
6+
REPARTITION_DATABASE_NAME,
7+
main,
8+
)
9+
10+
11+
class RepartitionGRCh38SnvIndelTest(unittest.TestCase):
    """Exercises ``main`` against a live ClickHouse with a small seeded table."""

    def setUp(self):
        """Recreate both databases and seed a project-partitioned entries table."""
        ch = get_clickhouse_client()
        # Statements run strictly in this order: reset, create, seed, dictionary.
        setup_statements = (
            f"""
            DROP DATABASE IF EXISTS {Env.CLICKHOUSE_DATABASE}
            PARALLEL WITH
            DROP DATABASE IF EXISTS {REPARTITION_DATABASE_NAME};
            """,
            f"""
            CREATE DATABASE {Env.CLICKHOUSE_DATABASE}
            PARALLEL WITH
            CREATE DATABASE {REPARTITION_DATABASE_NAME};
            """,
            f"""
            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries` (
                `key` UInt32,
                `project_guid` LowCardinality(String),
                `family_guid` String,
                `is_annotated_in_any_gene` Boolean,
                `sign` Int8,
                `n_partitions` UInt8 MATERIALIZED 2,
                `partition_id` UInt8 MATERIALIZED farmHash64(family_guid) % n_partitions,
                PROJECTION xpos_projection
                (
                    SELECT *
                    ORDER BY is_annotated_in_any_gene
                )
            )
            ENGINE = CollapsingMergeTree(sign)
            PARTITION BY project_guid
            ORDER BY (project_guid, family_guid, key)
            SETTINGS deduplicate_merge_projection_mode = 'rebuild';
            """,
            f"""
            INSERT INTO {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries`
            VALUES
            (0, 'project_a', 'family_a1', 0, 1),
            (1, 'project_a', 'family_a2', 0, 1),
            (2, 'project_a', 'family_a3', 0, 1),
            (0, 'project_b', 'family_b1', 0, 1),
            (1, 'project_b', 'family_b2', 0, 1),
            (2, 'project_b', 'family_b2', 0, 1),
            (0, 'project_c', 'family_c1', 1, 1),
            (3, 'project_c', 'family_c2', 1, 1),
            """,
            f"""
            CREATE DICTIONARY {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/project_partitions_dict`
            (
                `project_guid` String,
                `n_partitions` UInt8
            )
            PRIMARY KEY project_guid
            SOURCE(
                CLICKHOUSE(
                    USER '{Env.CLICKHOUSE_WRITER_USER}' PASSWORD '{Env.CLICKHOUSE_WRITER_PASSWORD}'
                    DB {Env.CLICKHOUSE_DATABASE} QUERY 'SELECT project_guid, 3 FROM `GRCh38/SNV_INDEL/entries`'
                )
            )
            LIFETIME(0)
            LAYOUT(FLAT(MAX_ARRAY_SIZE 10000))
            """,
        )
        for statement in setup_statements:
            ch.execute(statement)

    def test_main_all_projects(self):
        """With no project override, every seeded row lands in the staging table."""
        ch = get_clickhouse_client()
        main(1, [])
        rows = ch.execute(
            f"""
            SELECT *, n_partitions, partition_id FROM {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
            """,
        )
        # Columns: key, project_guid, family_guid, is_annotated_in_any_gene,
        # sign, n_partitions, partition_id.
        expected = [
            (3, 'project_c', 'family_c2', True, 1, 2, 1),
            (0, 'project_b', 'family_b1', 0, 1, 2, 1),
            (1, 'project_b', 'family_b2', 0, 1, 2, 1),
            (2, 'project_b', 'family_b2', 0, 1, 2, 1),
            (2, 'project_a', 'family_a3', 0, 1, 2, 0),
            (0, 'project_a', 'family_a1', 0, 1, 2, 1),
            (1, 'project_a', 'family_a2', 0, 1, 2, 1),
            (0, 'project_c', 'family_c1', True, 1, 2, 0),
        ]
        self.assertCountEqual(rows, expected)

    def test_main_one_project(self):
        """With a single project override, only that project's rows are copied."""
        ch = get_clickhouse_client()
        main(1, ['project_a'])
        rows = ch.execute(
            f"""
            SELECT *, n_partitions, partition_id FROM {REPARTITION_DATABASE_NAME}.`GRCh38/SNV_INDEL/repartitioned_entries`
            """,
        )
        expected = [
            (2, 'project_a', 'family_a3', 0, 1, 2, 0),
            (0, 'project_a', 'family_a1', 0, 1, 2, 1),
            (1, 'project_a', 'family_a2', 0, 1, 2, 1),
        ]
        self.assertCountEqual(rows, expected)

0 commit comments

Comments
 (0)