From 9741a28b56f8c720aeecf4db428d7a7952acfca8 Mon Sep 17 00:00:00 2001 From: dieriba Date: Tue, 23 Sep 2025 00:58:55 +0200 Subject: [PATCH 1/9] big --- backend/Cargo.lock | 1 + .../20250922155947_v2_job.down.sql | 8 + .../20250922155947_v2_job.up.sql | 79 ++++ .../20250922160402_v2_queue.down.sql | 2 + .../20250922160402_v2_queue.up.sql | 25 + .../20250922160457_v2_job_completed.down.sql | 3 + .../20250922160457_v2_job_completed.up.sql | 26 ++ .../20250922160534_v2_job_runtime.down.sql | 2 + .../20250922160534_v2_job_runtime.up.sql | 6 + .../20250922160609_v2_job_status.down.sql | 2 + .../20250922160609_v2_job_status.up.sql | 7 + .../20250922160649_job_logs.down.sql | 2 + .../20250922160649_job_logs.up.sql | 14 + .../20250922161225_job_perms.down.sql | 2 + .../20250922161225_job_perms.up.sql | 13 + ...50922161300_outstanding_wait_time.down.sql | 2 + ...0250922161300_outstanding_wait_time.up.sql | 6 + backend/src/main.rs | 58 ++- backend/src/monitor.rs | 1 + backend/tests/common/mod.rs | 1 + backend/windmill-api/src/jobs.rs | 430 +++++++++++++----- backend/windmill-common/Cargo.toml | 2 +- backend/windmill-common/src/db.rs | 26 ++ backend/windmill-common/src/lib.rs | 23 +- backend/windmill-common/src/queue.rs | 85 +++- backend/windmill-queue/src/jobs.rs | 349 +++++++++++--- backend/windmill-worker/src/ai_executor.rs | 7 +- .../windmill-worker/src/result_processor.rs | 38 +- backend/windmill-worker/src/worker.rs | 22 +- backend/windmill-worker/src/worker_flow.rs | 21 +- 30 files changed, 1047 insertions(+), 216 deletions(-) create mode 100644 backend/shard-migrations/20250922155947_v2_job.down.sql create mode 100644 backend/shard-migrations/20250922155947_v2_job.up.sql create mode 100644 backend/shard-migrations/20250922160402_v2_queue.down.sql create mode 100644 backend/shard-migrations/20250922160402_v2_queue.up.sql create mode 100644 backend/shard-migrations/20250922160457_v2_job_completed.down.sql create mode 100644 backend/shard-migrations/20250922160457_v2_job_completed.up.sql create mode 100644 backend/shard-migrations/20250922160534_v2_job_runtime.down.sql create mode 100644 backend/shard-migrations/20250922160534_v2_job_runtime.up.sql create mode 100644 backend/shard-migrations/20250922160609_v2_job_status.down.sql create mode 100644 backend/shard-migrations/20250922160609_v2_job_status.up.sql create mode 100644 backend/shard-migrations/20250922160649_job_logs.down.sql create mode 100644 backend/shard-migrations/20250922160649_job_logs.up.sql create mode 100644 backend/shard-migrations/20250922161225_job_perms.down.sql create mode 100644 backend/shard-migrations/20250922161225_job_perms.up.sql create mode 100644 backend/shard-migrations/20250922161300_outstanding_wait_time.down.sql create mode 100644 backend/shard-migrations/20250922161300_outstanding_wait_time.up.sql diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 593de8a8e58cf..b710460023de3 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -15391,6 +15391,7 @@ dependencies = [ "magic-crypt", "mail-send", "object_store", + "once_cell", "openidconnect", "opentelemetry", "opentelemetry-appender-tracing", diff --git a/backend/shard-migrations/20250922155947_v2_job.down.sql b/backend/shard-migrations/20250922155947_v2_job.down.sql new file mode 100644 index 0000000000000..bcc36937bd081 --- /dev/null +++ b/backend/shard-migrations/20250922155947_v2_job.down.sql @@ -0,0 +1,8 @@ +-- Add down migration script here + +DROP TYPE IF EXISTS job_status CASCADE; +DROP TYPE IF EXISTS job_kind CASCADE; +DROP TYPE IF EXISTS 
job_trigger_kind CASCADE;
+DROP TYPE IF EXISTS script_lang CASCADE;
+DROP TABLE IF EXISTS v2_job;
+
diff --git a/backend/shard-migrations/20250922155947_v2_job.up.sql b/backend/shard-migrations/20250922155947_v2_job.up.sql
new file mode 100644
index 0000000000000..f67cc5cb1da7f
--- /dev/null
+++ b/backend/shard-migrations/20250922155947_v2_job.up.sql
@@ -0,0 +1,92 @@
+-- Add up migration script here
+
+-- PostgreSQL has no CREATE TYPE IF NOT EXISTS; guard each type with a DO block instead.
+DO $$ BEGIN
+CREATE TYPE job_kind AS ENUM (
+    'script', 'preview', 'flow', 'dependencies', 'flowpreview', 'script_hub',
+    'identity', 'flowdependencies', 'http', 'graphql', 'postgresql', 'noop',
+    'appdependencies', 'deploymentcallback', 'singlescriptflow', 'flowscript',
+    'flownode', 'appscript'
+);
+EXCEPTION WHEN duplicate_object THEN NULL;
+END $$;
+
+
+DO $$ BEGIN
+CREATE TYPE job_status AS ENUM ('success', 'failure', 'canceled', 'skipped');
+EXCEPTION WHEN duplicate_object THEN NULL;
+END $$;
+
+
+
+DO $$ BEGIN
+CREATE TYPE job_trigger_kind AS ENUM (
+    'webhook', 'http', 'websocket', 'kafka', 'email', 'nats', 'schedule',
+    'app', 'ui', 'postgres', 'sqs', 'gcp', 'mqtt'
+);
+EXCEPTION WHEN duplicate_object THEN NULL;
+END $$;
+
+DO $$ BEGIN
+CREATE TYPE script_lang AS ENUM (
+    'python3', 'deno', 'go', 'bash', 'postgresql', 'nativets', 'bun', 'mysql',
+    'bigquery', 'snowflake', 'graphql', 'powershell', 'mssql', 'php', 'bunnative',
+    'rust', 'ansible', 'csharp', 'oracledb', 'nu', 'java', 'duckdb'
+);
+EXCEPTION WHEN duplicate_object THEN NULL;
+END $$;
+
+
+CREATE TABLE IF NOT EXISTS v2_job (
+    id UUID PRIMARY KEY,
+    raw_code TEXT,
+    raw_lock TEXT,
+    raw_flow JSONB,
+    tag VARCHAR(255),
+    workspace_id VARCHAR(100) NOT NULL,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    created_by VARCHAR(100) NOT NULL,
+    permissioned_as VARCHAR(100) NOT NULL,
+    permissioned_as_email VARCHAR(500),
+    kind job_kind NOT NULL,
+    runnable_id BIGINT,
+    runnable_path VARCHAR(500),
+    parent_job UUID,
+    root_job UUID,
+    script_lang script_lang,
+    script_entrypoint_override VARCHAR(500),
+    flow_step INTEGER,
+    flow_step_id VARCHAR(100),
+    flow_innermost_root_job UUID,
+    trigger VARCHAR(500),
+    trigger_kind job_trigger_kind,
+    same_worker BOOLEAN NOT NULL DEFAULT FALSE,
+    visible_to_owner BOOLEAN NOT NULL DEFAULT TRUE,
+    concurrent_limit INTEGER,
+    concurrency_time_window_s INTEGER,
+    cache_ttl INTEGER,
+    timeout INTEGER,
+    priority SMALLINT,
+    preprocessed BOOLEAN NOT NULL DEFAULT FALSE,
+    args JSONB,
+    labels TEXT[],
+    pre_run_error TEXT
+);
+
+-- Create indices for v2_job
+CREATE INDEX IF NOT EXISTS ix_job_created_at ON v2_job (created_at DESC);
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_3 ON v2_job (workspace_id, created_at DESC);
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_5 ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = ANY (ARRAY['preview'::job_kind, 'flowpreview'::job_kind]))
+        AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_8 ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = 'deploymentcallback'::job_kind) AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_9 ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = ANY (ARRAY['dependencies'::job_kind, 'flowdependencies'::job_kind, 'appdependencies'::job_kind]))
+        AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_v2_job_labels ON v2_job USING gin (labels) WHERE (labels IS NOT NULL);
+CREATE INDEX IF NOT EXISTS ix_v2_job_workspace_id_created_at ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = ANY (ARRAY['script'::job_kind, 'flow'::job_kind, 'singlescriptflow'::job_kind]))
+        AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_job_root_job_index_by_path_2 ON v2_job (workspace_id,
runnable_path, created_at DESC) + WHERE (parent_job IS NULL); \ No newline at end of file diff --git a/backend/shard-migrations/20250922160402_v2_queue.down.sql b/backend/shard-migrations/20250922160402_v2_queue.down.sql new file mode 100644 index 0000000000000..a74aba16985a6 --- /dev/null +++ b/backend/shard-migrations/20250922160402_v2_queue.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS v2_job_queue; diff --git a/backend/shard-migrations/20250922160402_v2_queue.up.sql b/backend/shard-migrations/20250922160402_v2_queue.up.sql new file mode 100644 index 0000000000000..e5d7d40636ed0 --- /dev/null +++ b/backend/shard-migrations/20250922160402_v2_queue.up.sql @@ -0,0 +1,25 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS v2_job_queue ( + id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE, + workspace_id VARCHAR(100) NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + started_at TIMESTAMP WITH TIME ZONE, + scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + running BOOLEAN NOT NULL DEFAULT FALSE, + canceled_by VARCHAR(100), + canceled_reason TEXT, + suspend INTEGER, + suspend_until TIMESTAMP WITH TIME ZONE, + tag VARCHAR(255), + priority SMALLINT, + worker VARCHAR(100), + extras JSONB +); + +-- Create indices for v2_job_queue +CREATE INDEX IF NOT EXISTS queue_sort_v2 ON v2_job_queue (priority DESC NULLS LAST, scheduled_for, tag) + WHERE (running = false); +CREATE INDEX IF NOT EXISTS queue_suspended ON v2_job_queue (priority DESC NULLS LAST, created_at, suspend_until, suspend, tag) + WHERE (suspend_until IS NOT NULL); +CREATE INDEX IF NOT EXISTS root_queue_index_by_path ON v2_job_queue (workspace_id, created_at); +CREATE INDEX IF NOT EXISTS v2_job_queue_suspend ON v2_job_queue (workspace_id, suspend) WHERE (suspend > 0); \ No newline at end of file diff --git a/backend/shard-migrations/20250922160457_v2_job_completed.down.sql b/backend/shard-migrations/20250922160457_v2_job_completed.down.sql new file mode 100644 index 0000000000000..7311e8ff11956 --- /dev/null +++ b/backend/shard-migrations/20250922160457_v2_job_completed.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here + +DROP TABLE IF EXISTS v2_job_completed CASCADE; \ No newline at end of file diff --git a/backend/shard-migrations/20250922160457_v2_job_completed.up.sql b/backend/shard-migrations/20250922160457_v2_job_completed.up.sql new file mode 100644 index 0000000000000..55bf3870990b1 --- /dev/null +++ b/backend/shard-migrations/20250922160457_v2_job_completed.up.sql @@ -0,0 +1,26 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS v2_job_completed ( + id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE, + workspace_id VARCHAR(100) NOT NULL, + duration_ms BIGINT, + result JSONB, + deleted BOOLEAN NOT NULL DEFAULT FALSE, + canceled_by VARCHAR(100), + canceled_reason TEXT, + flow_status JSONB, + started_at TIMESTAMP WITH TIME ZONE, + memory_peak INTEGER, + status job_status NOT NULL, + completed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + worker VARCHAR(100), + workflow_as_code_status JSONB, + result_columns TEXT[], + retries UUID[], + extras JSONB +); + +-- Create indices for v2_job_completed +CREATE INDEX IF NOT EXISTS ix_completed_job_workspace_id_started_at_new_2 ON v2_job_completed (workspace_id, started_at DESC); +CREATE INDEX IF NOT EXISTS ix_job_completed_completed_at ON v2_job_completed (completed_at DESC); +CREATE INDEX IF NOT EXISTS labeled_jobs_on_jobs ON v2_job_completed USING gin (((result 
-> 'wm_labels'::text))) + WHERE (result ? 'wm_labels'::text); \ No newline at end of file diff --git a/backend/shard-migrations/20250922160534_v2_job_runtime.down.sql b/backend/shard-migrations/20250922160534_v2_job_runtime.down.sql new file mode 100644 index 0000000000000..3d74a5b19965a --- /dev/null +++ b/backend/shard-migrations/20250922160534_v2_job_runtime.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS v2_job_runtime CASCADE; \ No newline at end of file diff --git a/backend/shard-migrations/20250922160534_v2_job_runtime.up.sql b/backend/shard-migrations/20250922160534_v2_job_runtime.up.sql new file mode 100644 index 0000000000000..3bdb144df7c1e --- /dev/null +++ b/backend/shard-migrations/20250922160534_v2_job_runtime.up.sql @@ -0,0 +1,6 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS v2_job_runtime ( + id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE, + ping TIMESTAMP WITH TIME ZONE, + memory_peak INTEGER +); \ No newline at end of file diff --git a/backend/shard-migrations/20250922160609_v2_job_status.down.sql b/backend/shard-migrations/20250922160609_v2_job_status.down.sql new file mode 100644 index 0000000000000..0f57539a1ec9a --- /dev/null +++ b/backend/shard-migrations/20250922160609_v2_job_status.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS v2_job_status CASCADE; diff --git a/backend/shard-migrations/20250922160609_v2_job_status.up.sql b/backend/shard-migrations/20250922160609_v2_job_status.up.sql new file mode 100644 index 0000000000000..946160363f3a7 --- /dev/null +++ b/backend/shard-migrations/20250922160609_v2_job_status.up.sql @@ -0,0 +1,7 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS v2_job_status ( + id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE, + flow_status JSONB, + flow_leaf_jobs JSONB, + workflow_as_code_status JSONB +); \ No newline at end of file diff --git a/backend/shard-migrations/20250922160649_job_logs.down.sql b/backend/shard-migrations/20250922160649_job_logs.down.sql new file mode 100644 index 0000000000000..9adbe0e1d5b5c --- /dev/null +++ b/backend/shard-migrations/20250922160649_job_logs.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS job_logs CASCADE; \ No newline at end of file diff --git a/backend/shard-migrations/20250922160649_job_logs.up.sql b/backend/shard-migrations/20250922160649_job_logs.up.sql new file mode 100644 index 0000000000000..8150c2584a107 --- /dev/null +++ b/backend/shard-migrations/20250922160649_job_logs.up.sql @@ -0,0 +1,14 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS job_logs ( + job_id UUID NOT NULL, + workspace_id VARCHAR(100) NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + logs TEXT, + log_offset INTEGER, + log_file_index TEXT[], + PRIMARY KEY (job_id, log_offset) +); + +-- Create indices for job_logs +CREATE INDEX IF NOT EXISTS job_logs_job_id_idx ON job_logs (job_id); +CREATE INDEX IF NOT EXISTS job_logs_workspace_id_created_at_idx ON job_logs (workspace_id, created_at DESC); \ No newline at end of file diff --git a/backend/shard-migrations/20250922161225_job_perms.down.sql b/backend/shard-migrations/20250922161225_job_perms.down.sql new file mode 100644 index 0000000000000..a6487b1dda678 --- /dev/null +++ b/backend/shard-migrations/20250922161225_job_perms.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS job_perms CASCADE; diff --git 
a/backend/shard-migrations/20250922161225_job_perms.up.sql b/backend/shard-migrations/20250922161225_job_perms.up.sql new file mode 100644 index 0000000000000..1fe04802ef221 --- /dev/null +++ b/backend/shard-migrations/20250922161225_job_perms.up.sql @@ -0,0 +1,13 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS job_perms ( + job_id UUID NOT NULL, + email VARCHAR(255) NOT NULL, + username VARCHAR(50) NOT NULL, + is_admin BOOLEAN NOT NULL, + is_operator BOOLEAN NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + workspace_id VARCHAR(50) NOT NULL, + groups TEXT[] NOT NULL, + folders JSONB[] NOT NULL, + CONSTRAINT job_perms_pk PRIMARY KEY (job_id) +); \ No newline at end of file diff --git a/backend/shard-migrations/20250922161300_outstanding_wait_time.down.sql b/backend/shard-migrations/20250922161300_outstanding_wait_time.down.sql new file mode 100644 index 0000000000000..b31f3ebdf0cb9 --- /dev/null +++ b/backend/shard-migrations/20250922161300_outstanding_wait_time.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS outstanding_wait_time CASCADE; diff --git a/backend/shard-migrations/20250922161300_outstanding_wait_time.up.sql b/backend/shard-migrations/20250922161300_outstanding_wait_time.up.sql new file mode 100644 index 0000000000000..419e21b4af47e --- /dev/null +++ b/backend/shard-migrations/20250922161300_outstanding_wait_time.up.sql @@ -0,0 +1,6 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS outstanding_wait_time ( + job_id UUID PRIMARY KEY, + self_wait_time_ms BIGINT DEFAULT NULL, + aggregate_wait_time_ms BIGINT DEFAULT NULL +); \ No newline at end of file diff --git a/backend/src/main.rs b/backend/src/main.rs index 87a148d5c3fa9..e115122a3c066 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -5,7 +5,7 @@ * Please see the included NOTICE for copyright information and * LICENSE-AGPL for a copy of the license. 
*/
 
-use anyhow::Context;
+use anyhow::{anyhow, Context};
 use monitor::{
     load_base_url, load_otel, reload_critical_alerts_on_db_oversize,
     reload_delete_logs_periodically_setting, reload_indexer_config,
@@ -15,7 +15,7 @@ use monitor::{
     send_logs_to_object_store, WORKERS_NAMES,
 };
 use rand::Rng;
-use sqlx::postgres::PgListener;
+use sqlx::{postgres::PgListener, Pool, Postgres};
 use std::{
     collections::HashMap,
     fs::{create_dir_all, DirBuilder},
@@ -34,7 +34,7 @@ use windmill_common::ee_oss::{
 
 use windmill_common::{
     agent_workers::build_agent_http_client,
-    get_database_url,
+    connect_db, get_database_url,
     global_settings::{
         APP_WORKSPACED_ROUTE_SETTING, BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING,
         CRITICAL_ALERTS_ON_DB_OVERSIZE_SETTING, CRITICAL_ALERT_MUTE_UI_SETTING,
@@ -60,7 +60,8 @@ use windmill_common::{
     worker::{
         reload_custom_tags_setting, Connection, HUB_CACHE_DIR, TMP_DIR, TMP_LOGS_DIR, WORKER_GROUP,
     },
-    KillpillSender, METRICS_ENABLED,
+    KillpillSender, DB, METRICS_ENABLED, SHARD_DB_INSTANCE, SHARD_DB_URL, SHARD_ID_TO_DB_INSTANCE,
+    SHARD_MODE, SHARD_URLS,
 };
 
 #[cfg(feature = "enterprise")]
@@ -299,6 +300,35 @@ async fn cache_hub_scripts(file_path: Option<String>) -> anyhow::Result<()> {
     Ok(())
 }
 
+async fn initialize_worker_shard_db() -> anyhow::Result<Pool<Postgres>> {
+    let shard = SHARD_DB_URL.as_deref().ok_or_else(|| {
+        anyhow!("SHARD_DB_URL environment variable is required for worker shard mode")
+    })?;
+
+    let shard = connect_db(Some(shard), false, false, true).await?;
+    SHARD_DB_INSTANCE.set(shard.clone()).map_err(|_| {
+        anyhow!("SHARD_DB_INSTANCE already initialized")
+    })?;
+    Ok(shard)
+}
+
+async fn initialize_server_shard_instances() -> anyhow::Result<()> {
+    let shard_urls = &*SHARD_URLS
+        .as_ref()
+        .ok_or_else(|| anyhow!("SHARD_URLS environment variable is required for server shard mode. Please set it as: SHARD_URLS=dburl1,dburl2,..."))?;
+
+    let mut shard_to_db = HashMap::new();
+    for (i, shard_url) in shard_urls.iter().enumerate() {
+        let shard = connect_db(Some(shard_url.as_str()), true, false, false).await?;
+        shard_to_db.insert(i, shard);
+    }
+
+    SHARD_ID_TO_DB_INSTANCE.set(shard_to_db).map_err(|_| {
+        anyhow!("SHARD_ID_TO_DB_INSTANCE already initialized")
+    })?;
+    Ok(())
+}
+
 async fn windmill_main() -> anyhow::Result<()> {
     dotenv::dotenv().ok();
 
@@ -471,13 +501,24 @@ async fn windmill_main() -> anyhow::Result<()> {
     }
 
     let worker_mode = num_workers > 0;
-
+    let shard_mode = *SHARD_MODE;
+    let mut job_queue_db = None;
     let conn = if mode == Mode::Agent {
         conn
     } else {
         // This time we use a pool of connections
-        let db = windmill_common::connect_db(server_mode, indexer_mode, worker_mode).await?;
+        let db = connect_db(None, server_mode, indexer_mode, worker_mode).await?;
+        if shard_mode {
+            if server_mode {
+                initialize_server_shard_instances().await?;
+            }
+            if worker_mode {
+                job_queue_db = Some(initialize_worker_shard_db().await?);
+            }
+        } else {
+            job_queue_db = Some(db.clone());
+        }
 
         // NOTE: Variable/resource cache initialization moved to API server in windmill-api
         Connection::Sql(db)
@@ -779,6 +820,7 @@ Windmill Community Edition {GIT_VERSION}
                 base_internal_url.clone(),
                 hostname.clone(),
                 &workers,
+                job_queue_db.as_ref(),
             )
             .await?;
             tracing::info!("All workers exited.");
@@ -1364,6 +1406,7 @@ pub async fn run_workers(
     base_internal_url: String,
     hostname: String,
     workers: &[WorkerConn],
+    job_queue_db: Option<&DB>,
 ) -> anyhow::Result<()> {
     let mut killpill_rxs = vec![];
     let num_workers = workers.len();
@@ -1432,7 +1475,7 @@ pub async fn run_workers(
         let tx = tx.clone();
         let base_internal_url = base_internal_url.clone();
         let hostname = hostname.clone();
-
+        let job_queue_db = job_queue_db.cloned();
         handles.push(tokio::spawn(async move {
             if num_workers > 1 {
                 tracing::info!(worker = %worker_name, "starting worker {i}");
             }
 
             let f = windmill_worker::run_worker(
                 &conn1,
+                job_queue_db,
                 &hostname,
                 worker_name,
                 i as u64,
diff --git a/backend/src/monitor.rs b/backend/src/monitor.rs
index a0900da40e300..b647bc23ddb5d 100644
--- a/backend/src/monitor.rs
+++ b/backend/src/monitor.rs
@@ -2267,6 +2267,7 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker
         );
         let _ = handle_job_error(
             db,
+            db, // TODO: pass the proper shard DB here once zombie-job handling is shard-aware
             &client,
             &MiniPulledJob::from(&job),
             0,
diff --git a/backend/tests/common/mod.rs b/backend/tests/common/mod.rs
index 748fae04c1d45..3bd35e927cb74 100644
--- a/backend/tests/common/mod.rs
+++ b/backend/tests/common/mod.rs
@@ -260,6 +260,7 @@ pub fn spawn_test_worker(
     }
     windmill_worker::run_worker(
         &db.into(),
+        None,
         worker_instance,
         worker_name,
         1,
diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs
index 1b750702177ba..849cddf051669 100644
--- a/backend/windmill-api/src/jobs.rs
+++ b/backend/windmill-api/src/jobs.rs
@@ -11,6 +11,7 @@ use axum::extract::Request;
 use axum::http::HeaderValue;
 #[cfg(feature = "deno_core")]
 use deno_core::{op2, serde_v8, v8, JsRuntime, OpState};
+use futures::future::join_all;
 use futures::{StreamExt, TryFutureExt};
 use http::{HeaderMap, HeaderName};
 use itertools::Itertools;
@@ -28,7 +29,7 @@ use tower::ServiceBuilder;
 #[cfg(all(feature = "enterprise", feature = "smtp"))]
 use windmill_common::auth::is_super_admin_email;
 use windmill_common::auth::TOKEN_PREFIX_LEN;
-use windmill_common::db::UserDbWithAuthed;
+use windmill_common::db::{get_shard_db_from_shard_id, get_shard_id_from_job_uuid,
UserDbWithAuthed};
 use windmill_common::error::JsonResult;
 use windmill_common::flow_status::{JobResult, RestartedFrom};
 use windmill_common::jobs::{
@@ -40,6 +41,7 @@ use windmill_common::worker::{Connection, CLOUD_HOSTED, TMP_DIR};
 use windmill_common::DYNAMIC_INPUT_CACHE;
 #[cfg(all(feature = "enterprise", feature = "smtp"))]
 use windmill_common::{email_oss::send_email_html, server::load_smtp_config};
+use windmill_common::{SHARD_ID_TO_DB_INSTANCE, SHARD_MODE};
 
 use windmill_common::scripts::PREVIEW_IS_CODEBASE_HASH;
 use windmill_common::variables::get_workspace_key;
@@ -5885,6 +5887,116 @@ struct BatchInfo {
     tag: Option<String>,
 }
 
+async fn insert_batch_jobs_to_db<'c, E: sqlx::Executor<'c, Database = Postgres>>(
+    executor: E,
+    w_id: &str,
+    raw_code: Option<String>,
+    raw_lock: Option<String>,
+    raw_flow: Option<sqlx::types::Json<Box<RawValue>>>,
+    tag: &str,
+    hash: Option<i64>,
+    path: Option<String>,
+    job_kind: JobKind,
+    language: ScriptLang,
+    authed: &ApiAuthed,
+    concurrent_limit: Option<i32>,
+    concurrent_time_window_s: Option<i32>,
+    timeout: Option<i32>,
+    flow_status: Option<sqlx::types::Json<FlowStatus>>,
+    scheduled_for: chrono::DateTime<Utc>,
+    uuids: &[Uuid],
+) -> Result<(), Error> {
+    sqlx::query!(
+        r#"
+        WITH uuid_table AS (
+            SELECT unnest($25::uuid[]) AS uuid
+        ),
+        inserted_job AS (
+            INSERT INTO v2_job (
+                id, workspace_id, raw_code, raw_lock, raw_flow, tag,
+                runnable_id, runnable_path, kind, script_lang,
+                created_by, permissioned_as, permissioned_as_email,
+                concurrent_limit, concurrency_time_window_s, timeout, args
+            )
+            SELECT
+                uuid, $1, $2, $3, $4, $5, $6, $7, $8, $9,
+                $10, $11, $12, $13, $14, $15,
+                ('{ "uuid": "' || uuid || '" }')::jsonb
+            FROM uuid_table
+            RETURNING id AS "id!"
+        ),
+        inserted_queue AS (
+            INSERT INTO v2_job_queue (id, workspace_id, scheduled_for, tag)
+            SELECT uuid, $1, $23, $24 FROM uuid_table
+        ),
+        inserted_runtime AS (
+            INSERT INTO v2_job_runtime (id, ping) SELECT uuid, null FROM uuid_table
+        ),
+        inserted_job_perms AS (
+            INSERT INTO job_perms (job_id, email, username, is_admin, is_operator, folders, groups, workspace_id)
+            SELECT uuid, $16, $17, $18, $19, $20, $21, $1 FROM uuid_table
+        )
+        INSERT INTO v2_job_status (id, flow_status)
+        SELECT uuid, $22
+        FROM uuid_table
+        WHERE $22::jsonb IS NOT NULL
+        "#,
+        w_id,
+        raw_code.as_deref(),
+        raw_lock.as_deref(),
+        raw_flow as Option<sqlx::types::Json<Box<RawValue>>>,
+        tag,
+        hash,
+        path.as_deref(),
+        job_kind as JobKind,
+        language as ScriptLang,
+        authed.username,
+        username_to_permissioned_as(&authed.username),
+        authed.email,
+        concurrent_limit,
+        concurrent_time_window_s,
+        timeout,
+        authed.email,
+        authed.username,
+        authed.is_admin,
+        authed.is_operator,
+        &[],
+        &[],
+        flow_status as Option<sqlx::types::Json<FlowStatus>>,
+        scheduled_for,
+        tag,
+        uuids
+    )
+    .execute(executor)
+    .await?;
+
+    Ok(())
+}
+
+async fn insert_concurrency_keys(
+    tx: &mut Transaction<'_, Postgres>,
+    custom_concurrency_key: &str,
+    uuids: &[Uuid],
+) -> Result<(), Error> {
+    sqlx::query!(
+        "INSERT INTO concurrency_counter(concurrency_id, job_uuids)
+        VALUES ($1, '{}'::jsonb)",
+        custom_concurrency_key
+    )
+    .execute(&mut **tx)
+    .await?;
+
+    sqlx::query!(
+        "INSERT INTO concurrency_key (job_id, key) SELECT id, $1 FROM unnest($2::uuid[]) as id",
+        custom_concurrency_key,
+        uuids
+    )
+    .execute(&mut **tx)
+    .await?;
+
+    Ok(())
+}
+
 #[tracing::instrument(level = "trace", skip_all)]
 async fn add_batch_jobs(
     authed: ApiAuthed,
@@ -6038,6 +6150,11 @@ async fn add_batch_jobs(
         }
     };
 
+    let uuids = (0..n)
+        .into_iter()
+        .map(|_| Uuid::new_v4())
+        .collect::<Vec<Uuid>>();
+
     let language = language.unwrap_or(ScriptLang::Deno);
 
     let tag = if let Some(dedicated_worker) = dedicated_worker {
@@ -6052,106 +6169,116 @@ async fn add_batch_jobs(
         format!("{}", language.as_str())
     };
 
-    let mut tx = user_db.begin(&authed).await?;
+    if *SHARD_MODE {
+        let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap();
 
-    let uuids = sqlx::query_scalar!(
-        r#"WITH uuid_table as (
-            select gen_random_uuid() as uuid from generate_series(1, $16)
-        )
-        INSERT INTO v2_job
-            (id, workspace_id, raw_code, raw_lock, raw_flow, tag, runnable_id, runnable_path, kind,
-            script_lang, created_by, permissioned_as, permissioned_as_email, concurrent_limit,
-            concurrency_time_window_s, timeout, args)
-            (SELECT uuid, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
-            ('{ "uuid": "' || uuid || '" }')::jsonb FROM uuid_table)
-        RETURNING id AS "id!""#,
-        w_id,
-        raw_code,
-        raw_lock,
-        raw_flow.map(sqlx::types::Json) as Option<sqlx::types::Json<Box<RawValue>>>,
-        tag,
-        hash,
-        path,
-        job_kind.clone() as JobKind,
-        language as ScriptLang,
-        authed.username,
-        username_to_permissioned_as(&authed.username),
-        authed.email,
-        concurrent_limit,
-        concurrent_time_window_s,
-        timeout,
-        n,
-    )
-    .fetch_all(&mut *tx)
-    .await?;
+        if shard_db_store.is_empty() {
+            return Err(Error::InternalErr(
+                "No shard databases configured".to_string(),
+            ));
+        }
 
-    let uuids = sqlx::query_scalar!(
-        r#"WITH uuid_table as (
-            select unnest($4::uuid[]) as uuid
-        )
-        INSERT INTO v2_job_queue
-            (id, workspace_id, scheduled_for, tag)
-            (SELECT uuid, $1, $2, $3 FROM uuid_table)
-        RETURNING id"#,
-        w_id,
-        Utc::now(),
-        tag,
-        &uuids
-    )
-    .fetch_all(&mut *tx)
-    .await?;
+        let mut uuid_per_db = {
+            let mut store = HashMap::with_capacity(shard_db_store.len());
+            for (shard_id, _) in shard_db_store {
+                store.insert(*shard_id, Vec::new());
+            }
+            store
+        };
 
-    sqlx::query!(
-        "INSERT INTO v2_job_runtime (id, ping) SELECT unnest($1::uuid[]), null",
-        &uuids,
-    )
-    .execute(&mut *tx)
-    .await?;
+        for uuid in &uuids {
+            // Use the shared helper so batch jobs route exactly like push/pull.
+            let shard_id = get_shard_id_from_job_uuid(*uuid);
+            uuid_per_db.get_mut(&shard_id).unwrap().push(*uuid);
+        }
 
-    sqlx::query!(
-        "INSERT INTO job_perms (job_id, email, username, is_admin, is_operator, folders, groups, workspace_id)
-        SELECT unnest($1::uuid[]), $2, $3, $4, $5, $6, $7, $8",
-        &uuids,
-        authed.email,
-        authed.username,
-        authed.is_admin,
-        authed.is_operator,
-        &[],
-        &[],
-        w_id,
-    )
-    .execute(&mut *tx)
-    .await?;
+        let mut tx = user_db.begin(&authed).await?;
+        if let Some(custom_concurrency_key) = custom_concurrency_key {
+            insert_concurrency_keys(&mut tx, &custom_concurrency_key, &uuids).await?;
+        }
+        tx.commit().await?;
 
-    if let Some(flow_status) = flow_status {
-        sqlx::query!(
-            "INSERT INTO v2_job_status (id, flow_status)
-            SELECT unnest($1::uuid[]), $2",
+        let future_batch_jobs = uuid_per_db
+            .iter()
+            .filter_map(|(shard_id, shard_uuids)| {
+                if shard_uuids.is_empty() {
+                    return None;
+                }
+
+                let db = shard_db_store.get(shard_id).unwrap().clone();
+                let w_id = w_id.clone();
+                let raw_code = raw_code.clone();
+                let raw_lock = raw_lock.clone();
+                let raw_flow =
+                    raw_flow.clone().map(sqlx::types::Json) as Option<sqlx::types::Json<Box<RawValue>>>;
+                let tag = tag.clone();
+                let path = path.clone();
+                let authed = authed.clone();
+                let flow_status = flow_status.clone().map(|status| sqlx::types::Json(status))
+                    as Option<sqlx::types::Json<FlowStatus>>;
+                let shard_uuids = shard_uuids.clone();
+
+                Some(async move {
+                    insert_batch_jobs_to_db(
+                        &db,
+                        &w_id,
+                        raw_code,
+                        raw_lock,
+                        raw_flow,
+                        &tag,
+                        hash,
+                        path,
+                        job_kind,
+                        language,
+                        &authed,
+                        concurrent_limit,
+                        concurrent_time_window_s,
+                        timeout,
+                        flow_status,
+                        Utc::now(),
+                        &shard_uuids,
+                    )
+                    .await
+                    .map_err(|e| Error::InternalErr(format!("Shard insert failed: {}", e)))
+                })
+            })
+            .collect::<Vec<_>>();
+
+        let results = join_all(future_batch_jobs).await;
+        for result in results {
+            result?;
+        }
+    } else {
+        let mut tx = user_db.begin(&authed).await?;
+
+        insert_batch_jobs_to_db(
+            &mut *tx,
+            &w_id,
+            raw_code,
+            raw_lock,
+            raw_flow.map(sqlx::types::Json),
+            &tag,
+            hash,
+            path,
+            job_kind,
+            language,
+            &authed,
+            concurrent_limit,
+            concurrent_time_window_s,
+            timeout,
+            flow_status.map(sqlx::types::Json),
+            Utc::now(),
             &uuids,
-            sqlx::types::Json(flow_status) as sqlx::types::Json<FlowStatus>
         )
-        .execute(&mut *tx)
         .await?;
-    }
 
-    if let Some(custom_concurrency_key) = custom_concurrency_key {
-        sqlx::query!(
-            "INSERT INTO concurrency_counter(concurrency_id, job_uuids)
-            VALUES ($1, '{}'::jsonb)",
-            &custom_concurrency_key
-        )
-        .execute(&mut *tx)
-        .await?;
-        sqlx::query!(
-            "INSERT INTO concurrency_key (job_id, key) SELECT id, $1 FROM unnest($2::uuid[]) as id",
-            custom_concurrency_key,
-            &uuids
-        )
-        .execute(&mut *tx)
-        .await?;
-    }
+        if let Some(custom_concurrency_key) = custom_concurrency_key {
+            insert_concurrency_keys(&mut tx, &custom_concurrency_key, &uuids).await?;
+        }
 
-    tx.commit().await?;
+        tx.commit().await?;
+    }
 
     Ok(Json(uuids))
 }
@@ -7444,47 +7571,104 @@ async fn list_completed_jobs(
 ) -> error::JsonResult<Vec<ListableCompletedJob>> {
     let (per_page, offset) = paginate(pagination);
 
+    let fields = &[
+        "v2_job.id",
+        "v2_job.workspace_id",
+        "v2_job.parent_job",
+        "v2_job.created_by",
+        "v2_job.created_at",
+        "v2_job_completed.started_at",
+        "v2_job_completed.duration_ms",
+        "v2_job_completed.status = 'success' OR v2_job_completed.status = 'skipped' as success",
+        "v2_job.runnable_id as script_hash",
+        "v2_job.runnable_path as script_path",
+        "false as deleted",
+        "v2_job_completed.status = 'canceled' as canceled",
+        "v2_job_completed.canceled_by",
+        "v2_job_completed.canceled_reason",
+        "v2_job.kind as job_kind",
+        "CASE WHEN v2_job.trigger_kind = 'schedule' THEN v2_job.trigger END as schedule_path",
+        "v2_job.permissioned_as",
+        "null as raw_code",
+        "null as flow_status",
+        "null as raw_flow",
+        "v2_job.flow_step_id IS NOT NULL as is_flow_step",
+        "v2_job.script_lang as language",
+        "v2_job_completed.status = 'skipped' as is_skipped",
+        "v2_job.permissioned_as_email as email",
+        "v2_job.visible_to_owner",
+        "v2_job_completed.memory_peak as mem_peak",
+        "v2_job.tag",
+        "v2_job.priority",
+        "v2_job_completed.result->'wm_labels' as labels",
+        "'CompletedJob' as type",
+    ];
+
     let sql = list_completed_jobs_query(
         &w_id,
         Some(per_page),
         offset,
         &lq,
-        &[
-            "v2_job.id",
-            "v2_job.workspace_id",
-            "v2_job.parent_job",
-            "v2_job.created_by",
-            "v2_job.created_at",
-            "v2_job_completed.started_at",
-            "v2_job_completed.duration_ms",
-            "v2_job_completed.status = 'success' OR v2_job_completed.status = 'skipped' as success",
-            "v2_job.runnable_id as script_hash",
-            "v2_job.runnable_path as script_path",
-            "false as deleted",
-            "v2_job_completed.status = 'canceled' as canceled",
-            "v2_job_completed.canceled_by",
-            "v2_job_completed.canceled_reason",
-            "v2_job.kind as job_kind",
-            "CASE WHEN v2_job.trigger_kind = 'schedule' THEN v2_job.trigger END as schedule_path",
-            "v2_job.permissioned_as",
-            "null as raw_code",
-            "null as flow_status",
-            "null as raw_flow",
-            "v2_job.flow_step_id IS NOT NULL as is_flow_step",
-            "v2_job.script_lang as language",
-            "v2_job_completed.status = 'skipped' as is_skipped",
-            "v2_job.permissioned_as_email as email",
-            "v2_job.visible_to_owner",
-            "v2_job_completed.memory_peak as mem_peak",
-            "v2_job.tag",
-            "v2_job.priority",
-            "v2_job_completed.result->'wm_labels' as labels",
-            "'CompletedJob' as type",
-        ],
+        fields,
         false,
         get_scope_tags(&authed),
     )
    .sql()?;
+
+    if *SHARD_MODE {
+        let shard_db_map = SHARD_ID_TO_DB_INSTANCE.get().unwrap();
+        let shard_count = shard_db_map.len();
+
+        if shard_count == 0 {
+            return Err(Error::InternalErr(
+                "No shard databases configured".to_string(),
+            ));
+        }
+
+        // Each shard must return the first offset + per_page rows (offset 0);
+        // the global offset is applied once after merging. Reusing `sql`
+        // (per-shard LIMIT/OFFSET) would apply the offset twice and drop rows.
+        let shard_sql = list_completed_jobs_query(
+            &w_id,
+            Some(per_page + offset),
+            0,
+            &lq,
+            fields,
+            false,
+            get_scope_tags(&authed),
+        )
+        .sql()?;
+
+        let mut futures = Vec::new();
+
+        for shard_id in 0..shard_count {
+            let shard_db = get_shard_db_from_shard_id(shard_id).cloned().unwrap();
+            let sql_owned = shard_sql.clone();
+
+            futures.push(async move {
+                let jobs = sqlx::query_as::<_, ListableCompletedJob>(&sql_owned)
+                    .fetch_all(&shard_db)
+                    .await
+                    .map_err(|e| {
+                        Error::InternalErr(format!("Shard {} query failed: {}", shard_id, e))
+                    })?;
+                Ok::<Vec<ListableCompletedJob>, Error>(jobs)
+            });
+        }
+
+        let shard_results = join_all(futures).await;
+
+        let mut all_jobs = Vec::new();
+        for result in shard_results {
+            match result {
+                Ok(jobs) => all_jobs.extend(jobs),
+                Err(e) => {
+                    tracing::error!("Failed to query shard: {}", e);
+                    return Err(e);
+                }
+            }
+        }
+
+        all_jobs.sort_by(|a, b| {
+            if lq.order_desc.unwrap_or(true) {
+                b.created_at.cmp(&a.created_at)
+            } else {
+                a.created_at.cmp(&b.created_at)
+            }
+        });
+
+        let paginated_jobs: Vec<ListableCompletedJob> =
+            all_jobs.into_iter().skip(offset).take(per_page).collect();
+
+        return Ok(Json(paginated_jobs));
+    }
+
     let mut tx = user_db.begin(&authed).await?;
     let jobs = sqlx::query_as::<_, ListableCompletedJob>(&sql)
         .fetch_all(&mut *tx)
diff --git a/backend/windmill-common/Cargo.toml b/backend/windmill-common/Cargo.toml
index c24b0d503ced0..ee067fffadbc4 100644
--- a/backend/windmill-common/Cargo.toml
+++ b/backend/windmill-common/Cargo.toml
@@ -87,7 +87,7 @@ strum.workspace = true
 strum_macros.workspace = true
 url.workspace = true
 async-recursion.workspace = true
-
+once_cell.workspace = true
 semver.workspace = true
 croner = "2.2.0"
 quick_cache.workspace = true
diff --git a/backend/windmill-common/src/db.rs b/backend/windmill-common/src/db.rs
index 1aa5e57ea0121..564f4bfdd6315 100644
--- a/backend/windmill-common/src/db.rs
+++ b/backend/windmill-common/src/db.rs
@@ -1,4 +1,9 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use sqlx::{Acquire, Pool, Postgres, Transaction};
+use uuid::Uuid;
+
+use crate::SHARD_ID_TO_DB_INSTANCE;
 
 pub type DB = Pool<Postgres>;
 
@@ -214,3 +219,24 @@ impl UserDB {
         Ok(tx)
     }
 }
+
+pub fn get_shard_id_from_job_uuid(job_id: Uuid) -> usize {
+    let shard_count = SHARD_ID_TO_DB_INSTANCE.get()
+        .map(|shards| shards.len())
+        .unwrap_or(1);
+
+    let mut hasher = DefaultHasher::new();
+    job_id.hash(&mut hasher);
+
+    (hasher.finish() as usize) % shard_count
+}
+
+pub fn get_shard_db_from_shard_id(shard_id: usize) -> Option<&'static Pool<Postgres>> {
+    SHARD_ID_TO_DB_INSTANCE.get()
+        .and_then(|shards| shards.get(&shard_id))
+}
+
+pub fn get_shard_db_from_job_uuid(job_id: Uuid) -> Option<&'static Pool<Postgres>> {
+    let shard_id = get_shard_id_from_job_uuid(job_id);
+    get_shard_db_from_shard_id(shard_id)
+}
diff --git a/backend/windmill-common/src/lib.rs b/backend/windmill-common/src/lib.rs
index 98dac87744daa..180c187b9aa10 100644
--- a/backend/windmill-common/src/lib.rs
+++ b/backend/windmill-common/src/lib.rs
@@ -6,8 +6,11 @@
  * LICENSE-AGPL for a copy of the license.
*/ +use itertools::Itertools as _; +use once_cell::sync::OnceCell; use quick_cache::sync::Cache; use std::{ + collections::HashMap, future::Future, hash::{Hash, Hasher}, net::SocketAddr, @@ -23,7 +26,7 @@ use tokio::sync::broadcast; use ee_oss::CriticalErrorChannel; use error::Error; use scripts::ScriptLang; -use sqlx::{Acquire, Postgres}; +use sqlx::{Acquire, Pool, Postgres}; pub mod agent_workers; pub mod ai_providers; @@ -129,6 +132,14 @@ lazy_static::lazy_static! { pub static ref OTEL_TRACING_ENABLED: AtomicBool = AtomicBool::new(std::env::var("OTEL_TRACING").is_ok()); pub static ref OTEL_LOGS_ENABLED: AtomicBool = AtomicBool::new(std::env::var("OTEL_LOGS").is_ok()); + pub static ref SHARD_DB_URL: Option = std::env::var("SHARD_DB_URL").ok(); + + pub static ref SHARD_MODE: bool = std::env::var("SHARD_MODE").is_ok(); + + pub static ref SHARD_URLS: Option> = std::env::var("SHARD_URLS").ok().map(|shard_url| { + let shard_url = shard_url.split(',').map(|s| s.to_owned()).collect_vec(); + shard_url + }); pub static ref METRICS_DEBUG_ENABLED: AtomicBool = AtomicBool::new(false); @@ -160,6 +171,10 @@ lazy_static::lazy_static! { } +// Zero-overhead shard database instances (initialized once at startup) +pub static SHARD_DB_INSTANCE: OnceCell> = OnceCell::new(); +pub static SHARD_ID_TO_DB_INSTANCE: OnceCell>> = OnceCell::new(); + const LATEST_VERSION_ID_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60); pub async fn shutdown_signal( @@ -354,13 +369,17 @@ pub async fn initial_connection() -> Result, error::E } pub async fn connect_db( + database_url: Option<&str>, server_mode: bool, indexer_mode: bool, worker_mode: bool, ) -> anyhow::Result> { use anyhow::Context; - let database_url = get_database_url().await?; + let database_url = match database_url { + Some(db_url) => db_url.to_owned(), + None => get_database_url().await?, + }; let max_connections = match std::env::var("DATABASE_CONNECTIONS") { Ok(n) => n.parse::().context("invalid DATABASE_CONNECTIONS")?, diff --git a/backend/windmill-common/src/queue.rs b/backend/windmill-common/src/queue.rs index 2656ce5af5104..4620e4e74e9f7 100644 --- a/backend/windmill-common/src/queue.rs +++ b/backend/windmill-common/src/queue.rs @@ -1,8 +1,25 @@ use std::collections::HashMap; +use futures::future::join_all; use sqlx::{Pool, Postgres}; +use crate::{db::get_shard_db_from_shard_id, SHARD_ID_TO_DB_INSTANCE, SHARD_MODE}; + pub async fn get_queue_counts(db: &Pool) -> HashMap { + if *SHARD_MODE { + return get_queue_counts_from_shards().await; + } + get_queue_counts_single_db(db).await +} + +pub async fn get_queue_running_counts(db: &Pool) -> HashMap { + if *SHARD_MODE { + return get_queue_running_counts_from_shards().await; + } + get_queue_running_counts_single_db(db).await +} + +async fn get_queue_counts_single_db(db: &Pool) -> HashMap { sqlx::query!( "SELECT tag AS \"tag!\", count(*) AS \"count!\" FROM v2_job_queue WHERE scheduled_for <= now() - ('3 seconds')::interval AND running = false @@ -15,7 +32,7 @@ pub async fn get_queue_counts(db: &Pool) -> HashMap { .unwrap_or_else(|| HashMap::new()) } -pub async fn get_queue_running_counts(db: &Pool) -> HashMap { +async fn get_queue_running_counts_single_db(db: &Pool) -> HashMap { sqlx::query!( "SELECT tag AS \"tag!\", count(*) AS \"count!\" FROM v2_job_queue WHERE running = true @@ -27,3 +44,69 @@ pub async fn get_queue_running_counts(db: &Pool) -> HashMap HashMap { + let shard_db_map = match SHARD_ID_TO_DB_INSTANCE.get() { + Some(map) => map, + None => return HashMap::new(), + }; + let 
shard_count = shard_db_map.len();
+
+    if shard_count == 0 {
+        return HashMap::new();
+    }
+
+    let mut futures = Vec::new();
+
+    for shard_id in 0..shard_count {
+        if let Some(shard_db) = get_shard_db_from_shard_id(shard_id) {
+            let future = async move { get_queue_counts_single_db(shard_db).await };
+            futures.push(future);
+        }
+    }
+
+    let shard_results = join_all(futures).await;
+
+    let mut aggregated_counts = HashMap::new();
+
+    for shard_counts in shard_results {
+        for (tag, count) in shard_counts {
+            *aggregated_counts.entry(tag).or_insert(0) += count;
+        }
+    }
+
+    aggregated_counts
+}
+
+async fn get_queue_running_counts_from_shards() -> HashMap<String, u32> {
+    let shard_db_map = match SHARD_ID_TO_DB_INSTANCE.get() {
+        Some(map) => map,
+        None => return HashMap::new(),
+    };
+    let shard_count = shard_db_map.len();
+
+    if shard_count == 0 {
+        return HashMap::new();
+    }
+
+    let mut futures = Vec::new();
+
+    for shard_id in 0..shard_count {
+        if let Some(shard_db) = get_shard_db_from_shard_id(shard_id) {
+            let future = async move { get_queue_running_counts_single_db(shard_db).await };
+            futures.push(future);
+        }
+    }
+
+    let shard_results = join_all(futures).await;
+
+    let mut aggregated_counts = HashMap::new();
+
+    for shard_counts in shard_results {
+        for (tag, count) in shard_counts {
+            *aggregated_counts.entry(tag).or_insert(0) += count;
+        }
+    }
+
+    aggregated_counts
+}
diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs
index 7c6a3b3dcdefb..84eeafa0d07ad 100644
--- a/backend/windmill-queue/src/jobs.rs
+++ b/backend/windmill-queue/src/jobs.rs
@@ -33,6 +33,7 @@ use windmill_common::add_time;
 use windmill_common::auth::JobPerms;
 #[cfg(feature = "benchmark")]
 use windmill_common::bench::BenchmarkIter;
+use windmill_common::db::{get_shard_db_from_job_uuid, get_shard_id_from_job_uuid};
 use windmill_common::jobs::{JobTriggerKind, EMAIL_ERROR_HANDLER_USER_EMAIL};
 use windmill_common::utils::now_from_db;
 use windmill_common::worker::{Connection, SCRIPT_TOKEN_EXPIRY};
@@ -62,6 +63,7 @@ use windmill_common::{
     },
     DB, METRICS_ENABLED,
 };
+use windmill_common::{SHARD_DB_INSTANCE, SHARD_ID_TO_DB_INSTANCE, SHARD_MODE};
 
 use backon::ConstantBuilder;
 use backon::{BackoffBuilder, Retryable};
@@ -179,6 +181,7 @@ pub async fn cancel_single_job<'c>(
         .await;
         let add_job = add_completed_job_error(
             &db,
+            &db, // TODO: pass the proper shard DB when shard mode is on; the main DB is used here so this compiles
             &MiniPulledJob::from(&job_running),
             job_running.mem_peak.unwrap_or(0),
             Some(CanceledBy { username: Some(username.to_string()), reason: Some(reason) }),
@@ -374,13 +377,25 @@ pub async fn append_logs(
     }
     match conn {
         Connection::Sql(pool) => {
+            let db = if *SHARD_MODE {
+                if let Some(worker_db) = SHARD_DB_INSTANCE.get() {
+                    worker_db
+                } else if let Some(server_shards) = SHARD_ID_TO_DB_INSTANCE.get() {
+                    let shard_id = get_shard_id_from_job_uuid(*job_id);
+                    server_shards.get(&shard_id).unwrap_or(pool)
+                } else {
+                    pool
+                }
+            } else {
+                pool
+            };
             if let Err(err) = sqlx::query!(
                 "INSERT INTO job_logs (logs, job_id, workspace_id) VALUES ($1, $2, $3) ON CONFLICT (job_id) DO UPDATE SET logs = concat(job_logs.logs, $1::text)",
                 logs.as_ref(),
                 job_id,
                 workspace.as_ref(),
             )
-            .execute(pool)
+            .execute(db)
             .warn_after_seconds(1)
             .await
             {
@@ -389,12 +404,17 @@
         }
         Connection::Http(client) => {
             if let Err(e) = client
-                .post::<_, String>(
-                    &format!("/api/w/{}/agent_workers/push_logs/{}", workspace.as_ref(), job_id),
-                    None,
-                    &logs.as_ref(),
-                )
-                .await {
+                .post::<_, String>(
+                    &format!(
"/api/w/{}/agent_workers/push_logs/{}", + workspace.as_ref(), + job_id + ), + None, + &logs.as_ref(), + ) + .await + { tracing::error!(%job_id, %e, "error sending logs for job {job_id}: {e}"); }; } @@ -691,7 +711,8 @@ where } pub async fn add_completed_job_error( - db: &Pool, + db: &DB, + job_queue_db: &DB, queued_job: &MiniPulledJob, mem_peak: i32, canceled_by: Option, @@ -728,6 +749,7 @@ pub async fn add_completed_job_error( ); let _ = add_completed_job( db, + job_queue_db, &queued_job, false, false, @@ -748,7 +770,8 @@ lazy_static::lazy_static! { } pub async fn add_completed_job( - db: &Pool, + db: &DB, + job_queue_db: &DB, queued_job: &MiniPulledJob, success: bool, skipped: bool, @@ -773,6 +796,7 @@ pub async fn add_completed_job( let (opt_uuid, duration, _skip_downstream_error_handlers) = (|| { commit_completed_job( db, + job_queue_db, queued_job, success, skipped, @@ -825,6 +849,7 @@ pub async fn add_completed_job( async fn commit_completed_job( db: &Pool, + job_queue_db: &Pool, queued_job: &MiniPulledJob, success: bool, skipped: bool, @@ -838,7 +863,7 @@ async fn commit_completed_job( // let start = std::time::Instant::now(); let mut tx = db.begin().await?; - + let mut job_tx = job_queue_db.begin().await?; let job_id = queued_job.id; // tracing::error!("1 {:?}", start.elapsed()); @@ -851,7 +876,7 @@ async fn commit_completed_job( let mem_peak = mem_peak; // add_time!(bench, "add_completed_job query START"); - if let Some(value) = check_result_size(db, queued_job, result).await { + if let Some(value) = check_result_size(db, job_queue_db, queued_job, result).await { return value; } @@ -891,7 +916,7 @@ async fn commit_completed_job( /* $9 */ duration, /* $10 */ result_columns as Option<&Vec>, ) - .fetch_one(&mut *tx) + .fetch_one(&mut *job_tx) .await .map_err(|e| Error::internal_err(format!("Could not add completed job {job_id}: {e:#}")))?; @@ -904,7 +929,7 @@ async fn commit_completed_job( job_id, labels as Vec ) - .execute(&mut *tx) + .execute(&mut *job_tx) .await .map_err(|e| Error::InternalErr(format!("Could not update job labels: {e:#}")))?; } @@ -927,7 +952,7 @@ async fn commit_completed_job( _duration, parent_job ) - .execute(&mut *tx) + .execute(&mut *job_tx) .await .inspect_err(|e| { tracing::error!( @@ -960,14 +985,14 @@ async fn commit_completed_job( parent_job, &queued_job.workspace_id ) - .execute(&mut *tx) + .execute(&mut *job_tx) .await?; if flow_is_done { let r = sqlx::query_scalar!( "UPDATE parallel_monitor_lock SET last_ping = now() WHERE parent_flow_id = $1 and job_id = $2 RETURNING 1", parent_job, &queued_job.id - ).fetch_optional(&mut *tx).await?; + ).fetch_optional(&mut *job_tx).await?; if r.is_some() { tracing::info!( "parallel flow iteration is done, setting parallel monitor last ping lock for job {}", @@ -1011,7 +1036,7 @@ async fn commit_completed_job( &queued_job.id, &queued_job.workspace_id ) - .fetch_optional(&mut *tx) + .fetch_optional(&mut *job_tx) .await? 
.flatten()
         .unwrap_or(false);
@@ -1076,25 +1101,35 @@
         }
     }
 }
+
     if queued_job.concurrent_limit.is_some() {
         let concurrency_key = concurrency_key(db, &queued_job.id).await?;
-        if *DISABLE_CONCURRENCY_LIMIT || concurrency_key.is_none() {
-            tracing::warn!("Concurrency limit is disabled, skipping");
-        } else {
-            let concurrency_key = concurrency_key.unwrap();
-            sqlx::query_scalar!(
-                "UPDATE concurrency_counter SET job_uuids = job_uuids - $2 WHERE concurrency_id = $1",
-                concurrency_key,
-                queued_job.id.hyphenated().to_string(),
-            )
-            .execute(&mut *tx)
-            .await
-            .map_err(|e| {
-                Error::internal_err(format!(
-                    "Could not decrement concurrency counter for job_id={}: {e:#}",
-                    queued_job.id
-                ))
-            })?;
+        match concurrency_key {
+            Some(concurrency_key) if !*DISABLE_CONCURRENCY_LIMIT => {
+                sqlx::query_scalar!(
+                    "
+                    UPDATE
+                        concurrency_counter
+                    SET
+                        job_uuids = job_uuids - $2
+                    WHERE
+                        concurrency_id = $1
+                    ",
+                    concurrency_key,
+                    queued_job.id.hyphenated().to_string(),
+                )
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| {
+                    Error::internal_err(format!(
+                        "Could not decrement concurrency counter for job_id={}: {e:#}",
+                        queued_job.id
+                    ))
+                })?;
+            }
+            _ => {
+                tracing::warn!("Concurrency limit disabled or no concurrency key, skipping");
+            }
         }
 
         if let Err(e) = sqlx::query_scalar!(
@@ -1113,10 +1148,49 @@
     }
 
     sqlx::query!("DELETE FROM job_perms WHERE job_id = $1", job_id)
-        .execute(&mut *tx)
+        .execute(&mut *job_tx)
         .await?;
 
-    tx.commit().await?;
+    let main_db_result = tx.commit().await;
+
+    match main_db_result {
+        Ok(()) => match job_tx.commit().await {
+            Ok(()) => {
+                tracing::debug!(
+                    "Both main DB and shard DB transactions committed successfully for job {}",
+                    job_id
+                );
+            }
+            Err(shard_error) => {
+                // TODO: find a durable recovery path (e.g. compensating write or outbox) for this cross-DB failure
+                tracing::error!(
+                    job_id = %job_id,
+                    workspace_id = %queued_job.workspace_id,
+                    error = %shard_error,
+                    "CRITICAL: Main DB committed but shard DB failed during job completion. Data inconsistency possible."
+                );
+
+                return Err(Error::internal_err(format!(
+                    "Shard DB commit failed after main DB commit for job {}: {}",
+                    job_id, shard_error
+                )));
+            }
+        },
+        Err(main_error) => {
+            if let Err(rollback_err) = job_tx.rollback().await {
+                tracing::error!(
+                    job_id = %job_id,
+                    main_error = %main_error,
+                    rollback_error = %rollback_err,
+                    "Failed to rollback shard DB after main DB commit failure"
+                );
+            }
+            return Err(Error::internal_err(format!(
+                "Main DB commit failed for job {}: {}",
+                job_id, main_error
+            )));
+        }
+    }
 
     tracing::info!(
         %job_id,
@@ -1142,6 +1216,7 @@
 
 async fn check_result_size<T: Serialize + Send + Sync>(
     db: &Pool<Postgres>,
+    job_queue_db: &DB,
     queued_job: &MiniPulledJob,
     result: Json<&T>,
 ) -> Option<Result<(Option<Uuid>, i64, bool), Error>> {
@@ -2571,7 +2646,8 @@ pub struct PulledJobResult {
 }
 
 pub async fn pull(
-    db: &Pool<Postgres>,
+    db: &DB,
+    job_queue_db: &DB,
     suspend_first: bool,
     worker_name: &str,
     query_o: Option<&(String, String)>,
@@ -2595,7 +2671,7 @@
     } else {
         sqlx::query_as::<_, PulledJob>(query_suspended)
             .bind(worker_name)
-            .fetch_optional(db)
+            .fetch_optional(job_queue_db)
             .await?
}; if let Some(job) = job { @@ -2603,7 +2679,7 @@ pub async fn pull( } else { let job = sqlx::query_as::<_, PulledJob>(query_no_suspend) .bind(worker_name) - .fetch_optional(db) + .fetch_optional(job_queue_db) .await?; PulledJobResult { job, suspended: false } } @@ -2626,7 +2702,7 @@ pub async fn pull( tag, job.id ) - .execute(db) + .execute(job_queue_db) .await?; continue; } @@ -2634,7 +2710,7 @@ pub async fn pull( return Ok(njob); }; let (job, suspended) = pull_single_job_and_mark_as_running_no_concurrency_limit( - db, + job_queue_db, suspend_first, worker_name, #[cfg(feature = "benchmark")] @@ -2718,14 +2794,30 @@ pub async fn pull( let job_script_path = pulled_job.runnable_path.clone().unwrap_or_default(); let min_started_at = sqlx::query!( - "SELECT COALESCE((SELECT MIN(started_at) as min_started_at - FROM v2_job_queue INNER JOIN v2_job ON v2_job.id = v2_job_queue.id - WHERE v2_job.runnable_path = $1 AND v2_job.kind != 'dependencies' AND v2_job_queue.running = true AND v2_job_queue.workspace_id = $2 AND v2_job_queue.canceled_by IS NULL AND v2_job.concurrent_limit > 0), $3) as min_started_at, now() AS now", + r#" + SELECT + COALESCE( + ( + SELECT MIN(started_at) AS min_started_at + FROM v2_job_queue + INNER JOIN v2_job + ON v2_job.id = v2_job_queue.id + WHERE v2_job.runnable_path = $1 + AND v2_job.kind != 'dependencies' + AND v2_job_queue.running = true + AND v2_job_queue.workspace_id = $2 + AND v2_job_queue.canceled_by IS NULL + AND v2_job.concurrent_limit > 0 + ), + $3 + ) AS min_started_at, + NOW() AS now + "#, job_script_path, &pulled_job.workspace_id, max_ended_at ) - .fetch_one(db) + .fetch_one(job_queue_db) .await .map_err(|e| { Error::internal_err(format!( @@ -2734,15 +2826,27 @@ pub async fn pull( })?; let job_uuid: Uuid = pulled_job.id; - let avg_script_duration: Option = sqlx::query_scalar!( - "SELECT CAST(ROUND(AVG(duration_ms), 0) AS BIGINT) AS avg_duration_s FROM - (SELECT duration_ms FROM concurrency_key LEFT JOIN v2_job_completed ON v2_job_completed.id = concurrency_key.job_id WHERE key = $1 AND ended_at IS NOT NULL - ORDER BY ended_at - DESC LIMIT 10) AS t", + + let completed_job_ids = sqlx::query_scalar!( + "SELECT job_id FROM concurrency_key WHERE key = $1 AND ended_at IS NOT NULL + ORDER BY ended_at DESC LIMIT 10", job_concurrency_key ) - .fetch_one(db) + .fetch_all(db) .await?; + + let avg_script_duration: Option = if completed_job_ids.is_empty() { + None + } else { + sqlx::query_scalar!( + "SELECT CAST(ROUND(AVG(duration_ms), 0) AS BIGINT) AS avg_duration_s FROM v2_job_completed + WHERE id = ANY($1)", + &completed_job_ids + ) + .fetch_optional(job_queue_db) + .await? 
+ .flatten() + }; tracing::debug!( "avg script duration computed: {}", avg_script_duration.unwrap_or(0) @@ -2763,12 +2867,35 @@ pub async fn pull( (min_started_at_or_now + inc).max(now + Duration::try_seconds(3).unwrap_or_default()); let mut estimated_next_schedule_timestamp = min_started_p_inc; - let all_jobs = sqlx::query_scalar!( - "SELECT scheduled_for FROM v2_job_queue INNER JOIN concurrency_key ON concurrency_key.job_id = v2_job_queue.id - WHERE key = $1 AND running = false AND canceled_by IS NULL AND scheduled_for >= $2", - job_concurrency_key, - estimated_next_schedule_timestamp - inc - ).fetch_all(db).await?; + + let job_ids_with_key = sqlx::query_scalar!( + "SELECT job_id FROM concurrency_key WHERE key = $1", + job_concurrency_key + ) + .fetch_all(db) + .await?; + + let all_jobs = if job_ids_with_key.is_empty() { + Vec::new() + } else { + sqlx::query_scalar!( + " + SELECT + scheduled_for + FROM + v2_job_queue + WHERE + id = ANY($1) AND + running = false AND + canceled_by IS NULL AND + scheduled_for >= $2 + ", + &job_ids_with_key, + estimated_next_schedule_timestamp - inc + ) + .fetch_all(job_queue_db) + .await? + }; tracing::debug!( "all_jobs: {:?}, estimated_next_schedule_timestamp: {:?}, inc: {:?}", @@ -2826,22 +2953,25 @@ pub async fn pull( WITH ping AS ( UPDATE v2_job_runtime SET ping = null WHERE id = $2 ) - UPDATE v2_job_queue SET + UPDATE + v2_job_queue + SET running = false, started_at = null, scheduled_for = $1 - WHERE id = $2", + WHERE + id = $2", estimated_next_schedule_timestamp, job_uuid, ) - .execute(db) + .execute(job_queue_db) .await .map_err(|e| Error::internal_err(format!("Could not update and re-queue job {job_uuid}. The job will be marked as running but it is not running: {e:#}")))?; } } async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>( - db: &Pool, + job_queue_db: &DB, suspend_first: bool, worker_name: &str, #[cfg(feature = "benchmark")] bench: &mut BenchmarkIter, @@ -2865,7 +2995,7 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>( // tracing::info!("Pulling job with query: {}", query); sqlx::query_as::<_, PulledJob>(&query) .bind(worker_name) - .fetch_optional(db) + .fetch_optional(job_queue_db) .await? } else { None @@ -2892,7 +3022,7 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>( let r = sqlx::query_as::<_, PulledJob>(query) .bind(worker_name) - .fetch_optional(db) + .fetch_optional(job_queue_db) .await?; #[cfg(feature = "benchmark")] @@ -4559,23 +4689,44 @@ pub async fn push<'c, 'd>( }) }; + let shard_mode = *SHARD_MODE; + let mut existing_job_id = false; let mut tx = tx.into_tx().await?; let job_id: Uuid = if let Some(job_id) = job_id { - let conflicting_id = sqlx::query_scalar!("SELECT 1 FROM v2_job WHERE id = $1", job_id) - .fetch_optional(&mut *tx) - .await?; + existing_job_id = true; + job_id + } else { + Ulid::new().into() + }; + + let mut job_queue_tx = if shard_mode { + if let Some(job_queue_db) = get_shard_db_from_job_uuid(job_id) { + Some(job_queue_db.begin().await?) + } else { + None + } + } else { + None + }; + + if existing_job_id { + let conflicting_id = if shard_mode { + sqlx::query_scalar!("SELECT 1 FROM v2_job WHERE id = $1", job_id) + .fetch_optional(&mut **job_queue_tx.as_mut().unwrap()) + .await? + } else { + sqlx::query_scalar!("SELECT 1 FROM v2_job WHERE id = $1", job_id) + .fetch_optional(&mut *tx) + .await? 
+ }; if conflicting_id.is_some() { return Err(Error::BadRequest(format!( "Job with id {job_id} already exists" ))); } - - job_id - } else { - Ulid::new().into() - }; + } if concurrent_limit.is_some() { insert_concurrency_key( @@ -4736,7 +4887,9 @@ pub async fn push<'c, 'd>( trigger_kind as Option, running, ) - .execute(&mut *tx) + .execute(if !shard_mode {&mut *tx} else { + job_queue_tx.as_mut().unwrap() + }) .warn_after_seconds(1) .await?; @@ -4767,7 +4920,11 @@ pub async fn push<'c, 'd>( job_id, Json(flow_status) as Json, ) - .execute(&mut *tx) + .execute(if !shard_mode { + &mut *tx + } else { + job_queue_tx.as_mut().unwrap() + }) .warn_after_seconds(1) .await?; } @@ -4842,6 +4999,52 @@ pub async fn push<'c, 'd>( .await?; } + if shard_mode { + let job_queue_tx = job_queue_tx.unwrap(); + + let main_commit_result = tx.commit().await; + + let commit_result = match main_commit_result { + Ok(_) => match job_queue_tx.commit().await { + Ok(_) => { + tracing::debug!( + "Both main DB and shard DB committed successfully for job {}", + job_id + ); + let new_tx = _db.begin().await?; + Ok((job_id, new_tx)) + } + Err(shard_error) => { + tracing::error!( + job_id = %job_id, + workspace_id = %workspace_id, + error = %shard_error, + "CRITICAL: Main DB committed but shard DB failed during job push. Data inconsistency detected." + ); + + Err(Error::internal_err(format!( + "CRITICAL: Shard DB commit failed after main DB commit for job {} (workspace: {}): {}", + job_id, workspace_id, shard_error + ))) + } + }, + Err(main_error) => { + if let Err(rollback_err) = job_queue_tx.rollback().await { + tracing::error!( + job_id = %job_id, + main_error = %main_error, + rollback_error = %rollback_err, + "Failed to rollback shard DB after main DB commit failure" + ); + } + Err(Error::internal_err(format!( + "Main DB commit failed for job {}: {}", + job_id, main_error + ))) + } + }; + return commit_result; + } Ok((job_id, tx)) } diff --git a/backend/windmill-worker/src/ai_executor.rs b/backend/windmill-worker/src/ai_executor.rs index 2eddb657b59ca..5ff7feb2b9bfc 100644 --- a/backend/windmill-worker/src/ai_executor.rs +++ b/backend/windmill-worker/src/ai_executor.rs @@ -99,7 +99,7 @@ pub async fn handle_ai_agent_job( // connection conn: &Connection, db: &DB, - + job_queue_db: &DB, // agent job job: &MiniPulledJob, @@ -267,6 +267,7 @@ pub async fn handle_ai_agent_job( let agent_fut = run_agent( db, conn, + job_queue_db, job, parent_job, &args, @@ -396,7 +397,7 @@ pub async fn run_agent( // connection db: &DB, conn: &Connection, - + job_queue_db: &DB, // agent job and flow data job: &MiniPulledJob, parent_job: &Uuid, @@ -776,6 +777,7 @@ pub async fn run_agent( None, None, conn, + Some(job_queue_db), client, hostname, worker_name, @@ -798,6 +800,7 @@ pub async fn run_agent( let err_json = error_to_value(&err); let _ = handle_non_flow_job_error( db, + job_queue_db, &tool_job, 0, None, diff --git a/backend/windmill-worker/src/result_processor.rs b/backend/windmill-worker/src/result_processor.rs index 144c8d9101a4c..49ad4d4a46cca 100644 --- a/backend/windmill-worker/src/result_processor.rs +++ b/backend/windmill-worker/src/result_processor.rs @@ -17,7 +17,14 @@ use windmill_common::otel_oss::FutureExt; use uuid::Uuid; use windmill_common::{ - add_time, error::{self, Error}, flow_status::{FlowJobDuration}, jobs::JobKind, utils::WarnAfterExt, worker::{to_raw_value, Connection, WORKER_GROUP}, worker_group_job_stats::{accumulate_job_stats, flush_stats_to_db, JobStatsMap}, KillpillSender, DB + add_time, + error::{self, Error}, + 
flow_status::FlowJobDuration, + jobs::JobKind, + utils::WarnAfterExt, + worker::{to_raw_value, Connection, WORKER_GROUP}, + worker_group_job_stats::{accumulate_job_stats, flush_stats_to_db, JobStatsMap}, + KillpillSender, DB, }; #[cfg(feature = "benchmark")] @@ -55,6 +62,7 @@ async fn process_jc( worker_name: &str, base_internal_url: &str, db: &DB, + job_queue_db: &DB, worker_dir: &str, same_worker_tx: Option<&SameWorkerSender>, job_completed_sender: &JobCompletedSender, @@ -144,6 +152,7 @@ async fn process_jc( jc, &base_internal_url, &db, + &job_queue_db, worker_dir, same_worker_tx, &worker_name, @@ -186,6 +195,7 @@ pub fn start_background_processor( last_processing_duration: Arc, base_internal_url: String, db: DB, + job_queue_db: &DB, worker_dir: String, same_worker_tx: SameWorkerSender, worker_name: String, @@ -193,6 +203,7 @@ pub fn start_background_processor( is_dedicated_worker: bool, stats_map: JobStatsMap, ) -> JoinHandle<()> { + let job_queue_db = job_queue_db.clone(); tokio::spawn(async move { let mut has_been_killed = false; @@ -274,6 +285,7 @@ pub fn start_background_processor( &worker_name, &base_internal_url, &db, + &job_queue_db, &worker_dir, Some(&same_worker_tx), &job_completed_sender, @@ -325,6 +337,7 @@ pub fn start_background_processor( tracing::info!(parent_flow = %flow, "updating flow status after job completion"); if let Err(e) = update_flow_status_after_job_completion( &db, + &job_queue_db, &AuthedClient::new( base_internal_url.to_string(), w_id.clone(), @@ -496,6 +509,7 @@ pub async fn handle_receive_completed_job( jc: JobCompleted, base_internal_url: &str, db: &DB, + job_queue_db: &DB, worker_dir: &str, same_worker_tx: Option<&SameWorkerSender>, worker_name: &str, @@ -513,6 +527,7 @@ pub async fn handle_receive_completed_job( jc, &client, db, + job_queue_db, &worker_dir, same_worker_tx.clone(), worker_name, @@ -526,6 +541,7 @@ pub async fn handle_receive_completed_job( Err(err) => { handle_job_error( db, + job_queue_db, &client, job.as_ref(), mem_peak, @@ -561,6 +577,7 @@ pub async fn process_completed_job( }: JobCompleted, client: &AuthedClient, db: &DB, + job_queue_db: &DB, worker_dir: &str, same_worker_tx: Option<&SameWorkerSender>, worker_name: &str, @@ -590,7 +607,7 @@ pub async fn process_completed_job( WHERE id = $1 AND preprocessed = FALSE"#, job.id ) - .execute(db) + .execute(job_queue_db) .await .map_err(|e| { Error::InternalErr(format!( @@ -604,7 +621,7 @@ pub async fn process_completed_job( Json(preprocessed_args) as Json>>, job.id ) - .execute(db) + .execute(job_queue_db) .await?; } @@ -612,6 +629,7 @@ pub async fn process_completed_job( let (_, duration) = add_completed_job( db, + job_queue_db, &job, true, false, @@ -632,6 +650,7 @@ pub async fn process_completed_job( // tracing::info!(parent_flow = %parent_job, subflow = %job_id, "updating flow status (2)"); let r = update_flow_status_after_job_completion( db, + job_queue_db, client, parent_job, &job_id, @@ -657,6 +676,7 @@ pub async fn process_completed_job( } else { let result = add_completed_job_error( db, + job_queue_db, &job, mem_peak.to_owned(), canceled_by, @@ -673,13 +693,17 @@ pub async fn process_completed_job( tracing::error!(parent_flow = %parent_job, subflow = %job.id, "process completed job error, updating flow status"); let r = update_flow_status_after_job_completion( db, + job_queue_db, client, parent_job, &job.id, &job.workspace_id, false, Arc::new(serde_json::value::to_raw_value(&result).unwrap()), - duration.map(|x| FlowJobDuration { started_at: job.started_at.unwrap(), 
duration_ms: x }), + duration.map(|x| FlowJobDuration { + started_at: job.started_at.unwrap(), + duration_ms: x, + }), false, &same_worker_tx.expect(SAME_WORKER_REQUIREMENTS).to_owned(), &worker_dir, @@ -700,6 +724,7 @@ pub async fn process_completed_job( pub async fn handle_non_flow_job_error( db: &DB, + job_queue_db: &DB, job: &MiniPulledJob, mem_peak: i32, canceled_by: Option, @@ -716,6 +741,7 @@ pub async fn handle_non_flow_job_error( .await; add_completed_job_error( db, + job_queue_db, job, mem_peak, canceled_by, @@ -730,6 +756,7 @@ pub async fn handle_non_flow_job_error( #[tracing::instrument(name = "job_error", level = "info", skip_all, fields(job_id = %job.id))] pub async fn handle_job_error( db: &DB, + job_queue_db: &DB, client: &AuthedClient, job: &MiniPulledJob, mem_peak: i32, @@ -748,6 +775,7 @@ pub async fn handle_job_error( let update_job_future = || async { handle_non_flow_job_error( db, + job_queue_db, job, mem_peak, canceled_by.clone(), @@ -775,6 +803,7 @@ pub async fn handle_job_error( tracing::error!(parent_flow = %flow, subflow = %job_status_to_update, "handle job error, updating flow status: {err_json:?}"); let updated_flow = update_flow_status_after_job_completion( db, + job_queue_db, client, flow, &job_status_to_update, @@ -808,6 +837,7 @@ pub async fn handle_job_error( .await; let _ = add_completed_job_error( db, + job_queue_db, &MiniPulledJob::from(&parent_job), mem_peak, canceled_by.clone(), diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index 2ae1a42f74c92..8ded1939229d0 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -11,6 +11,8 @@ use anyhow::anyhow; use futures::TryFutureExt; +use sqlx::Pool; +use sqlx::Postgres; use tokio::time::timeout; use windmill_common::client::AuthedClient; use windmill_common::utils::report_critical_error; @@ -721,6 +723,7 @@ fn create_span(arc_job: &Arc, worker_name: &str, hostname: &str) pub async fn handle_all_job_kind_error( conn: &Connection, + job_queue_db: Option<&DB>, authed_client: &AuthedClient, job: Arc, err: Error, @@ -734,6 +737,7 @@ pub async fn handle_all_job_kind_error( Connection::Sql(db) => { handle_job_error( db, + job_queue_db.expect("Job queue database connection required for error handling"), authed_client, job.as_ref(), 0, @@ -776,6 +780,7 @@ pub async fn handle_all_job_kind_error( pub fn start_interactive_worker_shell( conn: Connection, + job_queue_db: Option<&DB>, hostname: String, worker_name: String, mut killpill_rx: tokio::sync::broadcast::Receiver<()>, @@ -783,12 +788,12 @@ pub fn start_interactive_worker_shell( base_internal_url: String, worker_dir: String, ) -> JoinHandle<()> { + let job_queue_db = job_queue_db.cloned(); tokio::spawn(async move { let mut occupancy_metrics = OccupancyMetrics::new(Instant::now()); let mut last_executed_job: Option = Instant::now().checked_sub(Duration::from_millis(2500)); - loop { if let Ok(_) = killpill_rx.try_recv() { break; @@ -802,6 +807,7 @@ pub fn start_interactive_worker_shell( let mut bench = windmill_common::bench::BenchmarkIter::new(); let job = pull( &db, + &db, // Use same database for both main and job queue in non-sharded mode false, &worker_name, Some(&query), @@ -854,6 +860,7 @@ pub fn start_interactive_worker_shell( raw_flow, parent_runnable_path, &conn, + job_queue_db.as_ref(), &authed_client, &hostname, &worker_name, @@ -912,6 +919,7 @@ pub async fn create_job_dir(worker_directory: &str, job_id: impl Display) -> Str pub async fn run_worker( conn: &Connection, + 
job_queue_db: Option>, hostname: &str, worker_name: String, i_worker: u64, @@ -1231,6 +1239,9 @@ pub async fn run_worker( last_processing_duration.clone(), base_internal_url.to_string(), db.clone(), + &job_queue_db + .clone() + .expect("Job queue database connection required for background processor"), worker_dir.clone(), same_worker_tx.clone(), worker_name.clone(), @@ -1246,6 +1257,7 @@ pub async fn run_worker( let interactive_shell = if i_worker == 1 { let it_shell = start_interactive_worker_shell( conn.clone(), + job_queue_db.as_ref(), hostname.to_owned(), worker_name.clone(), killpill_rx.resubscribe(), @@ -1546,6 +1558,9 @@ pub async fn run_worker( Duration::from_secs(10), pull( &db, + job_queue_db.as_ref().expect( + "Job queue database connection required for job pulling", + ), suspend_first, &worker_name, None, @@ -1837,6 +1852,7 @@ pub async fn run_worker( raw_flow, parent_runnable_path, &conn, + job_queue_db.as_ref(), &authed_client, hostname, &worker_name, @@ -1900,6 +1916,7 @@ pub async fn run_worker( } handle_all_job_kind_error( &conn, + job_queue_db.as_ref(), &authed_client, arc_job.clone(), err, @@ -2269,6 +2286,7 @@ pub async fn handle_queued_job( raw_flow: Option>>, parent_runnable_path: Option, conn: &Connection, + job_queue_db: Option<&DB>, client: &AuthedClient, hostname: &str, worker_name: &str, @@ -2564,6 +2582,8 @@ pub async fn handle_queued_job( handle_ai_agent_job( conn, db, + job_queue_db + .expect("Job queue database connection required for error handling"), job.as_ref(), &client, &mut canceled_by, diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 832af368d1469..bf86c581be6ac 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -74,6 +74,7 @@ use windmill_queue::{canceled_job_to_result, push}; // #[instrument(level = "trace", skip_all)] pub async fn update_flow_status_after_job_completion( db: &DB, + job_queue_db: &DB, client: &AuthedClient, flow: uuid::Uuid, job_id_for_status: &Uuid, @@ -106,6 +107,7 @@ pub async fn update_flow_status_after_job_completion( potentially_crash_for_testing(); let nrec = match update_flow_status_after_job_completion_internal( db, + job_queue_db, client, rec.flow, &rec.job_id_for_status, @@ -130,6 +132,7 @@ pub async fn update_flow_status_after_job_completion( tracing::error!("Error while updating flow status of {} after completion of {}, updating flow status again with error: {e:#}", rec.flow, &rec.job_id_for_status); update_flow_status_after_job_completion_internal( db, + job_queue_db, client, rec.flow, &rec.job_id_for_status, @@ -269,6 +272,7 @@ fn result_has_recover_true(nresult: Arc>) -> bool { // #[instrument(level = "trace", skip_all)] pub async fn update_flow_status_after_job_completion_internal( db: &DB, + job_queue_db: &DB, client: &AuthedClient, flow: uuid::Uuid, job_id_for_status: &Uuid, @@ -1452,6 +1456,7 @@ pub async fn update_flow_status_after_job_completion_internal( if flow_job.is_canceled() { add_completed_job_error( db, + job_queue_db, &flow_job, 0, Some(CanceledBy { @@ -1478,6 +1483,7 @@ pub async fn update_flow_status_after_job_completion_internal( let duration = if success { let (_, duration) = add_completed_job( db, + job_queue_db, &flow_job, true, stop_early && skip_if_stop_early, @@ -1493,6 +1499,7 @@ pub async fn update_flow_status_after_job_completion_internal( } else { let (_, duration) = add_completed_job( db, + job_queue_db, &flow_job, false, stop_early && skip_if_stop_early, @@ -1540,8 +1547,18 
@@ pub async fn update_flow_status_after_job_completion_internal( &db.into(), ) .await; - let _ = add_completed_job_error(db, &flow_job, 0, None, e, worker_name, true, None) - .await; + let _ = add_completed_job_error( + db, + job_queue_db, + &flow_job, + 0, + None, + e, + worker_name, + true, + None, + ) + .await; true } Ok(_) => false, From bd70d55075bcabf9747613d00c5d7a25bc30f8f1 Mon Sep 17 00:00:00 2001 From: dieriba Date: Wed, 24 Sep 2025 18:20:16 +0200 Subject: [PATCH 2/9] easy --- .../20250922155947_v2_job.up.sql | 10 +- .../20250924160628_v2_as_queue.down.sql | 2 + .../20250924160628_v2_as_queue.up.sql | 53 +++++++++++ backend/src/main.rs | 4 +- backend/windmill-api/src/jobs.rs | 94 +++++++++++++++++-- backend/windmill-common/src/lib.rs | 1 - backend/windmill-queue/src/jobs.rs | 7 +- benchmarks/README.md | 4 +- 8 files changed, 153 insertions(+), 22 deletions(-) create mode 100644 backend/shard-migrations/20250924160628_v2_as_queue.down.sql create mode 100644 backend/shard-migrations/20250924160628_v2_as_queue.up.sql diff --git a/backend/shard-migrations/20250922155947_v2_job.up.sql b/backend/shard-migrations/20250922155947_v2_job.up.sql index f67cc5cb1da7f..7ea2d3f8efc38 100644 --- a/backend/shard-migrations/20250922155947_v2_job.up.sql +++ b/backend/shard-migrations/20250922155947_v2_job.up.sql @@ -1,6 +1,6 @@ -- Add up migration script here -CREATE TYPE IF NOT EXISTS job_kind AS ENUM ( +CREATE TYPE job_kind AS ENUM ( 'script', 'preview', 'flow', 'dependencies', 'flowpreview', 'script_hub', 'identity', 'flowdependencies', 'http', 'graphql', 'postgresql', 'noop', 'appdependencies', 'deploymentcallback', 'singlescriptflow', 'flowscript', @@ -8,23 +8,23 @@ CREATE TYPE IF NOT EXISTS job_kind AS ENUM ( ); -CREATE TYPE IF NOT EXISTS job_status AS ENUM ('success', 'failure', 'canceled', 'skipped'); +CREATE TYPE job_status AS ENUM ('success', 'failure', 'canceled', 'skipped'); -CREATE TYPE IF NOT EXISTS job_trigger_kind AS ENUM ( +CREATE TYPE job_trigger_kind AS ENUM ( 'webhook', 'http', 'websocket', 'kafka', 'email', 'nats', 'schedule', 'app', 'ui', 'postgres', 'sqs', 'gcp', 'mqtt' ); -CREATE TYPE IF NOT EXISTS script_lang AS ENUM ( +CREATE TYPE script_lang AS ENUM ( 'python3', 'deno', 'go', 'bash', 'postgresql', 'nativets', 'bun', 'mysql', 'bigquery', 'snowflake', 'graphql', 'powershell', 'mssql', 'php', 'bunnative', 'rust', 'ansible', 'csharp', 'oracledb', 'nu', 'java', 'duckdb' ); -CREATE TABLE IF NOT EXISTS v2_job ( +CREATE TABLE v2_job ( id UUID PRIMARY KEY, raw_code TEXT, raw_lock TEXT, diff --git a/backend/shard-migrations/20250924160628_v2_as_queue.down.sql b/backend/shard-migrations/20250924160628_v2_as_queue.down.sql new file mode 100644 index 0000000000000..23bb72031f378 --- /dev/null +++ b/backend/shard-migrations/20250924160628_v2_as_queue.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP VIEW v2_as_queue; diff --git a/backend/shard-migrations/20250924160628_v2_as_queue.up.sql b/backend/shard-migrations/20250924160628_v2_as_queue.up.sql new file mode 100644 index 0000000000000..884e24daefcb0 --- /dev/null +++ b/backend/shard-migrations/20250924160628_v2_as_queue.up.sql @@ -0,0 +1,53 @@ +-- Add up migration script here +-- Add up migration script here +CREATE OR REPLACE VIEW v2_as_queue AS +SELECT + j.id, + j.workspace_id, + j.parent_job, + j.created_by, + j.created_at, + q.started_at, + q.scheduled_for, + q.running, + j.runnable_id AS script_hash, + j.runnable_path AS script_path, + j.args, + j.raw_code, + q.canceled_by IS NOT NULL AS canceled, + 
q.canceled_by, + q.canceled_reason, + r.ping AS last_ping, + j.kind AS job_kind, + CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END + AS schedule_path, + j.permissioned_as, + COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, + j.raw_flow, + j.flow_step_id IS NOT NULL AS is_flow_step, + j.script_lang AS language, + q.suspend, + q.suspend_until, + j.same_worker, + j.raw_lock, + j.pre_run_error, + j.permissioned_as_email AS email, + j.visible_to_owner, + r.memory_peak AS mem_peak, + j.flow_innermost_root_job AS root_job, + s.flow_leaf_jobs AS leaf_jobs, + j.tag, + j.concurrent_limit, + j.concurrency_time_window_s, + j.timeout, + j.flow_step_id, + j.cache_ttl, + j.priority, + NULL::TEXT AS logs, + j.script_entrypoint_override, + j.preprocessed +FROM v2_job_queue q + JOIN v2_job j USING (id) + LEFT JOIN v2_job_runtime r USING (id) + LEFT JOIN v2_job_status s USING (id) +; diff --git a/backend/src/main.rs b/backend/src/main.rs index 154a97497f150..f3fe0911b471c 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -305,6 +305,7 @@ async fn initialize_worker_shard_db() -> anyhow::Result> { anyhow!("SHARD_DB_URL environment variable is required for worker shard mode") })?; + tracing::info!("Shard url: {}", shard); let shard = connect_db(Some(shard), false, false, true).await?; SHARD_DB_INSTANCE.set(shard.clone()).map_err(|_| { anyhow!("SHARD_DB_INSTANCE already initialized") @@ -319,6 +320,7 @@ async fn initialize_server_shard_instances() -> anyhow::Result<()> { let mut shard_to_db = HashMap::new(); for (i, shard_url) in shard_urls.iter().enumerate() { + println!("Url: {}", &shard_url); let shard = connect_db(Some(&shard_url), true, false, false).await?; shard_to_db.insert(i, shard); } @@ -531,7 +533,7 @@ async fn windmill_main() -> anyhow::Result<()> { if server_mode { initialize_server_shard_instances().await?; } - if worker_mode { + else { job_queue_db = Some(initialize_worker_shard_db().await?); } } else { diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index e971ac47e4cc7..aa3effee393b8 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -2266,17 +2266,55 @@ async fn count_queue_jobs( let tags = cq .tags .map(|t| t.split(',').map(|s| s.to_string()).collect::>()); - Ok(Json( + + let count_queue_jobs = async |db: &DB| { sqlx::query_as!( QueueStats, - "SELECT coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" FROM v2_as_queue WHERE (workspace_id = $1 OR $2) AND scheduled_for <= now() AND ($3::text[] IS NULL OR tag = ANY($3))", + " + SELECT + coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", + coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" + FROM + v2_as_queue + WHERE + (workspace_id = $1 OR $2) AND + scheduled_for <= now() AND ($3::text[] IS NULL OR tag = ANY($3)) + ", w_id, w_id == "admins" && cq.all_workspaces.unwrap_or(false), tags.as_ref().map(|v| v.as_slice()) ) - .fetch_one(&db) - .await?, - )) + .fetch_one(db) + .await + }; + + let count_queue_jobs = if *SHARD_MODE { + let count_futures = SHARD_ID_TO_DB_INSTANCE + .get() + .unwrap() + .iter() + .map(|(_, db)| count_queue_jobs(db)) + .collect_vec(); + + let completed_futures = futures::future::try_join_all(count_futures).await?; + + let mut count = QueueStats { database_length: 0, suspended: None }; + + for queue_count in completed_futures { + 
count.database_length += queue_count.database_length; + count.suspended = match (count.suspended, queue_count.suspended) { + (Some(i), Some(j)) => Some(i + j), + (None, Some(i)) => Some(i), + (count, _) => count, + }; + } + + count + } else { + count_queue_jobs(&db).await? + }; + + Ok(Json(count_queue_jobs)) } #[derive(Deserialize)] @@ -2336,15 +2374,51 @@ async fn count_completed_jobs( Extension(db): Extension, Path(w_id): Path, ) -> error::JsonResult { - Ok(Json( + let count_completed_jobs = async |db: &DB| { sqlx::query_as!( QueueStats, - "SELECT coalesce(COUNT(*), 0) as \"database_length!\", null::bigint as suspended FROM v2_job_completed WHERE workspace_id = $1", + " + SELECT + coalesce(COUNT(*), 0) as \"database_length!\", + null::bigint as suspended + FROM + v2_job_completed + WHERE + workspace_id = $1 + ", w_id ) - .fetch_one(&db) - .await?, - )) + .fetch_one(db) + .await + }; + + let count_completed_jobs = if *SHARD_MODE { + let count_futures = SHARD_ID_TO_DB_INSTANCE + .get() + .unwrap() + .iter() + .map(|(_, db)| count_completed_jobs(db)) + .collect_vec(); + + let completed_futures = futures::future::try_join_all(count_futures).await?; + + let mut count = QueueStats { database_length: 0, suspended: None }; + + for queue_count in completed_futures { + count.database_length += queue_count.database_length; + count.suspended = match (count.suspended, queue_count.suspended) { + (Some(i), Some(j)) => Some(i + j), + (None, Some(i)) => Some(i), + (count, _) => count, + }; + } + + count + } else { + count_completed_jobs(&db).await? + }; + + Ok(Json(count_completed_jobs)) } async fn list_jobs( diff --git a/backend/windmill-common/src/lib.rs b/backend/windmill-common/src/lib.rs index d26543730fb9d..38209eb11ee7a 100644 --- a/backend/windmill-common/src/lib.rs +++ b/backend/windmill-common/src/lib.rs @@ -171,7 +171,6 @@ lazy_static::lazy_static! 
{
 }
 
-// Zero-overhead shard database instances (initialized once at startup)
 pub static SHARD_DB_INSTANCE: OnceCell> = OnceCell::new();
 pub static SHARD_ID_TO_DB_INSTANCE: OnceCell>> = OnceCell::new();
 
diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs
index 84eeafa0d07ad..53edded3e6f52 100644
--- a/backend/windmill-queue/src/jobs.rs
+++ b/backend/windmill-queue/src/jobs.rs
@@ -862,8 +862,8 @@ async fn commit_completed_job(
 ) -> windmill_common::error::Result<(Option, i64, bool)> {
     // let start = std::time::Instant::now();
 
-    let mut tx = db.begin().await?;
-    let mut job_tx = job_queue_db.begin().await?;
+    let (mut tx, mut job_tx) = tokio::try_join!(db.begin(), job_queue_db.begin())?;
+
     let job_id = queued_job.id;
 
     // tracing::error!("1 {:?}", start.elapsed());
@@ -965,7 +965,7 @@
     // tracing::error!("Added completed job {:#?}", queued_job);
     let mut _skip_downstream_error_handlers = false;
-    tx = delete_job(tx, &job_id).await?;
+    job_tx = delete_job(job_tx, &job_id).await?;
     // tracing::error!("3 {:?}", start.elapsed());
 
     if queued_job.is_flow_step() {
@@ -4888,6 +4888,7 @@ pub async fn push<'c, 'd>(
         running,
     )
     .execute(if !shard_mode {&mut *tx} else {
+        println!("Push");
         job_queue_tx.as_mut().unwrap()
     })
     .warn_after_seconds(1)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index c358471626a9e..2f8055c06e428 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -75,11 +75,11 @@ in `benchmarks_noop.ts`
 You can build it locally with:
 
 ```
-deno install -A benchmarks_noop.ts
+deno install -A benchmarks_oneoff.ts
 ```
 
 and then
 
 ```
-benchmarks_noop -e admin@windmill.dev -p changeme --host YOUR_HOST
+benchmarks_oneoff -e admin@windmill.dev -p changeme --host YOUR_HOST
 ```
 
 By default it creates 10000 jobs in Windmill in a single batch, but this is parametrizable.
\ No newline at end of file
From adb28cf02240830a313f0c0d5562f199310adba5 Mon Sep 17 00:00:00 2001
From: dieriba 
Date: Mon, 29 Sep 2025 17:24:48 +0200
Subject: [PATCH 3/9] fix

---
 backend/windmill-queue/src/jobs.rs | 33 +++++------------------------
 1 file changed, 5 insertions(+), 28 deletions(-)

diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs
index 107e8e068c6ac..d9182d05064b2 100644
--- a/backend/windmill-queue/src/jobs.rs
+++ b/backend/windmill-queue/src/jobs.rs
@@ -924,29 +924,6 @@ async fn commit_completed_job(
 .await
 .map_err(|e| Error::internal_err(format!("Could not add completed job {job_id}: {e:#}")))?;
 
-    let duration = if let Some(duration) = duration {
-        duration
-    } else {
-        let already_inserted = sqlx::query_scalar!(
-            "SELECT EXISTS(SELECT 1 FROM v2_job_completed WHERE id = $1)",
-            job_id
-        )
-        .fetch_one(&mut *tx)
-        .await
-        .map_err(|e| Error::internal_err(format!("Could not add completed job {job_id}: {e:#}")))?
-        .unwrap_or(false);
-
-        if already_inserted {
-            return Err(Error::AlreadyCompleted(format!(
-                "The queued job {job_id} is already completed."
-            )));
-        } else {
-            return Err(Error::AlreadyCompleted(format!(
-                "There is no queued job anymore for {job_id} but there is no completed job either."
- ))); - } - }; - if let Some(labels) = result.wm_labels() { sqlx::query!( "UPDATE v2_job SET labels = ( @@ -1178,6 +1155,11 @@ async fn commit_completed_job( .execute(&mut *job_tx) .await?; + if !success || has_stream { + sqlx::query!("DELETE FROM job_result_stream_v2 WHERE job_id = $1", job_id) + .execute(&mut *tx) + .await?; + } let main_db_result = tx.commit().await; match main_db_result { @@ -1218,11 +1200,6 @@ async fn commit_completed_job( ))); } } - if !success || has_stream { - sqlx::query!("DELETE FROM job_result_stream_v2 WHERE job_id = $1", job_id).execute(&mut *tx); - } - - tx.commit().await?; tracing::info!( %job_id, From e2a38bef1d32a0b4ab9d550255bcd959ded79b28 Mon Sep 17 00:00:00 2001 From: dieriba Date: Tue, 30 Sep 2025 19:28:41 +0200 Subject: [PATCH 4/9] improve bench --- backend/windmill-api/src/jobs.rs | 29 +- benchmarks/benchmark_sharding.ts | 598 +++++++++++++++++++++++++++++++ 2 files changed, 622 insertions(+), 5 deletions(-) create mode 100644 benchmarks/benchmark_sharding.ts diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 7c1c9eb5d73e3..67d2056f865ee 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -2249,7 +2249,7 @@ async fn list_filtered_uuids( Ok(Json(jobs)) } -#[derive(Serialize)] +#[derive(Debug, Serialize)] struct QueueStats { database_length: i64, suspended: Option, @@ -6005,6 +6005,7 @@ struct BatchInfo { path: Option, rawscript: Option, tag: Option, + number_of_shards: Option, } async fn insert_batch_jobs_to_db<'c, E: sqlx::Executor<'c, Database = Postgres>>( @@ -6298,10 +6299,28 @@ async fn add_batch_jobs( )); } + let total_available_shards = shard_db_store.len(); + let num_shards_to_use = if let Some(requested_shards) = batch_info.number_of_shards { + if requested_shards > total_available_shards { + return Err(Error::BadRequest(format!( + "Requested {} shards but only {} shards are available", + requested_shards, total_available_shards + ))); + } + if requested_shards == 0 { + return Err(Error::BadRequest( + "Number of shards must be greater than 0".to_string(), + )); + } + requested_shards + } else { + total_available_shards + }; + let mut uuid_per_db = { - let mut store = HashMap::with_capacity(shard_db_store.len()); - for (shard_id, _) in shard_db_store { - store.insert(*shard_id, Vec::new()); + let mut store = HashMap::with_capacity(num_shards_to_use); + for shard_id in 0..num_shards_to_use { + store.insert(shard_id, Vec::new()); } store }; @@ -6309,7 +6328,7 @@ async fn add_batch_jobs( for uuid in &uuids { let mut hasher = DefaultHasher::new(); uuid.hash(&mut hasher); - let shard_id = (hasher.finish() as usize) % shard_db_store.len(); + let shard_id = (hasher.finish() as usize) % num_shards_to_use; uuid_per_db.get_mut(&shard_id).unwrap().push(*uuid); } diff --git a/benchmarks/benchmark_sharding.ts b/benchmarks/benchmark_sharding.ts new file mode 100644 index 0000000000000..0cab9c10344c2 --- /dev/null +++ b/benchmarks/benchmark_sharding.ts @@ -0,0 +1,598 @@ +/// +/// + +import { Command } from "https://deno.land/x/cliffy@v0.25.7/command/mod.ts"; +import { UpgradeCommand } from "https://deno.land/x/cliffy@v0.25.7/command/upgrade/upgrade_command.ts"; +import { DenoLandProvider } from "https://deno.land/x/cliffy@v0.25.7/command/upgrade/mod.ts"; + +import { sleep } from "https://deno.land/x/sleep@v1.2.1/mod.ts"; + +import * as windmill from "https://deno.land/x/windmill@v1.174.0/mod.ts"; +import * as api from "https://deno.land/x/windmill@v1.174.0/windmill-api/index.ts"; + 
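+// Benchmark driver for the sharded queue: for each shard count from 1 up to
+// --max-shards it batch-creates jobs through /jobs/add_batch_jobs (passing
+// number_of_shards), polls the queue and completed-job counters until every
+// job has finished, then reports throughput and linear-scaling efficiency.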
+import { VERSION, createBenchScript, getFlowPayload, login } from "./lib.ts"; + +export const NON_TEST_TAGS = [ + "deno", + "python", + "go", + "bash", + "dedicated", + "bun", + "nativets", + "flow", +]; + +interface ShardingBenchmarkResult { + numShards: number; + throughput: number; + totalDuration: number; + jobsCompleted: number; + avgLatency: number; +} + +interface ShardingBenchmarkOptions { + host: string; + email?: string; + password?: string; + token?: string; + workspace: string; + kind: string; + jobs: number; + maxShards: number; + noVerify?: boolean; +} + +interface BenchmarkContext { + config: { + token: string; + server: string; + workspace_id: string; + }; + nStepsFlow: number; + bodyTemplate: any; + getQueueCount: (tags?: string[]) => Promise; + getCompletedJobsCount: ( + tags?: string[], + baseline?: number + ) => Promise; +} + +async function verifyOutputs(uuids: string[], workspace: string) { + console.log("Verifying outputs"); + let incorrectResults = 0; + for (const uuid of uuids) { + try { + const job = await windmill.JobService.getCompletedJob({ + workspace, + id: uuid, + }); + if (!job.success) { + console.log(`Job ${uuid} did not complete`); + incorrectResults++; + } + if (job.result !== uuid) { + console.log( + `Job ${uuid} did not output the correct value: ${JSON.stringify(job)}` + ); + incorrectResults++; + } + } catch (_) { + console.log(`Job ${uuid} did not complete`); + incorrectResults++; + } + } + console.log(`Incorrect results: ${incorrectResults}`); +} + +async function initializeBenchmarkContext( + options: ShardingBenchmarkOptions +): Promise { + const { host, workspace, kind } = options; + + windmill.setClient("", host); + + const config = { + token: "", + server: host, + workspace_id: workspace, + }; + + let final_token: string; + if (!options.token) { + if (options.email && options.password) { + final_token = await login(options.email, options.password); + } else { + throw new Error("Token or email with password are required."); + } + } else { + final_token = options.token; + } + + config.token = final_token; + windmill.setClient(final_token, host); + + async function getQueueCount(tags?: string[]) { + return ( + await ( + await fetch( + config.server + + "/api/w/" + + config.workspace_id + + "/jobs/queue/count" + + (tags && tags.length > 0 ? "?tags=" + tags.join(",") : ""), + { headers: { ["Authorization"]: "Bearer " + config.token } } + ) + ).json() + ).database_length; + } + + async function getFlowStepCount( + workspace: string, + path: string + ): Promise { + const response = await fetch( + `${config.server}/api/w/${workspace}/flows/get/${path}`, + { headers: { ["Authorization"]: "Bearer " + config.token } } + ); + + const data = await response.json(); + let stepCount = 0; + + for (const mod of data.value.modules) { + if (mod.value.type === "flow" && mod.value.path) { + const subFlowCount = await getFlowStepCount(workspace, mod.value.path); + stepCount += subFlowCount; + } else { + stepCount += 1; + } + } + + return stepCount; + } + + async function getCompletedJobsCount( + tags?: string[], + baseline: number = 0 + ): Promise { + const completedJobs = ( + await ( + await fetch( + host + + "/api/w/" + + config.workspace_id + + "/jobs/completed/count" + + (tags && tags.length > 0 ? 
"?tags=" + tags.join(",") : ""), + { headers: { ["Authorization"]: "Bearer " + config.token } } + ) + ).json() + ).database_length; + return completedJobs - baseline; + } + + if ( + ["deno", "python", "go", "bash", "dedicated", "bun", "nativets"].includes( + kind + ) + ) { + await createBenchScript(kind, workspace); + } + + // Determine job body template and flow steps + let nStepsFlow = 0; + let bodyTemplate: any; + + if (kind === "noop") { + bodyTemplate = { + kind: "noop", + }; + } else if ( + ["deno", "python", "go", "bash", "dedicated", "bun", "nativets"].includes( + kind + ) + ) { + bodyTemplate = { + kind: "script", + path: "f/benchmarks/" + kind, + }; + } else if (["2steps", "bigscriptinflow"].includes(kind)) { + nStepsFlow = kind == "2steps" ? 2 : 1; + const payload = getFlowPayload(kind); + bodyTemplate = { + kind: "flow", + flow_value: payload.value, + }; + } else if (kind.startsWith("flow:")) { + console.log("Detected custom flow "); + let flow_path = kind.substr(5); + nStepsFlow = await getFlowStepCount(config.workspace_id, flow_path); + console.log(`Total steps of flow including sub-flows: ${nStepsFlow}`); + bodyTemplate = { + kind: "flow", + path: flow_path, + }; + } else if (kind.startsWith("script:")) { + console.log("Detected custom script"); + bodyTemplate = { + kind: "script", + path: kind.substr(7), + }; + } else if (kind == "bigrawscript") { + bodyTemplate = { + kind: "rawscript", + rawscript: { + language: api.RawScript.language.BASH, + content: + "# let's bloat that bash script, 3.. 2.. 1.. BOOM\n".repeat(100) + + 'echo "$WM_FLOW_JOB_ID"\n', + }, + }; + } else { + throw new Error("Unknown script pattern " + kind); + } + + return { + config, + nStepsFlow, + bodyTemplate, + getQueueCount, + getCompletedJobsCount, + }; +} + +async function runShardingBenchmark( + context: BenchmarkContext, + options: ShardingBenchmarkOptions, + numShards: number +): Promise { + const { jobs, kind } = options; + const { + config, + nStepsFlow, + bodyTemplate, + getQueueCount, + getCompletedJobsCount, + } = context; + console.log(`\n=== Running benchmark with ${numShards} shard(s) ===`); + + const pastJobs = await getCompletedJobsCount(NON_TEST_TAGS, 0); + + const enc = (s: string) => new TextEncoder().encode(s); + const jobsSent = jobs; + console.log(`Bulk creating ${jobsSent} jobs`); + + const start_create = Date.now(); + + // Create request body with number_of_shards + const body = JSON.stringify({ + ...bodyTemplate, + number_of_shards: numShards, + }); + + const response = await fetch( + config.server + + "/api/w/" + + config.workspace_id + + `/jobs/add_batch_jobs/${jobsSent}`, + { + method: "POST", + headers: { + ["Authorization"]: "Bearer " + config.token, + "Content-Type": "application/json", + }, + body, + } + ); + + if (!response.ok) { + throw new Error( + "Failed to create jobs: " + + response.statusText + + " " + + (await response.text()) + ); + } + + const uuids = await response.json(); + const end_create = Date.now(); + const create_duration = end_create - start_create; + console.log( + `Jobs successfully added to the queue in ${ + create_duration / 1000 + }s. 
Windmill will start pulling them\n` + ); + + let start = Date.now(); + let completedJobs = 0; + let lastElapsed = 0; + let lastCompletedJobs = 0; + let totalMonitoringOverhead = 0; // Track cumulative monitoring time + + let didStart = false; + while (completedJobs < jobsSent) { + if (!didStart) { + const queueCheckStart = Date.now(); + const actual_queue = await getQueueCount(NON_TEST_TAGS); + totalMonitoringOverhead += Date.now() - queueCheckStart; + + if (actual_queue < jobsSent) { + start = Date.now(); + didStart = true; + } + } else { + await sleep(500); + const monitoringStart = Date.now(); + completedJobs = await getCompletedJobsCount(NON_TEST_TAGS, pastJobs); + totalMonitoringOverhead += Date.now() - monitoringStart; + + const elapsed = start ? Date.now() - start - totalMonitoringOverhead : 0; + console.log({ totalMonitoringOverhead }); + if (nStepsFlow > 0) { + completedJobs = Math.floor(completedJobs / (nStepsFlow + 1)); + } + const avgThr = ((completedJobs / elapsed) * 1000).toFixed(2); + const instThr = + lastElapsed > 0 + ? ( + ((completedJobs - lastCompletedJobs) / (elapsed - lastElapsed)) * + 1000 + ).toFixed(2) + : 0; + + lastElapsed = elapsed; + lastCompletedJobs = completedJobs; + + await Deno.stdout.write( + enc( + `[${numShards} shards] elapsed: ${(elapsed / 1000).toFixed( + 2 + )} | jobs executed: ${completedJobs}/${jobsSent} (thr: inst ${instThr} - avg ${avgThr}) | remaining: ${ + jobsSent - completedJobs + } \r` + ) + ); + } + } + + const total_duration_sec = + (Date.now() - start - totalMonitoringOverhead) / 1000.0; + const throughput = jobsSent / total_duration_sec; + const avgLatency = total_duration_sec / jobsSent; + + console.log(`\n--- Results for ${numShards} shard(s) ---`); + console.log(`jobs: ${jobsSent}`); + console.log(`duration: ${total_duration_sec}s`); + console.log( + `monitoring overhead: ${(totalMonitoringOverhead / 1000).toFixed(2)}s` + ); + console.log(`throughput: ${throughput.toFixed(2)} jobs/s`); + console.log(`avg latency: ${(avgLatency * 1000).toFixed(2)}ms per job`); + console.log(`completed jobs: ${completedJobs}`); + + if ( + !options.noVerify && + kind !== "noop" && + kind !== "nativets" && + !kind.startsWith("flow:") && + !kind.startsWith("script:") + ) { + await verifyOutputs(uuids, config.workspace_id); + } + + return { + numShards, + throughput, + totalDuration: total_duration_sec, + jobsCompleted: completedJobs, + avgLatency, + }; +} + +export async function main({ + host, + email, + password, + token, + workspace, + kind, + jobs, + maxShards, + noVerify, +}: { + host: string; + email?: string; + password?: string; + token?: string; + workspace: string; + kind: string; + jobs: number; + maxShards: number; + noVerify?: boolean; +}) { + if (maxShards <= 0) { + throw new Error("Max shards must be greater than 0"); + } + + console.log( + "Started sharding benchmark with options", + JSON.stringify( + { + host, + email, + workspace, + kind, + jobs, + maxShards, + noVerify, + }, + null, + 4 + ) + ); + + const options: ShardingBenchmarkOptions = { + host, + email, + password, + token, + workspace, + kind, + jobs, + maxShards, + noVerify, + }; + + console.log("Initializing benchmark context..."); + const context = await initializeBenchmarkContext(options); + console.log("Context initialized successfully!"); + + const results: ShardingBenchmarkResult[] = []; + + for (let numShards = 1; numShards <= maxShards; numShards++) { + try { + const result = await runShardingBenchmark(context, options, numShards); + results.push(result); + + await 
sleep(1); + } catch (error) { + console.error( + `Failed to run benchmark with ${numShards} shard(s):`, + error + ); + break; + } + } + + console.log("\n" + "=".repeat(80)); + console.log("SHARDING BENCHMARK RESULTS SUMMARY"); + console.log("=".repeat(80)); + console.log( + "Shards | Throughput (jobs/s) | Duration (s) | Avg Latency (ms) | Scaling Factor" + ); + console.log("-".repeat(80)); + + const baselineThroughput = results[0]?.throughput || 1; + + for (const result of results) { + const scalingFactor = (result.throughput / baselineThroughput).toFixed(2); + console.log( + `${result.numShards.toString().padStart(6)} | ${result.throughput + .toFixed(2) + .padStart(18)} | ${result.totalDuration.toFixed(2).padStart(11)} | ${( + result.avgLatency * 1000 + ) + .toFixed(2) + .padStart(15)} | ${scalingFactor.padStart(13)}` + ); + } + + console.log("-".repeat(80)); + + if (results.length > 1) { + const linearScalingEfficiency = results.map((result, index) => { + const expectedThroughput = baselineThroughput * result.numShards; + const efficiency = (result.throughput / expectedThroughput) * 100; + return { shards: result.numShards, efficiency: efficiency.toFixed(1) }; + }); + + console.log("\nLinear Scaling Efficiency:"); + for (const eff of linearScalingEfficiency) { + console.log(`${eff.shards} shard(s): ${eff.efficiency}% efficiency`); + } + } + + console.log("\nBenchmark completed!"); + + return { + results, + summary: { + maxShards, + bestThroughput: Math.max(...results.map((r) => r.throughput)), + scalingEfficiency: + results.length > 1 + ? (results[results.length - 1].throughput / + (baselineThroughput * results[results.length - 1].numShards)) * + 100 + : 100, + }, + }; +} + +if (import.meta.main) { + await new Command() + .name("wmillbench-sharding") + .description( + "Run Sharding Benchmark to measure linear scaling of windmill with multiple database shards." + ) + .version(VERSION) + .option("--host ", "The windmill host to benchmark.", { + default: "http://127.0.0.1:8000", + }) + .option("-e --email ", "The email to use to login.", { + default: "admin@windmill.dev", + }) + .option( + "-p --password ", + "The password to use to login.", + { + default: "changeme", + } + ) + .env( + "WM_TOKEN=", + "The token to use when talking to the API server. Preferred over manual login." + ) + .option( + "-t --token ", + "The token to use when talking to the API server. Preferred over manual login." + ) + .env( + "WM_WORKSPACE=", + "The workspace to spawn scripts from." + ) + .option( + "-w --workspace ", + "The workspace to spawn scripts from.", + { default: "admins" } + ) + .option( + "--kind ", + "Specify the benchmark kind among: deno, identity, python, go, bash, dedicated, bun, noop, 2steps, nativets", + { + required: true, + } + ) + .option("-j --jobs ", "Number of jobs to create per test.", { + default: 10000, + }) + .env( + "MAX_SHARDS=", + "Maximum number of shards to test (will test 1, 2, 3, ... up to this number)" + ) + .option( + "--max-shards ", + "Maximum number of shards to test (will test 1, 2, 3, ... 
up to this number)", + { + required: true, + } + ) + .option("--no-verify", "Do not verify the output of the jobs.", { + default: false, + }) + .action(main) + .command( + "upgrade", + new UpgradeCommand({ + main: "main.ts", + args: [ + "--allow-net", + "--allow-read", + "--allow-write", + "--allow-env", + "--unstable", + ], + provider: new DenoLandProvider({ name: "wmillbench-sharding" }), + }) + ) + .parse(); +} From d067e264ce64ac61d3bbb240b52b8b779e429569 Mon Sep 17 00:00:00 2001 From: dieriba Date: Wed, 1 Oct 2025 01:27:53 +0200 Subject: [PATCH 5/9] fix neg --- benchmarks/benchmark_sharding.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmarks/benchmark_sharding.ts b/benchmarks/benchmark_sharding.ts index 0cab9c10344c2..b553929136897 100644 --- a/benchmarks/benchmark_sharding.ts +++ b/benchmarks/benchmark_sharding.ts @@ -306,7 +306,7 @@ async function runShardingBenchmark( let completedJobs = 0; let lastElapsed = 0; let lastCompletedJobs = 0; - let totalMonitoringOverhead = 0; // Track cumulative monitoring time + let totalMonitoringOverhead = 0; let didStart = false; while (completedJobs < jobsSent) { @@ -317,16 +317,16 @@ async function runShardingBenchmark( if (actual_queue < jobsSent) { start = Date.now(); + totalMonitoringOverhead = 0; didStart = true; } } else { - await sleep(500); + await sleep(1); const monitoringStart = Date.now(); completedJobs = await getCompletedJobsCount(NON_TEST_TAGS, pastJobs); totalMonitoringOverhead += Date.now() - monitoringStart; - const elapsed = start ? Date.now() - start - totalMonitoringOverhead : 0; - console.log({ totalMonitoringOverhead }); + const elapsed = start ? Math.max(0, Date.now() - start - totalMonitoringOverhead) : 0; if (nStepsFlow > 0) { completedJobs = Math.floor(completedJobs / (nStepsFlow + 1)); } From 2e907949d685f23dcc8d79f5820ef3b7937bc766 Mon Sep 17 00:00:00 2001 From: Dieriba toure Date: Wed, 1 Oct 2025 19:06:56 +0200 Subject: [PATCH 6/9] update --- backend/Cargo.lock | 1 + backend/Cargo.toml | 1 + backend/src/main.rs | 8 ++-- backend/windmill-api/openapi.yaml | 15 ++++++++ backend/windmill-api/src/jobs.rs | 59 ++++++++++++++++++------------ backend/windmill-common/src/lib.rs | 3 +- benchmarks/benchmark_sharding.ts | 44 +++++++++++++++++----- 7 files changed, 93 insertions(+), 38 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 380d6d200fc61..8699332e3a1dd 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -15135,6 +15135,7 @@ dependencies = [ "gethostname", "git-version", "globset", + "indexmap 2.11.1", "k8s-openapi", "kube", "lazy_static", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index c5c8edfb9e480..e0803196ae32d 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -135,6 +135,7 @@ lazy_static.workspace = true once_cell.workspace = true prometheus = { workspace = true, optional = true } uuid.workspace = true +indexmap.workspace = true gethostname.workspace = true serde_json.workspace = true serde_yml.workspace = true diff --git a/backend/src/main.rs b/backend/src/main.rs index d202adfbc7d02..55cf1c7c9d06a 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -27,7 +27,7 @@ use strum::IntoEnumIterator; use tokio::{fs::File, io::AsyncReadExt, task::JoinHandle}; use uuid::Uuid; use windmill_api::HTTP_CLIENT; - +use indexmap::map::IndexMap; #[cfg(feature = "enterprise")] use windmill_common::ee_oss::{ maybe_renew_license_key_on_start, LICENSE_KEY_ID, LICENSE_KEY_VALID, @@ -320,10 +320,10 @@ async fn 
initialize_server_shard_instances() -> anyhow::Result<()> { .as_ref() .ok_or_else(|| anyhow!("SHARD_URLS environment variable is required for server shard mode. Please set it as: SHARD_URLS=dburl1,dburl2,..."))?; - let mut shard_to_db = HashMap::new(); + let mut shard_to_db = IndexMap::new(); for (i, shard_url) in shard_urls.iter().enumerate() { - println!("Url: {}", &shard_url); - let shard = connect_db(Some(&shard_url), true, false, false).await?; + tracing::info!("Connecting to shard {}: {}", i, &shard_url); + let shard = connect_db(Some(&shard_url), true, false, false).await.with_context(|| format!("Failed to connect to shard {}", i))?; shard_to_db.insert(i, shard); } diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 77d760dc4aa26..8878054c98855 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -7581,6 +7581,16 @@ paths: in: query schema: type: boolean + - name: tags + description: filter by tags (comma-separated) + in: query + schema: + type: string + - name: num_shards + description: number of shards to query (only applicable in shard mode) + in: query + schema: + type: integer responses: "200": description: queue count @@ -7604,6 +7614,11 @@ paths: - job parameters: - $ref: "#/components/parameters/WorkspaceId" + - name: num_shards + description: number of shards to query (only applicable in shard mode) + in: query + schema: + type: integer responses: "200": description: completed count diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 67d2056f865ee..dce51583690eb 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -2259,6 +2259,7 @@ struct QueueStats { pub struct CountQueueJobsQuery { all_workspaces: Option, tags: Option, + num_shards: Option, } async fn count_queue_jobs( @@ -2274,13 +2275,13 @@ async fn count_queue_jobs( sqlx::query_as!( QueueStats, " - SELECT - coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", - coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" - FROM - v2_as_queue - WHERE - (workspace_id = $1 OR $2) AND + SELECT + coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", + coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" + FROM + v2_as_queue + WHERE + (workspace_id = $1 OR $2) AND scheduled_for <= now() AND ($3::text[] IS NULL OR tag = ANY($3)) ", w_id, @@ -2292,11 +2293,13 @@ async fn count_queue_jobs( }; let count_queue_jobs = if *SHARD_MODE { - let count_futures = SHARD_ID_TO_DB_INSTANCE - .get() - .unwrap() - .iter() - .map(|(_, db)| count_queue_jobs(db)) + let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap(); + println!("{:#?}", &cq.num_shards); + let num_shards_to_query = cq.num_shards.unwrap_or(shard_db_store.len()); + + let count_futures = (0..num_shards_to_query) + .filter_map(|shard_id| shard_db_store.get(&shard_id)) + .map(|db| count_queue_jobs(db)) .collect_vec(); let completed_futures = futures::future::try_join_all(count_futures).await?; @@ -2373,20 +2376,26 @@ async fn count_completed_jobs_detail( Ok(Json(stats)) } +#[derive(Deserialize)] +pub struct CountCompletedJobsSimpleQuery { + num_shards: Option, +} + async fn count_completed_jobs( Extension(db): Extension, Path(w_id): Path, + Query(cq): Query, ) -> error::JsonResult { let count_completed_jobs = async |db: &DB| { sqlx::query_as!( QueueStats, " - SELECT - coalesce(COUNT(*), 0) as \"database_length!\", - null::bigint as 
suspended - FROM - v2_job_completed - WHERE + SELECT + coalesce(COUNT(*), 0) as \"database_length!\", + null::bigint as suspended + FROM + v2_job_completed + WHERE workspace_id = $1 ", w_id @@ -2396,11 +2405,15 @@ async fn count_completed_jobs( }; let count_completed_jobs = if *SHARD_MODE { - let count_futures = SHARD_ID_TO_DB_INSTANCE - .get() - .unwrap() - .iter() - .map(|(_, db)| count_completed_jobs(db)) + let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap(); + + println!("{:#?}", &cq.num_shards); + + let num_shards_to_query = cq.num_shards.unwrap_or(shard_db_store.len()); + + let count_futures = (0..num_shards_to_query) + .filter_map(|shard_id| shard_db_store.get(&shard_id)) + .map(|db| count_completed_jobs(db)) .collect_vec(); let completed_futures = futures::future::try_join_all(count_futures).await?; diff --git a/backend/windmill-common/src/lib.rs b/backend/windmill-common/src/lib.rs index 38209eb11ee7a..035a7928e3a0b 100644 --- a/backend/windmill-common/src/lib.rs +++ b/backend/windmill-common/src/lib.rs @@ -6,6 +6,7 @@ * LICENSE-AGPL for a copy of the license. */ +use indexmap::IndexMap; use itertools::Itertools as _; use once_cell::sync::OnceCell; use quick_cache::sync::Cache; @@ -172,7 +173,7 @@ lazy_static::lazy_static! { } pub static SHARD_DB_INSTANCE: OnceCell> = OnceCell::new(); -pub static SHARD_ID_TO_DB_INSTANCE: OnceCell>> = OnceCell::new(); +pub static SHARD_ID_TO_DB_INSTANCE: OnceCell>> = OnceCell::new(); const LATEST_VERSION_ID_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60); diff --git a/benchmarks/benchmark_sharding.ts b/benchmarks/benchmark_sharding.ts index b553929136897..2dba838fbbf16 100644 --- a/benchmarks/benchmark_sharding.ts +++ b/benchmarks/benchmark_sharding.ts @@ -51,10 +51,11 @@ interface BenchmarkContext { }; nStepsFlow: number; bodyTemplate: any; - getQueueCount: (tags?: string[]) => Promise; + getQueueCount: (tags?: string[], num_shards?: number) => Promise; getCompletedJobsCount: ( tags?: string[], - baseline?: number + baseline?: number, + num_shards?: number ) => Promise; } @@ -112,7 +113,15 @@ async function initializeBenchmarkContext( config.token = final_token; windmill.setClient(final_token, host); - async function getQueueCount(tags?: string[]) { + async function getQueueCount(tags?: string[], numShards?: number) { + const params = new URLSearchParams(); + if (tags && tags.length > 0) { + params.set("tags", tags.join(",")); + } + if (numShards !== undefined) { + params.set("num_shards", numShards.toString()); + } + const queryString = params.toString(); return ( await ( await fetch( @@ -120,7 +129,7 @@ async function initializeBenchmarkContext( "/api/w/" + config.workspace_id + "/jobs/queue/count" + - (tags && tags.length > 0 ? "?tags=" + tags.join(",") : ""), + (queryString ? "?" + queryString : ""), { headers: { ["Authorization"]: "Bearer " + config.token } } ) ).json() @@ -153,8 +162,17 @@ async function initializeBenchmarkContext( async function getCompletedJobsCount( tags?: string[], - baseline: number = 0 + baseline: number = 0, + numShards?: number ): Promise { + const params = new URLSearchParams(); + if (tags && tags.length > 0) { + params.set("tags", tags.join(",")); + } + if (numShards !== undefined) { + params.set("num_shards", numShards.toString()); + } + const queryString = params.toString(); const completedJobs = ( await ( await fetch( @@ -162,7 +180,7 @@ async function initializeBenchmarkContext( "/api/w/" + config.workspace_id + "/jobs/completed/count" + - (tags && tags.length > 0 ? 
"?tags=" + tags.join(",") : ""), + (queryString ? "?" + queryString : ""), { headers: { ["Authorization"]: "Bearer " + config.token } } ) ).json() @@ -255,7 +273,7 @@ async function runShardingBenchmark( } = context; console.log(`\n=== Running benchmark with ${numShards} shard(s) ===`); - const pastJobs = await getCompletedJobsCount(NON_TEST_TAGS, 0); + const pastJobs = await getCompletedJobsCount(NON_TEST_TAGS, 0, numShards); const enc = (s: string) => new TextEncoder().encode(s); const jobsSent = jobs; @@ -312,7 +330,7 @@ async function runShardingBenchmark( while (completedJobs < jobsSent) { if (!didStart) { const queueCheckStart = Date.now(); - const actual_queue = await getQueueCount(NON_TEST_TAGS); + const actual_queue = await getQueueCount(NON_TEST_TAGS, numShards); totalMonitoringOverhead += Date.now() - queueCheckStart; if (actual_queue < jobsSent) { @@ -323,10 +341,16 @@ async function runShardingBenchmark( } else { await sleep(1); const monitoringStart = Date.now(); - completedJobs = await getCompletedJobsCount(NON_TEST_TAGS, pastJobs); + completedJobs = await getCompletedJobsCount( + NON_TEST_TAGS, + pastJobs, + numShards + ); totalMonitoringOverhead += Date.now() - monitoringStart; - const elapsed = start ? Math.max(0, Date.now() - start - totalMonitoringOverhead) : 0; + const elapsed = start + ? Math.max(0, Date.now() - start - totalMonitoringOverhead) + : 0; if (nStepsFlow > 0) { completedJobs = Math.floor(completedJobs / (nStepsFlow + 1)); } From e5ecc90a7be2c8079c60482a57bdb9b014d72ef5 Mon Sep 17 00:00:00 2001 From: dieriba Date: Wed, 1 Oct 2025 23:40:10 +0200 Subject: [PATCH 7/9] remove the print --- backend/windmill-api/src/jobs.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 2a14b8005a301..3c265ce78782e 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -2294,7 +2294,6 @@ async fn count_queue_jobs( let count_queue_jobs = if *SHARD_MODE { let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap(); - println!("{:#?}", &cq.num_shards); let num_shards_to_query = cq.num_shards.unwrap_or(shard_db_store.len()); let count_futures = (0..num_shards_to_query) @@ -2407,8 +2406,6 @@ async fn count_completed_jobs( let count_completed_jobs = if *SHARD_MODE { let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap(); - println!("{:#?}", &cq.num_shards); - let num_shards_to_query = cq.num_shards.unwrap_or(shard_db_store.len()); let count_futures = (0..num_shards_to_query) From f302f802820dd893cef6206c8e6c93d2ab23b051 Mon Sep 17 00:00:00 2001 From: dieriba Date: Thu, 2 Oct 2025 10:12:00 +0200 Subject: [PATCH 8/9] fix bench --- benchmarks/benchmark_sharding.ts | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/benchmarks/benchmark_sharding.ts b/benchmarks/benchmark_sharding.ts index 2dba838fbbf16..6c121434e7f86 100644 --- a/benchmarks/benchmark_sharding.ts +++ b/benchmarks/benchmark_sharding.ts @@ -339,7 +339,6 @@ async function runShardingBenchmark( didStart = true; } } else { - await sleep(1); const monitoringStart = Date.now(); completedJobs = await getCompletedJobsCount( NON_TEST_TAGS, @@ -349,19 +348,18 @@ async function runShardingBenchmark( totalMonitoringOverhead += Date.now() - monitoringStart; const elapsed = start - ? Math.max(0, Date.now() - start - totalMonitoringOverhead) - : 0; + ? 
Math.max(1, Date.now() - start - totalMonitoringOverhead) + : 1; if (nStepsFlow > 0) { completedJobs = Math.floor(completedJobs / (nStepsFlow + 1)); } - const avgThr = ((completedJobs / elapsed) * 1000).toFixed(2); + const avgThr = elapsed > 0 ? ((completedJobs / elapsed) * 1000).toFixed(2) : "0.00"; + const timeDiff = elapsed - lastElapsed; + const jobsDiff = completedJobs - lastCompletedJobs; const instThr = - lastElapsed > 0 - ? ( - ((completedJobs - lastCompletedJobs) / (elapsed - lastElapsed)) * - 1000 - ).toFixed(2) - : 0; + lastElapsed > 0 && timeDiff > 0 && jobsDiff > 0 + ? ((jobsDiff / timeDiff) * 1000).toFixed(2) + : "0.00"; lastElapsed = elapsed; lastCompletedJobs = completedJobs; @@ -376,12 +374,14 @@ async function runShardingBenchmark( ) ); } + + await sleep(0.05); } - const total_duration_sec = - (Date.now() - start - totalMonitoringOverhead) / 1000.0; - const throughput = jobsSent / total_duration_sec; - const avgLatency = total_duration_sec / jobsSent; + const total_duration_ms = Math.max(1, Date.now() - start - totalMonitoringOverhead); + const total_duration_sec = total_duration_ms / 1000.0; + const throughput = total_duration_sec > 0 ? jobsSent / total_duration_sec : 0; + const avgLatency = jobsSent > 0 ? total_duration_sec / jobsSent : 0; console.log(`\n--- Results for ${numShards} shard(s) ---`); console.log(`jobs: ${jobsSent}`); @@ -477,7 +477,7 @@ export async function main({ const result = await runShardingBenchmark(context, options, numShards); results.push(result); - await sleep(1); + await sleep(0.3); } catch (error) { console.error( `Failed to run benchmark with ${numShards} shard(s):`, From ded2be0c8b49bfd8a6d57470844a2347efc812cf Mon Sep 17 00:00:00 2001 From: dieriba Date: Sun, 5 Oct 2025 22:50:24 +0200 Subject: [PATCH 9/9] fix --- backend/windmill-api/src/jobs.rs | 3 ++- backend/windmill-worker/src/ai_executor.rs | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 96381ff3f52b0..f820e87503d52 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -14,6 +14,7 @@ use deno_core::{op2, serde_v8, v8, JsRuntime, OpState}; use futures::future::join_all; use futures::{StreamExt, TryFutureExt}; use http::{HeaderMap, HeaderName}; +use indexmap::IndexMap; use itertools::Itertools; use quick_cache::sync::Cache; use serde_json::value::RawValue; @@ -6439,7 +6440,7 @@ async fn add_batch_jobs( }; let mut uuid_per_db = { - let mut store = HashMap::with_capacity(num_shards_to_use); + let mut store = IndexMap::with_capacity(num_shards_to_use); for shard_id in 0..num_shards_to_use { store.insert(shard_id, Vec::new()); } diff --git a/backend/windmill-worker/src/ai_executor.rs b/backend/windmill-worker/src/ai_executor.rs index ad6cfb0325cb9..3d1ab085c4cc0 100644 --- a/backend/windmill-worker/src/ai_executor.rs +++ b/backend/windmill-worker/src/ai_executor.rs @@ -897,7 +897,7 @@ pub async fn run_agent( let inner_job_completed_tx_spawn = inner_job_completed_tx.clone(); let mut occupancy_metrics_spawn = occupancy_metrics.clone(); let mut killpill_rx_spawn = killpill_rx.resubscribe(); - + let queue_db = job_queue_db.clone(); // Spawn on separate tokio task with fresh stack let join_handle = tokio::task::spawn(async move { #[cfg(feature = "benchmark")] @@ -911,7 +911,7 @@ pub async fn run_agent( None, None, &conn_spawn, - Some(job_queue_db), + Some(&queue_db), &client_spawn, &hostname_spawn, &worker_name_spawn, @@ -953,7 +953,7 @@ pub async fn run_agent( let 
err_json = error_to_value(&err); let _ = handle_non_flow_job_error( db, - job_queue_db, + &job_queue_db, &tool_job, 0, None,
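
---

The uuid-to-shard routing used by add_batch_jobs above is the piece the series hinges
on: push must route a job (via get_shard_db_from_job_uuid) to the same shard that any
later lookup will compute for that id. A minimal sketch of that routing, assuming
get_shard_db_from_job_uuid hashes exactly like the batch path does (only the
add_batch_jobs side of the hashing is visible in these patches):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use uuid::Uuid;

// Mirror of the shard selection in add_batch_jobs: hash the job uuid with the
// std DefaultHasher, then reduce modulo the number of shards in use.
fn shard_for_job(id: &Uuid, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    id.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

One caveat of this scheme: DefaultHasher's algorithm is not guaranteed to be stable
across Rust releases, so every binary that routes jobs must be built against the same
std version (or the hash swapped for an explicitly stable one) for the id-to-shard
mapping to stay consistent between servers and workers.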