Skip to content

Commit b447a80

Browse files
authored
Use sync insert strategy since we are already batching client-side (#2303)
1 parent ed86b4f commit b447a80

File tree

4 files changed

+21
-10
lines changed

4 files changed

+21
-10
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ const EnvironmentSchema = z.object({
911911
RUN_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
912912
RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
913913
RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
914+
RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
914915

915916
// Clickhouse
916917
CLICKHOUSE_URL: z.string(),

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ function initializeRunsReplicationInstance() {
6565
insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES,
6666
insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS,
6767
insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS,
68+
insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY,
6869
});
6970

7071
if (env.RUN_REPLICATION_ENABLED === "1") {

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export type RunsReplicationServiceOptions = {
5353
logLevel?: LogLevel;
5454
tracer?: Tracer;
5555
waitForAsyncInsert?: boolean;
56+
insertStrategy?: "insert" | "insert_async";
5657
// Retry configuration for insert operations
5758
insertMaxRetries?: number;
5859
insertBaseDelayMs?: number;
@@ -90,6 +91,7 @@ export class RunsReplicationService {
9091
private _insertMaxRetries: number;
9192
private _insertBaseDelayMs: number;
9293
private _insertMaxDelayMs: number;
94+
private _insertStrategy: "insert" | "insert_async";
9395

9496
public readonly events: EventEmitter<RunsReplicationServiceEvents>;
9597

@@ -101,6 +103,8 @@ export class RunsReplicationService {
101103

102104
this._acknowledgeTimeoutMs = options.acknowledgeTimeoutMs ?? 1_000;
103105

106+
this._insertStrategy = options.insertStrategy ?? "insert";
107+
104108
this._replicationClient = new LogicalReplicationClient({
105109
pgConfig: {
106110
connectionString: options.pgConnectionUrl,
@@ -598,15 +602,26 @@ export class RunsReplicationService {
598602
return delay + jitter;
599603
}
600604

605+
#getClickhouseInsertSettings() {
606+
if (this._insertStrategy === "insert") {
607+
return {};
608+
} else if (this._insertStrategy === "insert_async") {
609+
return {
610+
async_insert: 1 as const,
611+
async_insert_max_data_size: "1000000",
612+
async_insert_busy_timeout_ms: 1000,
613+
wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
614+
};
615+
}
616+
}
617+
601618
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
602619
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
603620
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
604621
taskRunInserts,
605622
{
606623
params: {
607-
clickhouse_settings: {
608-
wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
609-
},
624+
clickhouse_settings: this.#getClickhouseInsertSettings(),
610625
},
611626
}
612627
);
@@ -631,9 +646,7 @@ export class RunsReplicationService {
631646
payloadInserts,
632647
{
633648
params: {
634-
clickhouse_settings: {
635-
wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
636-
},
649+
clickhouse_settings: this.#getClickhouseInsertSettings(),
637650
},
638651
}
639652
);

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ export function insertTaskRuns(ch: ClickhouseWriter, settings?: ClickHouseSettin
5656
table: "trigger_dev.task_runs_v2",
5757
schema: TaskRunV2,
5858
settings: {
59-
async_insert: 1,
60-
wait_for_async_insert: 0,
61-
async_insert_max_data_size: "1000000",
62-
async_insert_busy_timeout_ms: 1000,
6359
enable_json_type: 1,
6460
type_json_skip_duplicated_paths: 1,
6561
...settings,

0 commit comments

Comments
 (0)