Skip to content

Commit fbae03d

Browse files
authored
shard project_gt_stats incr query (#1184)
* shard incr query * bugs * fix math * fix query * fix * ruff
1 parent 89ae498 commit fbae03d

File tree

2 files changed

+36
-4
lines changed

2 files changed

+36
-4
lines changed

v03_pipeline/lib/misc/clickhouse.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from v03_pipeline.lib.core import DatasetType, ReferenceGenome
1313
from v03_pipeline.lib.core.environment import Env
1414
from v03_pipeline.lib.logger import get_logger
15+
from v03_pipeline.lib.misc.math import split_ranges
1516
from v03_pipeline.lib.misc.retry import retry
1617
from v03_pipeline.lib.paths import (
1718
new_entries_parquet_path,
@@ -793,12 +794,27 @@ def rebuild_gt_stats(
793794
table_name_builder.dst_prefix,
794795
table_name_builder.staging_dst_prefix,
795796
)
796-
logged_query(
797+
# NB: encountered OOMs with large projects, necessitating sharding the insertion query.
798+
max_key = logged_query(
797799
f"""
798-
INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.PROJECT_GT_STATS)}
799-
{select_statement}
800+
SELECT max(key) FROM {table_name_builder.dst_table(ClickHouseTable.GT_STATS)}
800801
""",
801-
)
802+
)[0][0]
803+
for range_start, range_end in split_ranges(max_key):
804+
logged_query(
805+
f"""
806+
INSERT INTO {
807+
table_name_builder.staging_dst_table(ClickHouseTable.PROJECT_GT_STATS)
808+
}
809+
{
810+
select_statement.replace(
811+
'GROUP BY project_guid',
812+
'WHERE key >= %(range_start)s AND key <= %(range_end)s GROUP BY project_guid',
813+
)
814+
}
815+
""",
816+
{'range_start': range_start, 'range_end': range_end},
817+
)
802818
finalize_refresh_flow(table_name_builder, project_guids)
803819

804820

v03_pipeline/lib/misc/math.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
import math
2+
from collections.abc import Generator
3+
4+
5+
def split_ranges(max_value: int, n: int = 5) -> Generator[tuple[int, int], None, None]:
6+
if max_value < n:
7+
yield (0, max_value)
8+
return
9+
step = math.ceil(max_value / n)
10+
start, end = 0, step
11+
while start < max_value:
12+
yield (start, min(end, max_value))
13+
start += step
14+
end += step
15+
16+
117
def constrain(
218
number: int | float,
319
lower_bound: int | float,

0 commit comments

Comments
 (0)