diff --git a/backend/Cargo.lock b/backend/Cargo.lock
index c0324ad510339..cfbad39fe2434 100644
--- a/backend/Cargo.lock
+++ b/backend/Cargo.lock
@@ -15140,6 +15140,7 @@ dependencies = [
  "gethostname",
  "git-version",
  "globset",
+ "indexmap 2.11.1",
  "k8s-openapi",
  "kube",
  "lazy_static",
@@ -15387,6 +15388,7 @@ dependencies = [
  "magic-crypt",
  "mail-send",
  "object_store",
+ "once_cell",
  "openidconnect",
  "opentelemetry",
  "opentelemetry-appender-tracing",
diff --git a/backend/Cargo.toml b/backend/Cargo.toml
index 41a5f6c529890..1c1d5d956d5a3 100644
--- a/backend/Cargo.toml
+++ b/backend/Cargo.toml
@@ -135,6 +135,7 @@ lazy_static.workspace = true
 once_cell.workspace = true
 prometheus = { workspace = true, optional = true }
 uuid.workspace = true
+indexmap.workspace = true
 gethostname.workspace = true
 serde_json.workspace = true
 serde_yml.workspace = true
diff --git a/backend/shard-migrations/20250922155947_v2_job.down.sql b/backend/shard-migrations/20250922155947_v2_job.down.sql
new file mode 100644
index 0000000000000..bcc36937bd081
--- /dev/null
+++ b/backend/shard-migrations/20250922155947_v2_job.down.sql
@@ -0,0 +1,8 @@
+-- Add down migration script here
+
+DROP TYPE IF EXISTS job_status CASCADE;
+DROP TYPE IF EXISTS job_kind CASCADE;
+DROP TYPE IF EXISTS job_trigger_kind CASCADE;
+DROP TYPE IF EXISTS script_lang CASCADE;
+DROP TABLE IF EXISTS v2_job;
+
diff --git a/backend/shard-migrations/20250922155947_v2_job.up.sql b/backend/shard-migrations/20250922155947_v2_job.up.sql
new file mode 100644
index 0000000000000..7ea2d3f8efc38
--- /dev/null
+++ b/backend/shard-migrations/20250922155947_v2_job.up.sql
@@ -0,0 +1,79 @@
+-- Add up migration script here
+
+CREATE TYPE job_kind AS ENUM (
+    'script', 'preview', 'flow', 'dependencies', 'flowpreview', 'script_hub',
+    'identity', 'flowdependencies', 'http', 'graphql', 'postgresql', 'noop',
+    'appdependencies', 'deploymentcallback', 'singlescriptflow', 'flowscript',
+    'flownode', 'appscript'
+);
+
+CREATE TYPE job_status AS ENUM ('success', 'failure', 'canceled', 'skipped');
+
+CREATE TYPE job_trigger_kind AS ENUM (
+    'webhook', 'http', 'websocket', 'kafka', 'email', 'nats', 'schedule',
+    'app', 'ui', 'postgres', 'sqs', 'gcp', 'mqtt'
+);
+
+CREATE TYPE script_lang AS ENUM (
+    'python3', 'deno', 'go', 'bash', 'postgresql', 'nativets', 'bun', 'mysql',
+    'bigquery', 'snowflake', 'graphql', 'powershell', 'mssql', 'php', 'bunnative',
+    'rust', 'ansible', 'csharp', 'oracledb', 'nu', 'java', 'duckdb'
+);
+
+CREATE TABLE v2_job (
+    id UUID PRIMARY KEY,
+    raw_code TEXT,
+    raw_lock TEXT,
+    raw_flow JSONB,
+    tag VARCHAR(255),
+    workspace_id VARCHAR(100) NOT NULL,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    created_by VARCHAR(100) NOT NULL,
+    permissioned_as VARCHAR(100) NOT NULL,
+    permissioned_as_email VARCHAR(500),
+    kind job_kind NOT NULL,
+    runnable_id BIGINT,
+    runnable_path VARCHAR(500),
+    parent_job UUID,
+    root_job UUID,
+    script_lang script_lang,
+    script_entrypoint_override VARCHAR(500),
+    flow_step INTEGER,
+    flow_step_id VARCHAR(100),
+    flow_innermost_root_job UUID,
+    trigger VARCHAR(500),
+    trigger_kind job_trigger_kind,
+    same_worker BOOLEAN NOT NULL DEFAULT FALSE,
+    visible_to_owner BOOLEAN NOT NULL DEFAULT TRUE,
+    concurrent_limit INTEGER,
+    concurrency_time_window_s INTEGER,
+    cache_ttl INTEGER,
+    timeout INTEGER,
+    priority SMALLINT,
+    preprocessed BOOLEAN NOT NULL DEFAULT FALSE,
+    args JSONB,
+    labels TEXT[],
+    pre_run_error TEXT
+);
+
+-- Create indices for v2_job
+CREATE INDEX IF NOT EXISTS ix_job_created_at ON v2_job (created_at DESC);
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_3 ON v2_job (workspace_id, created_at DESC);
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_5 ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = ANY (ARRAY['preview'::job_kind, 'flowpreview'::job_kind]))
+    AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_8 ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = 'deploymentcallback'::job_kind) AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_9 ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = ANY (ARRAY['dependencies'::job_kind, 'flowdependencies'::job_kind, 'appdependencies'::job_kind]))
+    AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_v2_job_labels ON v2_job USING gin (labels) WHERE (labels IS NOT NULL);
+CREATE INDEX IF NOT EXISTS ix_v2_job_workspace_id_created_at ON v2_job (workspace_id, created_at DESC)
+    WHERE ((kind = ANY (ARRAY['script'::job_kind, 'flow'::job_kind, 'singlescriptflow'::job_kind]))
+    AND (parent_job IS NULL));
+CREATE INDEX IF NOT EXISTS ix_job_root_job_index_by_path_2 ON v2_job (workspace_id, runnable_path, created_at DESC)
+    WHERE (parent_job IS NULL);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160402_v2_queue.down.sql b/backend/shard-migrations/20250922160402_v2_queue.down.sql
new file mode 100644
index 0000000000000..a74aba16985a6
--- /dev/null
+++ b/backend/shard-migrations/20250922160402_v2_queue.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS v2_job_queue;
diff --git a/backend/shard-migrations/20250922160402_v2_queue.up.sql b/backend/shard-migrations/20250922160402_v2_queue.up.sql
new file mode 100644
index 0000000000000..e5d7d40636ed0
--- /dev/null
+++ b/backend/shard-migrations/20250922160402_v2_queue.up.sql
@@ -0,0 +1,25 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS v2_job_queue (
+    id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
+    workspace_id VARCHAR(100) NOT NULL,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    started_at TIMESTAMP WITH TIME ZONE,
+    scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    running BOOLEAN NOT NULL DEFAULT FALSE,
+    canceled_by VARCHAR(100),
+    canceled_reason TEXT,
+    suspend INTEGER,
+    suspend_until TIMESTAMP WITH TIME ZONE,
+    tag VARCHAR(255),
+    priority SMALLINT,
+    worker VARCHAR(100),
+    extras JSONB
+);
+
+-- Create indices for v2_job_queue
+CREATE INDEX IF NOT EXISTS queue_sort_v2 ON v2_job_queue (priority DESC NULLS LAST, scheduled_for, tag)
+    WHERE (running = false);
+CREATE INDEX IF NOT EXISTS queue_suspended ON v2_job_queue (priority DESC NULLS LAST, created_at, suspend_until, suspend, tag)
+    WHERE (suspend_until IS NOT NULL);
+CREATE INDEX IF NOT EXISTS root_queue_index_by_path ON v2_job_queue (workspace_id, created_at);
+CREATE INDEX IF NOT EXISTS v2_job_queue_suspend ON v2_job_queue (workspace_id, suspend) WHERE (suspend > 0);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160457_v2_job_completed.down.sql b/backend/shard-migrations/20250922160457_v2_job_completed.down.sql
new file mode 100644
index 0000000000000..7311e8ff11956
--- /dev/null
+++ b/backend/shard-migrations/20250922160457_v2_job_completed.down.sql
@@ -0,0 +1,3 @@
+-- Add down migration script here
+
+DROP TABLE IF EXISTS v2_job_completed CASCADE;
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160457_v2_job_completed.up.sql b/backend/shard-migrations/20250922160457_v2_job_completed.up.sql
new file mode 100644
index 0000000000000..55bf3870990b1
--- /dev/null
+++ b/backend/shard-migrations/20250922160457_v2_job_completed.up.sql
@@ -0,0 +1,26 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS v2_job_completed (
+    id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
+    workspace_id VARCHAR(100) NOT NULL,
+    duration_ms BIGINT,
+    result JSONB,
+    deleted BOOLEAN NOT NULL DEFAULT FALSE,
+    canceled_by VARCHAR(100),
+    canceled_reason TEXT,
+    flow_status JSONB,
+    started_at TIMESTAMP WITH TIME ZONE,
+    memory_peak INTEGER,
+    status job_status NOT NULL,
+    completed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    worker VARCHAR(100),
+    workflow_as_code_status JSONB,
+    result_columns TEXT[],
+    retries UUID[],
+    extras JSONB
+);
+
+-- Create indices for v2_job_completed
+CREATE INDEX IF NOT EXISTS ix_completed_job_workspace_id_started_at_new_2 ON v2_job_completed (workspace_id, started_at DESC);
+CREATE INDEX IF NOT EXISTS ix_job_completed_completed_at ON v2_job_completed (completed_at DESC);
+CREATE INDEX IF NOT EXISTS labeled_jobs_on_jobs ON v2_job_completed USING gin (((result -> 'wm_labels'::text)))
+    WHERE (result ? 'wm_labels'::text);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160534_v2_job_runtime.down.sql b/backend/shard-migrations/20250922160534_v2_job_runtime.down.sql
new file mode 100644
index 0000000000000..3d74a5b19965a
--- /dev/null
+++ b/backend/shard-migrations/20250922160534_v2_job_runtime.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS v2_job_runtime CASCADE;
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160534_v2_job_runtime.up.sql b/backend/shard-migrations/20250922160534_v2_job_runtime.up.sql
new file mode 100644
index 0000000000000..3bdb144df7c1e
--- /dev/null
+++ b/backend/shard-migrations/20250922160534_v2_job_runtime.up.sql
@@ -0,0 +1,6 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS v2_job_runtime (
+    id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
+    ping TIMESTAMP WITH TIME ZONE,
+    memory_peak INTEGER
+);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160609_v2_job_status.down.sql b/backend/shard-migrations/20250922160609_v2_job_status.down.sql
new file mode 100644
index 0000000000000..0f57539a1ec9a
--- /dev/null
+++ b/backend/shard-migrations/20250922160609_v2_job_status.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS v2_job_status CASCADE;
diff --git a/backend/shard-migrations/20250922160609_v2_job_status.up.sql b/backend/shard-migrations/20250922160609_v2_job_status.up.sql
new file mode 100644
index 0000000000000..946160363f3a7
--- /dev/null
+++ b/backend/shard-migrations/20250922160609_v2_job_status.up.sql
@@ -0,0 +1,7 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS v2_job_status (
+    id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
+    flow_status JSONB,
+    flow_leaf_jobs JSONB,
+    workflow_as_code_status JSONB
+);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160649_job_logs.down.sql b/backend/shard-migrations/20250922160649_job_logs.down.sql
new file mode 100644
index 0000000000000..9adbe0e1d5b5c
--- /dev/null
+++ b/backend/shard-migrations/20250922160649_job_logs.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS job_logs CASCADE;
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922160649_job_logs.up.sql b/backend/shard-migrations/20250922160649_job_logs.up.sql
new file mode 100644
index 0000000000000..8150c2584a107
--- /dev/null
+++ b/backend/shard-migrations/20250922160649_job_logs.up.sql
@@ -0,0 +1,14 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS job_logs (
+    job_id UUID NOT NULL,
+    workspace_id VARCHAR(100) NOT NULL,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    logs TEXT,
+    log_offset INTEGER,
+    log_file_index TEXT[],
+    PRIMARY KEY (job_id, log_offset)
+);
+
+-- Create indices for job_logs
+CREATE INDEX IF NOT EXISTS job_logs_job_id_idx ON job_logs (job_id);
+CREATE INDEX IF NOT EXISTS job_logs_workspace_id_created_at_idx ON job_logs (workspace_id, created_at DESC);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922161225_job_perms.down.sql b/backend/shard-migrations/20250922161225_job_perms.down.sql
new file mode 100644
index 0000000000000..a6487b1dda678
--- /dev/null
+++ b/backend/shard-migrations/20250922161225_job_perms.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS job_perms CASCADE;
diff --git a/backend/shard-migrations/20250922161225_job_perms.up.sql b/backend/shard-migrations/20250922161225_job_perms.up.sql
new file mode 100644
index 0000000000000..1fe04802ef221
--- /dev/null
+++ b/backend/shard-migrations/20250922161225_job_perms.up.sql
@@ -0,0 +1,13 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS job_perms (
+    job_id UUID NOT NULL,
+    email VARCHAR(255) NOT NULL,
+    username VARCHAR(50) NOT NULL,
+    is_admin BOOLEAN NOT NULL,
+    is_operator BOOLEAN NOT NULL,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    workspace_id VARCHAR(50) NOT NULL,
+    groups TEXT[] NOT NULL,
+    folders JSONB[] NOT NULL,
+    CONSTRAINT job_perms_pk PRIMARY KEY (job_id)
+);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250922161300_outstanding_wait_time.down.sql b/backend/shard-migrations/20250922161300_outstanding_wait_time.down.sql
new file mode 100644
index 0000000000000..b31f3ebdf0cb9
--- /dev/null
+++ b/backend/shard-migrations/20250922161300_outstanding_wait_time.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP TABLE IF EXISTS outstanding_wait_time CASCADE;
diff --git a/backend/shard-migrations/20250922161300_outstanding_wait_time.up.sql b/backend/shard-migrations/20250922161300_outstanding_wait_time.up.sql
new file mode 100644
index 0000000000000..419e21b4af47e
--- /dev/null
+++ b/backend/shard-migrations/20250922161300_outstanding_wait_time.up.sql
@@ -0,0 +1,6 @@
+-- Add up migration script here
+CREATE TABLE IF NOT EXISTS outstanding_wait_time (
+    job_id UUID PRIMARY KEY,
+    self_wait_time_ms BIGINT DEFAULT NULL,
+    aggregate_wait_time_ms BIGINT DEFAULT NULL
+);
\ No newline at end of file
diff --git a/backend/shard-migrations/20250924160628_v2_as_queue.down.sql b/backend/shard-migrations/20250924160628_v2_as_queue.down.sql
new file mode 100644
index 0000000000000..23bb72031f378
--- /dev/null
+++ b/backend/shard-migrations/20250924160628_v2_as_queue.down.sql
@@ -0,0 +1,2 @@
+-- Add down migration script here
+DROP VIEW v2_as_queue;
diff --git a/backend/shard-migrations/20250924160628_v2_as_queue.up.sql b/backend/shard-migrations/20250924160628_v2_as_queue.up.sql
new file mode 100644
index 0000000000000..884e24daefcb0
--- /dev/null
+++ b/backend/shard-migrations/20250924160628_v2_as_queue.up.sql
@@ -0,0 +1,53 @@
+-- Add up migration script here
+CREATE OR REPLACE VIEW v2_as_queue AS
+SELECT
+    j.id,
+    j.workspace_id,
+    j.parent_job,
+    j.created_by,
+    j.created_at,
+    q.started_at,
+    q.scheduled_for,
+    q.running,
+    j.runnable_id AS script_hash,
+    j.runnable_path AS script_path,
+    j.args,
+    j.raw_code,
+    q.canceled_by IS NOT NULL AS canceled,
+    q.canceled_by,
+    q.canceled_reason,
+    r.ping AS last_ping,
+    j.kind AS job_kind,
+    CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END
+        AS schedule_path,
+    j.permissioned_as,
+    COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status,
+    j.raw_flow,
+    j.flow_step_id IS NOT NULL AS is_flow_step,
+    j.script_lang AS language,
+    q.suspend,
+    q.suspend_until,
+    j.same_worker,
+    j.raw_lock,
+    j.pre_run_error,
+    j.permissioned_as_email AS email,
+    j.visible_to_owner,
+    r.memory_peak AS mem_peak,
+    j.flow_innermost_root_job AS root_job,
+    s.flow_leaf_jobs AS leaf_jobs,
+    j.tag,
+    j.concurrent_limit,
+    j.concurrency_time_window_s,
+    j.timeout,
+    j.flow_step_id,
+    j.cache_ttl,
+    j.priority,
+    NULL::TEXT AS logs,
+    j.script_entrypoint_override,
+    j.preprocessed
+FROM v2_job_queue q
+    JOIN v2_job j USING (id)
+    LEFT JOIN v2_job_runtime r USING (id)
+    LEFT JOIN v2_job_status s USING (id)
+;
diff --git a/backend/src/main.rs b/backend/src/main.rs
index 8e508db009e71..55cf1c7c9d06a 100644
--- a/backend/src/main.rs
+++ b/backend/src/main.rs
@@ -5,7 +5,7 @@
  * Please see the included NOTICE for copyright information and
  * LICENSE-AGPL for a copy of the license.
  */
-use anyhow::Context;
+use anyhow::{anyhow, Context};
 use monitor::{
     load_base_url, load_otel, reload_critical_alerts_on_db_oversize,
     reload_delete_logs_periodically_setting, reload_indexer_config,
@@ -16,7 +16,7 @@ use monitor::{
     send_current_log_file_to_object_store, send_logs_to_object_store, WORKERS_NAMES,
 };
 use rand::Rng;
-use sqlx::postgres::PgListener;
+use sqlx::{postgres::PgListener, Pool, Postgres};
 use std::{
     collections::HashMap,
     fs::{create_dir_all, DirBuilder},
@@ -27,7 +27,7 @@ use strum::IntoEnumIterator;
 use tokio::{fs::File, io::AsyncReadExt, task::JoinHandle};
 use uuid::Uuid;
 use windmill_api::HTTP_CLIENT;
-
+use indexmap::map::IndexMap;
 #[cfg(feature = "enterprise")]
 use windmill_common::ee_oss::{
     maybe_renew_license_key_on_start, LICENSE_KEY_ID, LICENSE_KEY_VALID,
@@ -35,7 +35,7 @@
 use windmill_common::{
     agent_workers::build_agent_http_client,
-    get_database_url,
+    connect_db, get_database_url,
     global_settings::{
         APP_WORKSPACED_ROUTE_SETTING, BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING,
         CRITICAL_ALERTS_ON_DB_OVERSIZE_SETTING, CRITICAL_ALERT_MUTE_UI_SETTING,
@@ -62,7 +62,8 @@
     worker::{
         reload_custom_tags_setting, Connection, HUB_CACHE_DIR, TMP_DIR, TMP_LOGS_DIR, WORKER_GROUP,
     },
-    KillpillSender, METRICS_ENABLED,
+    KillpillSender, DB, METRICS_ENABLED, SHARD_DB_INSTANCE, SHARD_DB_URL, SHARD_ID_TO_DB_INSTANCE,
+    SHARD_MODE, SHARD_URLS,
 };
 
 #[cfg(feature = "enterprise")]
@@ -301,6 +302,37 @@ async fn cache_hub_scripts(file_path: Option<String>) -> anyhow::Result<()> {
     Ok(())
 }
 
+async fn initialize_worker_shard_db() -> anyhow::Result<Pool<Postgres>> {
+    let shard = SHARD_DB_URL.as_deref().ok_or_else(|| {
+        anyhow!("SHARD_DB_URL environment variable is required for worker shard mode")
+    })?;
+
+    tracing::info!("Shard url: {}", shard);
+    let shard = connect_db(Some(shard), false, false, true).await?;
+    SHARD_DB_INSTANCE.set(shard.clone()).map_err(|_| {
+        anyhow!("SHARD_DB_INSTANCE already initialized")
+    })?;
+    Ok(shard)
+}
+
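For orientation, the topology here is driven entirely by environment variables: `SHARD_MODE` switches the feature on, servers read the full shard list from `SHARD_URLS`, and a worker is pinned to exactly one shard through `SHARD_DB_URL`. A minimal sketch of how those variables appear to interact (local names and the enum are hypothetical; the real values are read via `lazy_static` in `windmill-common`):

```rust
// Sketch only, under the assumptions stated above.
enum ShardTopology {
    Single,                        // no SHARD_MODE: queue lives in the main DB
    Server { urls: Vec<String> },  // servers fan out to every shard
    Worker { url: String },        // a worker talks to a single shard
}

fn resolve_shard_topology(server_mode: bool) -> Result<ShardTopology, String> {
    if std::env::var("SHARD_MODE").is_err() {
        return Ok(ShardTopology::Single);
    }
    if server_mode {
        let urls: Vec<String> = std::env::var("SHARD_URLS")
            .map_err(|_| "SHARD_URLS is required for server shard mode".to_string())?
            .split(',')
            .map(str::to_owned)
            .collect();
        Ok(ShardTopology::Server { urls })
    } else {
        let url = std::env::var("SHARD_DB_URL")
            .map_err(|_| "SHARD_DB_URL is required for worker shard mode".to_string())?;
        Ok(ShardTopology::Worker { url })
    }
}
```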
+async fn initialize_server_shard_instances() -> anyhow::Result<()> {
+    let shard_urls = &*SHARD_URLS
+        .as_ref()
+        .ok_or_else(|| anyhow!("SHARD_URLS environment variable is required for server shard mode. Please set it as: SHARD_URLS=dburl1,dburl2,..."))?;
+
+    let mut shard_to_db = IndexMap::new();
+    for (i, shard_url) in shard_urls.iter().enumerate() {
+        tracing::info!("Connecting to shard {}: {}", i, &shard_url);
+        let shard = connect_db(Some(&shard_url), true, false, false).await.with_context(|| format!("Failed to connect to shard {}", i))?;
+        shard_to_db.insert(i, shard);
+    }
+
+    SHARD_ID_TO_DB_INSTANCE.set(shard_to_db).map_err(|_| {
+        anyhow!("SHARD_ID_TO_DB_INSTANCE already initialized")
+    })?;
+    Ok(())
+}
+
 async fn windmill_main() -> anyhow::Result<()> {
     let (killpill_tx, mut killpill_rx) = KillpillSender::new(2);
     let mut monitor_killpill_rx = killpill_tx.subscribe();
@@ -491,13 +523,24 @@ async fn windmill_main() -> anyhow::Result<()> {
     }
 
     let worker_mode = num_workers > 0;
-
+    let shard_mode = *SHARD_MODE;
+    let mut job_queue_db = None;
     let conn = if mode == Mode::Agent {
         conn
     } else {
         // This time we use a pool of connections
-        let db = windmill_common::connect_db(server_mode, indexer_mode, worker_mode).await?;
+        let db = connect_db(None, server_mode, indexer_mode, worker_mode).await?;
+        if shard_mode {
+            if server_mode {
+                initialize_server_shard_instances().await?;
+            } else {
+                job_queue_db = Some(initialize_worker_shard_db().await?);
+            }
+        } else {
+            job_queue_db = Some(db.clone());
+        }
 
         // NOTE: Variable/resource cache initialization moved to API server in windmill-api
         Connection::Sql(db)
@@ -791,6 +834,7 @@ Windmill Community Edition {GIT_VERSION}
                 base_internal_url.clone(),
                 hostname.clone(),
                 &workers,
+                job_queue_db.as_ref(),
             )
             .await?;
             tracing::info!("All workers exited.");
@@ -1381,6 +1425,7 @@ pub async fn run_workers(
     base_internal_url: String,
     hostname: String,
     workers: &[WorkerConn],
+    job_queue_db: Option<&DB>,
 ) -> anyhow::Result<()> {
     let mut killpill_rxs = vec![];
     let num_workers = workers.len();
@@ -1449,7 +1494,7 @@ pub async fn run_workers(
         let tx = tx.clone();
         let base_internal_url = base_internal_url.clone();
         let hostname = hostname.clone();
-
+        let job_queue_db = job_queue_db.cloned();
         handles.push(tokio::spawn(async move {
             if num_workers > 1 {
                 tracing::info!(worker = %worker_name, "starting worker {i}");
             }
             let f = windmill_worker::run_worker(
                 &conn1,
+                job_queue_db,
                 &hostname,
                 worker_name,
                 i as u64,
diff --git a/backend/src/monitor.rs b/backend/src/monitor.rs
index e012887d70d2c..b7dd37bcb5f10 100644
--- a/backend/src/monitor.rs
+++ b/backend/src/monitor.rs
@@ -2312,6 +2312,7 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker
     );
     let _ = handle_job_error(
         db,
+        db, // TODO: pass the proper shard DB here once zombie handling is shard-aware
         &client,
         &MiniPulledJob::from(&job),
         0,
diff --git a/backend/tests/common/mod.rs b/backend/tests/common/mod.rs
index 026eee12f717a..347350c8588be 100644
--- a/backend/tests/common/mod.rs
+++ b/backend/tests/common/mod.rs
@@ -261,6 +261,7 @@ pub fn spawn_test_worker(
     }
     windmill_worker::run_worker(
         &db.into(),
+        None,
         worker_instance,
         worker_name,
         1,
diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml
index 942d65b2b6d3d..7bf4c92ee2110 100644
--- a/backend/windmill-api/openapi.yaml
+++ b/backend/windmill-api/openapi.yaml
@@ -7909,6 +7909,16 @@ paths:
         in: query
         schema:
           type: boolean
+      - name: tags
+        description: filter by tags (comma-separated)
+        in: query
+        schema:
+          type: string
+      - name: num_shards
+        description: number of shards to query (only applicable in shard mode)
+        in: query
+        schema:
+          type: integer
       responses:
         "200":
           description: queue count
@@ -7932,6 +7942,11 @@ paths:
         - job
       parameters:
         - $ref: "#/components/parameters/WorkspaceId"
+        - name: num_shards
+          description: number of shards to query (only applicable in shard mode)
+          in: query
+          schema:
+            type: integer
       responses:
         "200":
           description: completed count
diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs
index e721facc6ea3e..f820e87503d52 100644
--- a/backend/windmill-api/src/jobs.rs
+++ b/backend/windmill-api/src/jobs.rs
@@ -11,8 +11,10 @@ use axum::extract::Request;
 use axum::http::HeaderValue;
 #[cfg(feature = "deno_core")]
 use deno_core::{op2, serde_v8, v8, JsRuntime, OpState};
+use futures::future::join_all;
 use futures::{StreamExt, TryFutureExt};
 use http::{HeaderMap, HeaderName};
+use indexmap::IndexMap;
 use itertools::Itertools;
 use quick_cache::sync::Cache;
 use serde_json::value::RawValue;
@@ -28,7 +30,7 @@ use tower::ServiceBuilder;
 #[cfg(all(feature = "enterprise", feature = "smtp"))]
 use windmill_common::auth::is_super_admin_email;
 use windmill_common::auth::TOKEN_PREFIX_LEN;
-use windmill_common::db::UserDbWithAuthed;
+use windmill_common::db::{get_shard_db_from_shard_id, UserDbWithAuthed};
 use windmill_common::error::JsonResult;
 use windmill_common::flow_status::{JobResult, RestartedFrom};
 use windmill_common::jobs::{
@@ -41,6 +43,7 @@ use windmill_common::worker::{Connection, CLOUD_HOSTED, TMP_DIR};
 use windmill_common::DYNAMIC_INPUT_CACHE;
 #[cfg(all(feature = "enterprise", feature = "smtp"))]
 use windmill_common::{email_oss::send_email_html, server::load_smtp_config};
+use windmill_common::{SHARD_ID_TO_DB_INSTANCE, SHARD_MODE};
 
 use windmill_common::variables::get_workspace_key;
 
@@ -2250,7 +2253,7 @@ async fn list_filtered_uuids(
     Ok(Json(jobs))
 }
 
-#[derive(Serialize)]
+#[derive(Debug, Serialize)]
 struct QueueStats {
     database_length: i64,
     suspended: Option<i64>,
@@ -2260,6 +2263,7 @@ struct QueueStats {
 pub struct CountQueueJobsQuery {
     all_workspaces: Option<bool>,
     tags: Option<String>,
+    num_shards: Option<usize>,
 }
 
 async fn count_queue_jobs(
@@ -2270,17 +2274,56 @@ async fn count_queue_jobs(
     let tags = cq
         .tags
         .map(|t| t.split(',').map(|s| s.to_string()).collect::<Vec<String>>());
-    Ok(Json(
+
+    let count_queue_jobs = async |db: &DB| {
         sqlx::query_as!(
             QueueStats,
-            "SELECT coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" FROM v2_as_queue WHERE (workspace_id = $1 OR $2) AND scheduled_for <= now() AND ($3::text[] IS NULL OR tag = ANY($3))",
+            "
+            SELECT
+                coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\",
+                coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\"
+            FROM
+                v2_as_queue
+            WHERE
+                (workspace_id = $1 OR $2) AND
+                scheduled_for <= now() AND ($3::text[] IS NULL OR tag = ANY($3))
+            ",
             w_id,
             w_id == "admins" && cq.all_workspaces.unwrap_or(false),
             tags.as_ref().map(|v| v.as_slice())
         )
-        .fetch_one(&db)
-        .await?,
-    ))
+        .fetch_one(db)
+        .await
+    };
+
+    let count_queue_jobs = if *SHARD_MODE {
+        let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap();
+        let num_shards_to_query = cq.num_shards.unwrap_or(shard_db_store.len());
+
+        let count_futures = (0..num_shards_to_query)
+            .filter_map(|shard_id| shard_db_store.get(&shard_id))
+            .map(|db| count_queue_jobs(db))
+            .collect_vec();
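+        // The per-shard counts run concurrently; try_join_all is fail-fast, so one
+        // unreachable shard turns the whole count into an error rather than a
+        // silent undercount. Totals are then summed across shards below.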
+        let completed_futures = futures::future::try_join_all(count_futures).await?;
+
+        let mut count = QueueStats { database_length: 0, suspended: None };
+
+        for queue_count in completed_futures {
+            count.database_length += queue_count.database_length;
+            count.suspended = match (count.suspended, queue_count.suspended) {
+                (Some(i), Some(j)) => Some(i + j),
+                (None, Some(i)) => Some(i),
+                (count, _) => count,
+            };
+        }
+
+        count
+    } else {
+        count_queue_jobs(&db).await?
+    };
+
+    Ok(Json(count_queue_jobs))
 }
 
 #[derive(Deserialize)]
@@ -2336,19 +2379,63 @@ async fn count_completed_jobs_detail(
     Ok(Json(stats))
 }
 
+#[derive(Deserialize)]
+pub struct CountCompletedJobsSimpleQuery {
+    num_shards: Option<usize>,
+}
+
 async fn count_completed_jobs(
     Extension(db): Extension<DB>,
     Path(w_id): Path<String>,
+    Query(cq): Query<CountCompletedJobsSimpleQuery>,
 ) -> error::JsonResult<QueueStats> {
-    Ok(Json(
+    let count_completed_jobs = async |db: &DB| {
         sqlx::query_as!(
             QueueStats,
-            "SELECT coalesce(COUNT(*), 0) as \"database_length!\", null::bigint as suspended FROM v2_job_completed WHERE workspace_id = $1",
+            "
+            SELECT
+                coalesce(COUNT(*), 0) as \"database_length!\",
+                null::bigint as suspended
+            FROM
+                v2_job_completed
+            WHERE
+                workspace_id = $1
+            ",
             w_id
         )
-        .fetch_one(&db)
-        .await?,
-    ))
+        .fetch_one(db)
+        .await
+    };
+
+    let count_completed_jobs = if *SHARD_MODE {
+        let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap();
+
+        let num_shards_to_query = cq.num_shards.unwrap_or(shard_db_store.len());
+
+        let count_futures = (0..num_shards_to_query)
+            .filter_map(|shard_id| shard_db_store.get(&shard_id))
+            .map(|db| count_completed_jobs(db))
+            .collect_vec();
+
+        let completed_futures = futures::future::try_join_all(count_futures).await?;
+
+        let mut count = QueueStats { database_length: 0, suspended: None };
+
+        for queue_count in completed_futures {
+            count.database_length += queue_count.database_length;
+            count.suspended = match (count.suspended, queue_count.suspended) {
+                (Some(i), Some(j)) => Some(i + j),
+                (None, Some(i)) => Some(i),
+                (count, _) => count,
+            };
+        }
+
+        count
+    } else {
+        count_completed_jobs(&db).await?
+    };
+
+    Ok(Json(count_completed_jobs))
 }
 
 async fn list_jobs(
@@ -6040,6 +6127,117 @@ struct BatchInfo {
     path: Option<String>,
     rawscript: Option<RawCode>,
     tag: Option<String>,
+    number_of_shards: Option<usize>,
 }
 
+async fn insert_batch_jobs_to_db<'c, E: sqlx::Executor<'c, Database = Postgres>>(
+    executor: E,
+    w_id: &str,
+    raw_code: Option<String>,
+    raw_lock: Option<String>,
+    raw_flow: Option<sqlx::types::Json<FlowValue>>,
+    tag: &str,
+    hash: Option<i64>,
+    path: Option<String>,
+    job_kind: JobKind,
+    language: ScriptLang,
+    authed: &ApiAuthed,
+    concurrent_limit: Option<i32>,
+    concurrent_time_window_s: Option<i32>,
+    timeout: Option<i32>,
+    flow_status: Option<sqlx::types::Json<FlowStatus>>,
+    scheduled_for: chrono::DateTime<Utc>,
+    uuids: &[Uuid],
+) -> Result<(), Error> {
+    sqlx::query!(
+        r#"
+        WITH uuid_table AS (
+            SELECT unnest($25::uuid[]) AS uuid
+        ),
+        inserted_job AS (
+            INSERT INTO v2_job (
+                id, workspace_id, raw_code, raw_lock, raw_flow, tag,
+                runnable_id, runnable_path, kind, script_lang,
+                created_by, permissioned_as, permissioned_as_email,
+                concurrent_limit, concurrency_time_window_s, timeout, args
+            )
+            SELECT
+                uuid, $1, $2, $3, $4, $5, $6, $7, $8, $9,
+                $10, $11, $12, $13, $14, $15,
+                ('{ "uuid": "' || uuid || '" }')::jsonb
+            FROM uuid_table
+            RETURNING id AS "id!"
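+            -- Every CTE in this statement consumes the same unnest($25::uuid[]) set,
+            -- so a single round trip creates the job, queue, runtime and perms rows
+            -- for the whole batch.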
+        ),
+        inserted_queue AS (
+            INSERT INTO v2_job_queue (id, workspace_id, scheduled_for, tag)
+            SELECT uuid, $1, $23, $24 FROM uuid_table
+        ),
+        inserted_runtime AS (
+            INSERT INTO v2_job_runtime (id, ping) SELECT uuid, null FROM uuid_table
+        ),
+        inserted_job_perms AS (
+            INSERT INTO job_perms (job_id, email, username, is_admin, is_operator, folders, groups, workspace_id)
+            SELECT uuid, $16, $17, $18, $19, $20, $21, $1 FROM uuid_table
+        )
+        INSERT INTO v2_job_status (id, flow_status)
+        SELECT uuid, $22
+        FROM uuid_table
+        WHERE $22::jsonb IS NOT NULL
+        "#,
+        w_id,
+        raw_code.as_deref(),
+        raw_lock.as_deref(),
+        raw_flow as Option<sqlx::types::Json<FlowValue>>,
+        tag,
+        hash,
+        path.as_deref(),
+        job_kind as JobKind,
+        language as ScriptLang,
+        authed.username,
+        username_to_permissioned_as(&authed.username),
+        authed.email,
+        concurrent_limit,
+        concurrent_time_window_s,
+        timeout,
+        authed.email,
+        authed.username,
+        authed.is_admin,
+        authed.is_operator,
+        &[],
+        &[],
+        flow_status as Option<sqlx::types::Json<FlowStatus>>,
+        scheduled_for,
+        tag,
+        uuids
+    )
+    .execute(executor)
+    .await?;
+
+    Ok(())
+}
+
+async fn insert_concurrency_keys(
+    tx: &mut Transaction<'_, Postgres>,
+    custom_concurrency_key: &str,
+    uuids: &[Uuid],
+) -> Result<(), Error> {
+    sqlx::query!(
+        "INSERT INTO concurrency_counter(concurrency_id, job_uuids)
+        VALUES ($1, '{}'::jsonb)",
+        custom_concurrency_key
+    )
+    .execute(&mut **tx)
+    .await?;
+
+    sqlx::query!(
+        "INSERT INTO concurrency_key (job_id, key) SELECT id, $1 FROM unnest($2::uuid[]) as id",
+        custom_concurrency_key,
+        uuids
+    )
+    .execute(&mut **tx)
+    .await?;
+
+    Ok(())
+}
 
 #[tracing::instrument(level = "trace", skip_all)]
@@ -6195,6 +6393,11 @@ async fn add_batch_jobs(
         }
     };
 
+    let uuids = (0..n)
+        .into_iter()
+        .map(|_| Uuid::new_v4())
+        .collect::<Vec<Uuid>>();
+
     let language = language.unwrap_or(ScriptLang::Deno);
 
     let tag = if let Some(dedicated_worker) = dedicated_worker {
@@ -6209,106 +6412,134 @@ async fn add_batch_jobs(
         format!("{}", language.as_str())
     };
 
-    let mut tx = user_db.begin(&authed).await?;
+    if *SHARD_MODE {
+        let shard_db_store = SHARD_ID_TO_DB_INSTANCE.get().unwrap();
 
-    let uuids = sqlx::query_scalar!(
-        r#"WITH uuid_table as (
-            select gen_random_uuid() as uuid from generate_series(1, $16)
-        )
-        INSERT INTO v2_job
-            (id, workspace_id, raw_code, raw_lock, raw_flow, tag, runnable_id, runnable_path, kind,
-            script_lang, created_by, permissioned_as, permissioned_as_email, concurrent_limit,
-            concurrency_time_window_s, timeout, args)
-            (SELECT uuid, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
-            ('{ "uuid": "' || uuid || '" }')::jsonb FROM uuid_table)
-        RETURNING id AS "id!""#,
-        w_id,
-        raw_code,
-        raw_lock,
-        raw_flow.map(sqlx::types::Json) as Option<sqlx::types::Json<FlowValue>>,
-        tag,
-        hash,
-        path,
-        job_kind.clone() as JobKind,
-        language as ScriptLang,
-        authed.username,
-        username_to_permissioned_as(&authed.username),
-        authed.email,
-        concurrent_limit,
-        concurrent_time_window_s,
-        timeout,
-        n,
-    )
-    .fetch_all(&mut *tx)
-    .await?;
+        if shard_db_store.is_empty() {
+            return Err(Error::InternalErr(
+                "No shard databases configured".to_string(),
+            ));
+        }
 
-    let uuids = sqlx::query_scalar!(
-        r#"WITH uuid_table as (
-            select unnest($4::uuid[]) as uuid
-        )
-        INSERT INTO v2_job_queue
-            (id, workspace_id, scheduled_for, tag)
-            (SELECT uuid, $1, $2, $3 FROM uuid_table)
-        RETURNING id"#,
-        w_id,
-        Utc::now(),
-        tag,
-        &uuids
-    )
-    .fetch_all(&mut *tx)
-    .await?;
+        let total_available_shards = shard_db_store.len();
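+        // number_of_shards lets a caller restrict the batch to a prefix of the
+        // configured shards (validated below to lie in 1..=total_available_shards);
+        // by default the batch spreads over every shard.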
+        let num_shards_to_use = if let Some(requested_shards) = batch_info.number_of_shards {
+            if requested_shards > total_available_shards {
+                return Err(Error::BadRequest(format!(
+                    "Requested {} shards but only {} shards are available",
+                    requested_shards, total_available_shards
+                )));
+            }
+            if requested_shards == 0 {
+                return Err(Error::BadRequest(
+                    "Number of shards must be greater than 0".to_string(),
+                ));
+            }
+            requested_shards
+        } else {
+            total_available_shards
+        };
 
-    sqlx::query!(
-        "INSERT INTO v2_job_runtime (id, ping) SELECT unnest($1::uuid[]), null",
-        &uuids,
-    )
-    .execute(&mut *tx)
-    .await?;
+        let mut uuid_per_db = {
+            let mut store = IndexMap::with_capacity(num_shards_to_use);
+            for shard_id in 0..num_shards_to_use {
+                store.insert(shard_id, Vec::new());
+            }
+            store
+        };
 
-    sqlx::query!(
-        "INSERT INTO job_perms (job_id, email, username, is_admin, is_operator, folders, groups, workspace_id)
-            SELECT unnest($1::uuid[]), $2, $3, $4, $5, $6, $7, $8",
-        &uuids,
-        authed.email,
-        authed.username,
-        authed.is_admin,
-        authed.is_operator,
-        &[],
-        &[],
-        w_id,
-    )
-    .execute(&mut *tx)
-    .await?;
+        for uuid in &uuids {
+            let mut hasher = DefaultHasher::new();
+            uuid.hash(&mut hasher);
+            let shard_id = (hasher.finish() as usize) % num_shards_to_use;
+            uuid_per_db.get_mut(&shard_id).unwrap().push(*uuid);
+        }
 
-    if let Some(flow_status) = flow_status {
-        sqlx::query!(
-            "INSERT INTO v2_job_status (id, flow_status)
-            SELECT unnest($1::uuid[]), $2",
-            &uuids,
-            sqlx::types::Json(flow_status) as sqlx::types::Json<FlowStatus>
-        )
-        .execute(&mut *tx)
-        .await?;
-    }
+        let mut tx = user_db.begin(&authed).await?;
+        if let Some(custom_concurrency_key) = custom_concurrency_key {
+            insert_concurrency_keys(&mut tx, &custom_concurrency_key, &uuids).await?;
+        }
+        tx.commit().await?;
+
+        let future_batch_jobs = uuid_per_db
+            .iter()
+            .filter_map(|(shard_id, shard_uuids)| {
+                if shard_uuids.is_empty() {
+                    return None;
+                }
+
+                let db = shard_db_store.get(shard_id).unwrap().clone();
+                let w_id = w_id.clone();
+                let raw_code = raw_code.clone();
+                let raw_lock = raw_lock.clone();
+                let raw_flow: Option<sqlx::types::Json<FlowValue>> =
+                    raw_flow.clone().map(sqlx::types::Json);
+                let tag = tag.clone();
+                let path = path.clone();
+                let authed = authed.clone();
+                let flow_status: Option<sqlx::types::Json<FlowStatus>> =
+                    flow_status.clone().map(|status| sqlx::types::Json(status));
+                let shard_uuids = shard_uuids.clone();
+
+                Some(async move {
+                    insert_batch_jobs_to_db(
+                        &db,
+                        &w_id,
+                        raw_code,
+                        raw_lock,
+                        raw_flow,
+                        &tag,
+                        hash,
+                        path,
+                        job_kind,
+                        language,
+                        &authed,
+                        concurrent_limit,
+                        concurrent_time_window_s,
+                        timeout,
+                        flow_status,
+                        Utc::now(),
+                        &shard_uuids,
+                    )
+                    .await
+                    .map_err(|e| Error::InternalErr(format!("Shard insert failed: {}", e)))
+                })
+            })
+            .collect::<Vec<_>>();
+
+        let results = join_all(future_batch_jobs).await;
+        for result in results {
+            result?;
+        }
+    } else {
+        let mut tx = user_db.begin(&authed).await?;
+
+        insert_batch_jobs_to_db(
+            &mut *tx,
+            &w_id,
+            raw_code,
+            raw_lock,
+            raw_flow.map(sqlx::types::Json),
+            &tag,
+            hash,
+            path,
+            job_kind,
+            language,
+            &authed,
+            concurrent_limit,
+            concurrent_time_window_s,
+            timeout,
+            flow_status.map(sqlx::types::Json),
+            Utc::now(),
            &uuids,
        )
        .await?;

-    if let Some(custom_concurrency_key) = custom_concurrency_key {
-        sqlx::query!(
-            "INSERT INTO concurrency_counter(concurrency_id, job_uuids)
-            VALUES ($1, '{}'::jsonb)",
-            &custom_concurrency_key
-        )
-        .execute(&mut *tx)
-        .await?;
-        sqlx::query!(
-            "INSERT INTO concurrency_key (job_id, key) SELECT id, $1 FROM unnest($2::uuid[]) as id",
-            custom_concurrency_key,
-            &uuids
-        )
-        .execute(&mut *tx)
-        .await?;
-    }
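+        // Same helper as the shard path above; in both cases the concurrency
+        // bookkeeping lands in the main database rather than a shard, since a
+        // custom concurrency key can span jobs routed to different shards.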
+        if let Some(custom_concurrency_key) = custom_concurrency_key {
+            insert_concurrency_keys(&mut tx, &custom_concurrency_key, &uuids).await?;
+        }
 
-    tx.commit().await?;
+        tx.commit().await?;
+    }
 
     Ok(Json(uuids))
 }
@@ -7684,47 +7915,104 @@ async fn list_completed_jobs(
 ) -> error::JsonResult<Vec<ListableCompletedJob>> {
     let (per_page, offset) = paginate(pagination);
 
+    let fields = &[
+        "v2_job.id",
+        "v2_job.workspace_id",
+        "v2_job.parent_job",
+        "v2_job.created_by",
+        "v2_job.created_at",
+        "v2_job_completed.started_at",
+        "v2_job_completed.duration_ms",
+        "v2_job_completed.status = 'success' OR v2_job_completed.status = 'skipped' as success",
+        "v2_job.runnable_id as script_hash",
+        "v2_job.runnable_path as script_path",
+        "false as deleted",
+        "v2_job_completed.status = 'canceled' as canceled",
+        "v2_job_completed.canceled_by",
+        "v2_job_completed.canceled_reason",
+        "v2_job.kind as job_kind",
+        "CASE WHEN v2_job.trigger_kind = 'schedule' THEN v2_job.trigger END as schedule_path",
+        "v2_job.permissioned_as",
+        "null as raw_code",
+        "null as flow_status",
+        "null as raw_flow",
+        "v2_job.flow_step_id IS NOT NULL as is_flow_step",
+        "v2_job.script_lang as language",
+        "v2_job_completed.status = 'skipped' as is_skipped",
+        "v2_job.permissioned_as_email as email",
+        "v2_job.visible_to_owner",
+        "v2_job_completed.memory_peak as mem_peak",
+        "v2_job.tag",
+        "v2_job.priority",
+        "v2_job_completed.result->'wm_labels' as labels",
+        "'CompletedJob' as type",
+    ];
+
     let sql = list_completed_jobs_query(
         &w_id,
         Some(per_page),
         offset,
         &lq,
-        &[
-            "v2_job.id",
-            "v2_job.workspace_id",
-            "v2_job.parent_job",
-            "v2_job.created_by",
-            "v2_job.created_at",
-            "v2_job_completed.started_at",
-            "v2_job_completed.duration_ms",
-            "v2_job_completed.status = 'success' OR v2_job_completed.status = 'skipped' as success",
-            "v2_job.runnable_id as script_hash",
-            "v2_job.runnable_path as script_path",
-            "false as deleted",
-            "v2_job_completed.status = 'canceled' as canceled",
-            "v2_job_completed.canceled_by",
-            "v2_job_completed.canceled_reason",
-            "v2_job.kind as job_kind",
-            "CASE WHEN v2_job.trigger_kind = 'schedule' THEN v2_job.trigger END as schedule_path",
-            "v2_job.permissioned_as",
-            "null as raw_code",
-            "null as flow_status",
-            "null as raw_flow",
-            "v2_job.flow_step_id IS NOT NULL as is_flow_step",
-            "v2_job.script_lang as language",
-            "v2_job_completed.status = 'skipped' as is_skipped",
-            "v2_job.permissioned_as_email as email",
-            "v2_job.visible_to_owner",
-            "v2_job_completed.memory_peak as mem_peak",
-            "v2_job.tag",
-            "v2_job.priority",
-            "v2_job_completed.result->'wm_labels' as labels",
-            "'CompletedJob' as type",
-        ],
+        fields,
         false,
         get_scope_tags(&authed),
     )
     .sql()?;
+
+    if *SHARD_MODE {
+        let shard_db_map = SHARD_ID_TO_DB_INSTANCE.get().unwrap();
+        let shard_count = shard_db_map.len();
+
+        if shard_count == 0 {
+            return Err(Error::InternalErr(
+                "No shard databases configured".to_string(),
+            ));
+        }
+
+        let mut futures = Vec::new();
+
+        for shard_id in 0..shard_count {
+            let shard_db = get_shard_db_from_shard_id(shard_id).cloned().unwrap();
+            let sql_owned = sql.clone();
+
+            futures.push(async move {
+                let jobs = sqlx::query_as::<_, ListableCompletedJob>(&sql_owned)
+                    .fetch_all(&shard_db)
+                    .await
                    .map_err(|e| {
+                        Error::InternalErr(format!("Shard {} query failed: {}", shard_id, e))
+                    })?;
+                Ok::<Vec<ListableCompletedJob>, Error>(jobs)
+            });
+        }
+
+        let shard_results = join_all(futures).await;
+
+        let mut all_jobs = Vec::new();
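+        // Scatter-gather merge: each shard returns rows in its own order, so the
+        // union is re-sorted on created_at before the page is cut below.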
+        for result in shard_results {
+            match result {
+                Ok(jobs) => all_jobs.extend(jobs),
+                Err(e) => {
+                    tracing::error!("Failed to query shard: {}", e);
+                    return Err(e);
+                }
+            }
+        }
+
+        all_jobs.sort_by(|a, b| {
+            if lq.order_desc.unwrap_or(true) {
+                b.created_at.cmp(&a.created_at)
+            } else {
+                a.created_at.cmp(&b.created_at)
+            }
+        });
+
+        // Each shard query already applied the requested OFFSET/LIMIT, so only cap
+        // the merged set here; skipping `offset` a second time would drop valid rows.
+        let paginated_jobs: Vec<ListableCompletedJob> =
+            all_jobs.into_iter().take(per_page).collect();
+
+        return Ok(Json(paginated_jobs));
+    }
+
     let mut tx = user_db.begin(&authed).await?;
     let jobs = sqlx::query_as::<_, ListableCompletedJob>(&sql)
         .fetch_all(&mut *tx)
diff --git a/backend/windmill-common/Cargo.toml b/backend/windmill-common/Cargo.toml
index c24b0d503ced0..ee067fffadbc4 100644
--- a/backend/windmill-common/Cargo.toml
+++ b/backend/windmill-common/Cargo.toml
@@ -87,7 +87,7 @@ strum.workspace = true
 strum_macros.workspace = true
 url.workspace = true
 async-recursion.workspace = true
-
+once_cell.workspace = true
 semver.workspace = true
 croner = "2.2.0"
 quick_cache.workspace = true
diff --git a/backend/windmill-common/src/db.rs b/backend/windmill-common/src/db.rs
index 1aa5e57ea0121..564f4bfdd6315 100644
--- a/backend/windmill-common/src/db.rs
+++ b/backend/windmill-common/src/db.rs
@@ -1,4 +1,9 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use sqlx::{Acquire, Pool, Postgres, Transaction};
+use uuid::Uuid;
+
+use crate::SHARD_ID_TO_DB_INSTANCE;
 
 pub type DB = Pool<Postgres>;
 
@@ -214,3 +219,24 @@ impl UserDB {
         Ok(tx)
     }
 }
+
+pub fn get_shard_id_from_job_uuid(job_id: Uuid) -> usize {
+    let shard_count = SHARD_ID_TO_DB_INSTANCE.get()
+        .map(|shards| shards.len())
+        .unwrap_or(1);
+
+    let mut hasher = DefaultHasher::new();
+    job_id.hash(&mut hasher);
+
+    (hasher.finish() as usize) % shard_count
+}
+
+pub fn get_shard_db_from_shard_id(shard_id: usize) -> Option<&'static Pool<Postgres>> {
+    SHARD_ID_TO_DB_INSTANCE.get()
+        .and_then(|shards| shards.get(&shard_id))
+}
+
+pub fn get_shard_db_from_job_uuid(job_id: Uuid) -> Option<&'static Pool<Postgres>> {
+    let shard_id = get_shard_id_from_job_uuid(job_id);
+    get_shard_db_from_shard_id(shard_id)
+}
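Routing is a plain hash-mod over the job UUID, so any process that can see the shard map derives the same placement without a lookup table. A usage sketch of the helpers above (shard count hard-coded here purely for illustration):

```rust
use std::hash::{DefaultHasher, Hash, Hasher};
use uuid::Uuid;

// Mirrors get_shard_id_from_job_uuid for a known shard count.
fn shard_for(job_id: Uuid, shard_count: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    job_id.hash(&mut hasher);
    (hasher.finish() as usize) % shard_count
}

fn main() {
    let id = Uuid::new_v4();
    // Deterministic within a build: the same uuid always lands on the same shard,
    // which is what lets push, log appends and completion all find the job again.
    assert_eq!(shard_for(id, 4), shard_for(id, 4));
}
```

One caveat worth noting: `DefaultHasher`'s algorithm is not guaranteed to be stable across Rust releases, so a mixed-version fleet could in principle disagree on placement.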
diff --git a/backend/windmill-common/src/lib.rs b/backend/windmill-common/src/lib.rs
index 3f2212d4dd1a3..f407b7d8ce6d4 100644
--- a/backend/windmill-common/src/lib.rs
+++ b/backend/windmill-common/src/lib.rs
@@ -6,8 +6,12 @@
  * LICENSE-AGPL for a copy of the license.
  */
 
+use indexmap::IndexMap;
+use itertools::Itertools as _;
+use once_cell::sync::OnceCell;
 use quick_cache::sync::Cache;
 use std::{
+    collections::HashMap,
     future::Future,
     hash::{Hash, Hasher},
     net::SocketAddr,
@@ -23,7 +27,7 @@ use tokio::{spawn, sync::broadcast};
 use ee_oss::CriticalErrorChannel;
 use error::Error;
 use scripts::ScriptLang;
-use sqlx::{Acquire, Postgres};
+use sqlx::{Acquire, Pool, Postgres};
 
 pub mod agent_workers;
 pub mod ai_providers;
@@ -129,6 +133,14 @@ lazy_static::lazy_static! {
     pub static ref OTEL_TRACING_ENABLED: AtomicBool = AtomicBool::new(std::env::var("OTEL_TRACING").is_ok());
     pub static ref OTEL_LOGS_ENABLED: AtomicBool = AtomicBool::new(std::env::var("OTEL_LOGS").is_ok());
 
+    pub static ref SHARD_DB_URL: Option<String> = std::env::var("SHARD_DB_URL").ok();
+
+    pub static ref SHARD_MODE: bool = std::env::var("SHARD_MODE").is_ok();
+
+    pub static ref SHARD_URLS: Option<Vec<String>> = std::env::var("SHARD_URLS").ok().map(|shard_url| {
+        let shard_url = shard_url.split(',').map(|s| s.to_owned()).collect_vec();
+        shard_url
+    });
 
     pub static ref METRICS_DEBUG_ENABLED: AtomicBool = AtomicBool::new(false);
 
@@ -160,6 +172,9 @@ lazy_static::lazy_static! {
 }
 
+pub static SHARD_DB_INSTANCE: OnceCell<Pool<Postgres>> = OnceCell::new();
+pub static SHARD_ID_TO_DB_INSTANCE: OnceCell<IndexMap<usize, Pool<Postgres>>> = OnceCell::new();
+
 const LATEST_VERSION_ID_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60);
 
 pub async fn shutdown_signal(
@@ -393,13 +408,17 @@ pub async fn initial_connection() -> Result<PgConnection, error::Error>
 }
 
 pub async fn connect_db(
+    database_url: Option<&str>,
     server_mode: bool,
     indexer_mode: bool,
     worker_mode: bool,
 ) -> anyhow::Result<Pool<Postgres>> {
     use anyhow::Context;
 
-    let database_url = get_database_url().await?;
+    let database_url = match database_url {
+        Some(db_url) => db_url.to_owned(),
+        None => get_database_url().await?,
+    };
 
     let max_connections = match std::env::var("DATABASE_CONNECTIONS") {
         Ok(n) => n.parse::<u32>().context("invalid DATABASE_CONNECTIONS")?,
diff --git a/backend/windmill-common/src/queue.rs b/backend/windmill-common/src/queue.rs
index 2656ce5af5104..4620e4e74e9f7 100644
--- a/backend/windmill-common/src/queue.rs
+++ b/backend/windmill-common/src/queue.rs
@@ -1,8 +1,25 @@
 use std::collections::HashMap;
 
+use futures::future::join_all;
 use sqlx::{Pool, Postgres};
 
+use crate::{db::get_shard_db_from_shard_id, SHARD_ID_TO_DB_INSTANCE, SHARD_MODE};
+
 pub async fn get_queue_counts(db: &Pool<Postgres>) -> HashMap<String, u32> {
+    if *SHARD_MODE {
+        return get_queue_counts_from_shards().await;
+    }
+    get_queue_counts_single_db(db).await
+}
+
+pub async fn get_queue_running_counts(db: &Pool<Postgres>) -> HashMap<String, u32> {
+    if *SHARD_MODE {
+        return get_queue_running_counts_from_shards().await;
+    }
+    get_queue_running_counts_single_db(db).await
+}
+
+async fn get_queue_counts_single_db(db: &Pool<Postgres>) -> HashMap<String, u32> {
     sqlx::query!(
         "SELECT tag AS \"tag!\", count(*) AS \"count!\" FROM v2_job_queue WHERE
            scheduled_for <= now() - ('3 seconds')::interval AND running = false
@@ -15,7 +32,7 @@ pub async fn get_queue_counts(db: &Pool<Postgres>) -> HashMap<String, u32> {
     .unwrap_or_else(|| HashMap::new())
 }
 
-pub async fn get_queue_running_counts(db: &Pool<Postgres>) -> HashMap<String, u32> {
+async fn get_queue_running_counts_single_db(db: &Pool<Postgres>) -> HashMap<String, u32> {
     sqlx::query!(
         "SELECT tag AS \"tag!\", count(*) AS \"count!\" FROM v2_job_queue WHERE
            running = true
@@ -27,3 +44,69 @@ pub async fn get_queue_running_counts(db: &Pool<Postgres>) -> HashMap<String, u32> {
+
+async fn get_queue_counts_from_shards() -> HashMap<String, u32> {
+    let shard_db_map = match SHARD_ID_TO_DB_INSTANCE.get() {
+        Some(map) => map,
+        None => return HashMap::new(),
+    };
+    let shard_count = shard_db_map.len();
+
+    if shard_count == 0 {
+        return HashMap::new();
+    }
+
+    let mut futures = Vec::new();
+
+    for shard_id in 0..shard_count {
+        if let Some(shard_db) = get_shard_db_from_shard_id(shard_id) {
+            let future = async move { get_queue_counts_single_db(shard_db).await };
+            futures.push(future);
+        }
+    }
+
+    let shard_results = join_all(futures).await;
+
+    let mut aggregated_counts = HashMap::new();
+
+    for shard_counts in shard_results {
+        for (tag, count) in shard_counts {
+            *aggregated_counts.entry(tag).or_insert(0) += count;
+        }
+    }
+
+    aggregated_counts
+}
+
+async fn get_queue_running_counts_from_shards() -> HashMap<String, u32> {
+    let shard_db_map = match SHARD_ID_TO_DB_INSTANCE.get() {
+        Some(map) => map,
+        None => return HashMap::new(),
+    };
+    let shard_count = shard_db_map.len();
+
+    if shard_count == 0 {
+        return HashMap::new();
+    }
+
+    let mut futures = Vec::new();
+
+    for shard_id in 0..shard_count {
+        if let Some(shard_db) = get_shard_db_from_shard_id(shard_id) {
+            let future = async move { get_queue_running_counts_single_db(shard_db).await };
+            futures.push(future);
+        }
+    }
+
+    let shard_results = join_all(futures).await;
+
+    let mut aggregated_counts = HashMap::new();
+
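+    // Merge step: counts for the same tag are summed across shards, so callers
+    // see one global figure per tag, exactly as in single-database mode. Note
+    // that an unset shard map above silently yields an empty result.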
+    for shard_counts in shard_results {
+        for (tag, count) in shard_counts {
+            *aggregated_counts.entry(tag).or_insert(0) += count;
+        }
+    }
+
+    aggregated_counts
+}
diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs
index 3010874ac50ad..1a800986bec3d 100644
--- a/backend/windmill-queue/src/jobs.rs
+++ b/backend/windmill-queue/src/jobs.rs
@@ -33,6 +33,7 @@ use windmill_common::add_time;
 use windmill_common::auth::JobPerms;
 #[cfg(feature = "benchmark")]
 use windmill_common::bench::BenchmarkIter;
+use windmill_common::db::{get_shard_db_from_job_uuid, get_shard_id_from_job_uuid};
 use windmill_common::jobs::{JobTriggerKind, EMAIL_ERROR_HANDLER_USER_EMAIL};
 use windmill_common::utils::now_from_db;
 use windmill_common::worker::{Connection, SCRIPT_TOKEN_EXPIRY};
@@ -61,6 +62,7 @@ use windmill_common::{
     },
     DB, METRICS_ENABLED,
 };
+use windmill_common::{SHARD_DB_INSTANCE, SHARD_ID_TO_DB_INSTANCE, SHARD_MODE};
 
 use backon::ConstantBuilder;
 use backon::{BackoffBuilder, Retryable};
@@ -175,6 +177,7 @@ pub async fn cancel_single_job<'c>(
         .await;
         let add_job = add_completed_job_error(
             &db,
+            &db, // placeholder so this compiles; pass the proper shard DB once cancellation is shard-aware
             &MiniPulledJob::from(&job_running),
             job_running.mem_peak.unwrap_or(0),
             Some(CanceledBy { username: Some(username.to_string()), reason: Some(reason) }),
@@ -370,13 +373,25 @@ pub async fn append_logs(
     }
     match conn {
         Connection::Sql(pool) => {
+            let db = if *SHARD_MODE {
+                if let Some(worker_db) = SHARD_DB_INSTANCE.get() {
+                    worker_db
+                } else if let Some(server_shards) = SHARD_ID_TO_DB_INSTANCE.get() {
+                    let shard_id = get_shard_id_from_job_uuid(*job_id);
+                    server_shards.get(&shard_id).unwrap_or(pool)
+                } else {
+                    pool
+                }
+            } else {
+                pool
+            };
             if let Err(err) = sqlx::query!(
                 "INSERT INTO job_logs (logs, job_id, workspace_id) VALUES ($1, $2, $3) ON CONFLICT (job_id) DO UPDATE SET logs = concat(job_logs.logs, $1::text)",
                 logs.as_ref(),
                 job_id,
                 workspace.as_ref(),
             )
-            .execute(pool)
+            .execute(db)
             .warn_after_seconds(1)
             .await
             {
@@ -385,12 +400,17 @@ pub async fn append_logs(
         }
         Connection::Http(client) => {
             if let Err(e) = client
-                .post::<_, String>(
-                    &format!("/api/w/{}/agent_workers/push_logs/{}", workspace.as_ref(), job_id),
-                    None,
-                    &logs.as_ref(),
-                )
-                .await {
+                .post::<_, String>(
+                    &format!(
+                        "/api/w/{}/agent_workers/push_logs/{}",
+                        workspace.as_ref(),
+                        job_id
+                    ),
+                    None,
+                    &logs.as_ref(),
+                )
+                .await
+            {
                 tracing::error!(%job_id, %e, "error sending logs for job {job_id}: {e}");
             };
         }
@@ -689,7 +709,8 @@
 }
 
 pub async fn add_completed_job_error(
-    db: &Pool<Postgres>,
+    db: &DB,
+    job_queue_db: &DB,
     queued_job: &MiniPulledJob,
     mem_peak: i32,
     canceled_by: Option<CanceledBy>,
@@ -726,6 +747,7 @@ pub async fn add_completed_job_error(
     );
     let _ = add_completed_job(
         db,
+        job_queue_db,
         &queued_job,
         false,
        false,
@@ -752,7 +774,8 @@ struct OutputWrapper {
 }
 
 pub async fn add_completed_job(
-    db: &Pool<Postgres>,
+    db: &DB,
+    job_queue_db: &DB,
     queued_job: &MiniPulledJob,
     success: bool,
     skipped: bool,
@@ -778,6 +801,7 @@ pub async fn add_completed_job(
     let (opt_uuid, duration, _skip_downstream_error_handlers) = (|| {
         commit_completed_job(
             db,
+            job_queue_db,
             queued_job,
            success,
            skipped,
@@ -888,6 +912,7 @@ pub async fn add_completed_job(
 
 async fn commit_completed_job(
     db: &Pool<Postgres>,
+    job_queue_db: &Pool<Postgres>,
     queued_job: &MiniPulledJob,
     success: bool,
     skipped: bool,
@@ -901,7 +926,7 @@ async fn commit_completed_job(
 ) -> windmill_common::error::Result<(Option<Uuid>, i64, bool)> {
     // let start = std::time::Instant::now();
 
-    let mut tx = db.begin().await?;
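+    // Two transactions are opened side by side: `tx` on the main DB (workspace
+    // state, concurrency counters) and `job_tx` on the shard that owns the job's
+    // queue/completed rows. They cannot commit atomically; see the ordered
+    // commit handling at the end of this function.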
+    let (mut tx, mut job_tx) = tokio::try_join!(db.begin(), job_queue_db.begin())?;
 
     let job_id = queued_job.id;
     // tracing::error!("1 {:?}", start.elapsed());
 
@@ -915,7 +940,7 @@ async fn commit_completed_job(
     let mem_peak = mem_peak;
     // add_time!(bench, "add_completed_job query START");
 
-    if let Some(value) = check_result_size(db, queued_job, result).await {
+    if let Some(value) = check_result_size(db, job_queue_db, queued_job, result).await {
         return value;
     }
 
@@ -955,33 +980,10 @@ async fn commit_completed_job(
         /* $9 */ duration,
         /* $10 */ result_columns as Option<&Vec<String>>,
     )
-    .fetch_optional(&mut *tx)
+    .fetch_one(&mut *job_tx)
     .await
     .map_err(|e| Error::internal_err(format!("Could not add completed job {job_id}: {e:#}")))?;
 
-    let duration = if let Some(duration) = duration {
-        duration
-    } else {
-        let already_inserted = sqlx::query_scalar!(
-            "SELECT EXISTS(SELECT 1 FROM v2_job_completed WHERE id = $1)",
-            job_id
-        )
-        .fetch_one(&mut *tx)
-        .await
-        .map_err(|e| Error::internal_err(format!("Could not add completed job {job_id}: {e:#}")))?
-        .unwrap_or(false);
-
-        if already_inserted {
-            return Err(Error::AlreadyCompleted(format!(
-                "The queued job {job_id} is already completed."
-            )));
-        } else {
-            return Err(Error::AlreadyCompleted(format!(
-                "There is no queued job anymore for {job_id} but there is no completed job either."
-            )));
-        }
-    };
-
     if let Some(labels) = result.wm_labels() {
         sqlx::query!(
             "UPDATE v2_job SET labels = (
                SELECT array_agg(DISTINCT all_labels)
                FROM unnest(coalesce(labels, ARRAY[]::TEXT[]) || $2) all_labels
            ) WHERE id = $1",
             job_id,
             labels as Vec<String>
         )
-        .execute(&mut *tx)
+        .execute(&mut *job_tx)
         .await
         .map_err(|e| Error::InternalErr(format!("Could not update job labels: {e:#}")))?;
     }
@@ -1014,7 +1016,7 @@ async fn commit_completed_job(
         duration,
         parent_job
     )
-    .execute(&mut *tx)
+    .execute(&mut *job_tx)
     .await
     .inspect_err(|e| {
         tracing::error!(
@@ -1027,7 +1029,7 @@ async fn commit_completed_job(
     // tracing::error!("Added completed job {:#?}", queued_job);
 
     let mut _skip_downstream_error_handlers = false;
-    tx = delete_job(tx, &job_id).await?;
+    job_tx = delete_job(job_tx, &job_id).await?;
     // tracing::error!("3 {:?}", start.elapsed());
 
     if queued_job.is_flow_step() {
         if let Some(parent_job) = queued_job.parent_job {
@@ -1047,14 +1049,14 @@ async fn commit_completed_job(
                 parent_job,
                 &queued_job.workspace_id
             )
-            .execute(&mut *tx)
+            .execute(&mut *job_tx)
             .await?;
             if flow_is_done {
                 let r = sqlx::query_scalar!(
                    "UPDATE parallel_monitor_lock SET last_ping = now() WHERE parent_flow_id = $1 and job_id = $2 RETURNING 1",
                    parent_job,
                    &queued_job.id
-                ).fetch_optional(&mut *tx).await?;
+                ).fetch_optional(&mut *job_tx).await?;
                 if r.is_some() {
                     tracing::info!(
                         "parallel flow iteration is done, setting parallel monitor last ping lock for job {}",
@@ -1098,7 +1100,7 @@ async fn commit_completed_job(
         &queued_job.id,
         &queued_job.workspace_id
     )
-    .fetch_optional(&mut *tx)
+    .fetch_optional(&mut *job_tx)
    .await?
    .flatten()
    .unwrap_or(false);
@@ -1163,25 +1165,35 @@ async fn commit_completed_job(
             }
         }
     }
+
     if queued_job.concurrent_limit.is_some() {
         let concurrency_key = concurrency_key(db, &queued_job.id).await?;
-        if *DISABLE_CONCURRENCY_LIMIT || concurrency_key.is_none() {
-            tracing::warn!("Concurrency limit is disabled, skipping");
-        } else {
-            let concurrency_key = concurrency_key.unwrap();
-            sqlx::query_scalar!(
-                "UPDATE concurrency_counter SET job_uuids = job_uuids - $2 WHERE concurrency_id = $1",
-                concurrency_key,
-                queued_job.id.hyphenated().to_string(),
-            )
-            .execute(&mut *tx)
-            .await
-            .map_err(|e| {
-                Error::internal_err(format!(
-                    "Could not decrement concurrency counter for job_id={}: {e:#}",
-                    queued_job.id
-                ))
-            })?;
+        match concurrency_key {
+            Some(concurrency_key) if !*DISABLE_CONCURRENCY_LIMIT => {
+                sqlx::query_scalar!(
+                    "
+                    UPDATE
+                        concurrency_counter
+                    SET
+                        job_uuids = job_uuids - $2
+                    WHERE
+                        concurrency_id = $1
+                    ",
+                    concurrency_key,
+                    queued_job.id.hyphenated().to_string(),
+                )
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| {
+                    Error::internal_err(format!(
+                        "Could not decrement concurrency counter for job_id={}: {e:#}",
+                        queued_job.id
+                    ))
+                })?;
+            }
+            _ => {
+                tracing::warn!("Concurrency limit is disabled, skipping");
+            }
         }
 
         if let Err(e) = sqlx::query_scalar!(
@@ -1200,7 +1212,7 @@ async fn commit_completed_job(
     }
 
     sqlx::query!("DELETE FROM job_perms WHERE job_id = $1", job_id)
-        .execute(&mut *tx)
+        .execute(&mut *job_tx)
         .await?;
 
     if !success || has_stream {
        .execute(&mut *tx)
        .await?;
     }
+    let main_db_result = tx.commit().await;
+
+    match main_db_result {
+        Ok(()) => match job_tx.commit().await {
+            Ok(()) => {
+                tracing::debug!(
+                    "Both main DB and shard DB transactions committed successfully for job {}",
+                    job_id
+                );
+            }
+            Err(shard_error) => {
+                // TODO: look for a proper solution (e.g. outbox/reconciliation)
+                tracing::error!(
+                    job_id = %job_id,
+                    workspace_id = %queued_job.workspace_id,
+                    error = %shard_error,
+                    "CRITICAL: Main DB committed but shard DB failed during job completion. Data inconsistency possible."
+                );
 
-    tx.commit().await?;
+                return Err(Error::internal_err(format!(
+                    "Shard DB commit failed after main DB commit for job {}: {}",
+                    job_id, shard_error
+                )));
+            }
+        },
+        Err(main_error) => {
+            if let Err(rollback_err) = job_tx.rollback().await {
+                tracing::error!(
+                    job_id = %job_id,
+                    main_error = %main_error,
+                    rollback_error = %rollback_err,
+                    "Failed to rollback shard DB after main DB commit failure"
+                );
+            }
+            return Err(Error::internal_err(format!(
+                "Main DB commit failed for job {}: {}",
+                job_id, main_error
+            )));
+        }
+    }
 
     tracing::info!(
         %job_id,
-        root_job = ?queued_job.flow_innermost_root_job.map(|x| x.to_string()).unwrap_or_else(|| String::new()),
         path = &queued_job.runnable_path(),
         job_kind = ?queued_job.kind,
         started_at = ?queued_job.started_at.map(|x| x.to_string()).unwrap_or_else(|| String::new()),
@@ -1235,6 +1284,7 @@ async fn commit_completed_job(
 
 async fn check_result_size(
     db: &Pool<Postgres>,
+    job_queue_db: &DB,
     queued_job: &MiniPulledJob,
     result: Json<&T>,
 ) -> Option<Result<(Option<Uuid>, i64, bool), Error>> {
@@ -2243,7 +2293,8 @@ impl PulledJobResult {
 }
 
 pub async fn pull(
-    db: &Pool<Postgres>,
+    db: &DB,
+    job_queue_db: &DB,
     suspend_first: bool,
     worker_name: &str,
     query_o: Option<&(String, String)>,
@@ -2271,7 +2322,7 @@ pub async fn pull(
         } else {
             sqlx::query_as::<_, PulledJob>(query_suspended)
                 .bind(worker_name)
-                .fetch_optional(db)
+                .fetch_optional(job_queue_db)
                .await?
        };
@@ -2280,7 +2331,7 @@ pub async fn pull(
        } else {
            let job = sqlx::query_as::<_, PulledJob>(query_no_suspend)
                .bind(worker_name)
-                .fetch_optional(db)
+                .fetch_optional(job_queue_db)
                .await?;
            (job, false)
        };
@@ -2329,7 +2380,7 @@ pub async fn pull(
                tag,
                job.id
            )
-            .execute(db)
+            .execute(job_queue_db)
            .await?;
            continue;
        }
@@ -2337,7 +2388,7 @@ pub async fn pull(
            return Ok(njob);
        };
        let (job, suspended) = pull_single_job_and_mark_as_running_no_concurrency_limit(
-            db,
+            job_queue_db,
            suspend_first,
            worker_name,
            #[cfg(feature = "benchmark")]
@@ -2387,7 +2438,7 @@
 }
 
 async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(
-    db: &Pool<Postgres>,
+    job_queue_db: &DB,
     suspend_first: bool,
     worker_name: &str,
     #[cfg(feature = "benchmark")] bench: &mut BenchmarkIter,
@@ -2411,7 +2462,7 @@
        // tracing::info!("Pulling job with query: {}", query);
        sqlx::query_as::<_, PulledJob>(&query)
            .bind(worker_name)
-            .fetch_optional(db)
+            .fetch_optional(job_queue_db)
            .await?
    } else {
        None
    };
@@ -2438,7 +2489,7 @@
    let r = sqlx::query_as::<_, PulledJob>(query)
        .bind(worker_name)
-        .fetch_optional(db)
+        .fetch_optional(job_queue_db)
        .await?;
 
    #[cfg(feature = "benchmark")]
@@ -4116,23 +4167,44 @@ pub async fn push<'c, 'd>(
        })
    };
 
+    let shard_mode = *SHARD_MODE;
+    let mut existing_job_id = false;
     let mut tx = tx.into_tx().await?;
 
     let job_id: Uuid = if let Some(job_id) = job_id {
-        let conflicting_id = sqlx::query_scalar!("SELECT 1 FROM v2_job WHERE id = $1", job_id)
-            .fetch_optional(&mut *tx)
-            .await?;
+        existing_job_id = true;
+        job_id
+    } else {
+        Ulid::new().into()
+    };
+
+    let mut job_queue_tx = if shard_mode {
+        if let Some(job_queue_db) = get_shard_db_from_job_uuid(job_id) {
+            Some(job_queue_db.begin().await?)
+        } else {
+            None
+        }
+    } else {
+        None
+    };
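+    // The job id is fixed before any insert so the owning shard can be chosen up
+    // front; from here on, every statement touching v2_job/v2_job_queue for this
+    // job must go through job_queue_tx rather than the main-DB tx.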
+        };

         if conflicting_id.is_some() {
             return Err(Error::BadRequest(format!(
                 "Job with id {job_id} already exists"
             )));
         }
-
-        job_id
-    } else {
-        Ulid::new().into()
-    };
+    }

     if concurrent_limit.is_some() {
         insert_concurrency_key(
@@ -4294,7 +4366,10 @@ pub async fn push<'c, 'd>(
         running,
         end_user_email,
     )
-    .execute(&mut *tx)
+    .execute(if !shard_mode {
+        &mut *tx
+    } else {
+        job_queue_tx.as_mut().unwrap()
+    })
    .warn_after_seconds(1)
    .await?;

@@ -4325,7 +4400,11 @@ pub async fn push<'c, 'd>(
             job_id,
             Json(flow_status) as Json<FlowStatus>,
         )
-        .execute(&mut *tx)
+        .execute(if !shard_mode {
+            &mut *tx
+        } else {
+            job_queue_tx.as_mut().unwrap()
+        })
         .warn_after_seconds(1)
         .await?;
     }
@@ -4400,6 +4479,52 @@ pub async fn push<'c, 'd>(
         .await?;
     }

+    if shard_mode {
+        let job_queue_tx = job_queue_tx.unwrap();
+
+        let main_commit_result = tx.commit().await;
+
+        let commit_result = match main_commit_result {
+            Ok(_) => match job_queue_tx.commit().await {
+                Ok(_) => {
+                    tracing::debug!(
+                        "Both main DB and shard DB committed successfully for job {}",
+                        job_id
+                    );
+                    let new_tx = _db.begin().await?;
+                    Ok((job_id, new_tx))
+                }
+                Err(shard_error) => {
+                    tracing::error!(
+                        job_id = %job_id,
+                        workspace_id = %workspace_id,
+                        error = %shard_error,
+                        "CRITICAL: Main DB committed but shard DB failed during job push. Data inconsistency detected."
+                    );
+
+                    Err(Error::internal_err(format!(
+                        "CRITICAL: Shard DB commit failed after main DB commit for job {} (workspace: {}): {}",
+                        job_id, workspace_id, shard_error
+                    )))
+                }
+            },
+            Err(main_error) => {
+                if let Err(rollback_err) = job_queue_tx.rollback().await {
+                    tracing::error!(
+                        job_id = %job_id,
+                        main_error = %main_error,
+                        rollback_error = %rollback_err,
+                        "Failed to rollback shard DB after main DB commit failure"
+                    );
+                }
+                Err(Error::internal_err(format!(
+                    "Main DB commit failed for job {}: {}",
+                    job_id, main_error
+                )))
+            }
+        };
+        return commit_result;
+    }
     Ok((job_id, tx))
 }
diff --git a/backend/windmill-worker/src/ai_executor.rs b/backend/windmill-worker/src/ai_executor.rs
index 08c8ede63af7c..3d1ab085c4cc0 100644
--- a/backend/windmill-worker/src/ai_executor.rs
+++ b/backend/windmill-worker/src/ai_executor.rs
@@ -117,7 +117,7 @@ pub async fn handle_ai_agent_job(
     // connection
     conn: &Connection,
     db: &DB,
-
+    job_queue_db: &DB,
     // agent job
     job: &MiniPulledJob,
@@ -292,6 +292,7 @@
     let agent_fut = run_agent(
         db,
         conn,
+        job_queue_db,
         job,
         parent_job,
         &args,
@@ -422,7 +423,7 @@ pub async fn run_agent(
     // connection
     db: &DB,
     conn: &Connection,
-
+    job_queue_db: &DB,
     // agent job and flow data
     job: &MiniPulledJob,
     parent_job: &Uuid,
@@ -896,7 +897,7 @@ pub async fn run_agent(
             let inner_job_completed_tx_spawn = inner_job_completed_tx.clone();
             let mut occupancy_metrics_spawn = occupancy_metrics.clone();
             let mut killpill_rx_spawn = killpill_rx.resubscribe();
-
+            let queue_db = job_queue_db.clone();
             // Spawn on separate tokio task with fresh stack
             let join_handle = tokio::task::spawn(async move {
                 #[cfg(feature = "benchmark")]
@@ -910,6 +911,7 @@ pub async fn run_agent(
                     None,
                     None,
                     &conn_spawn,
+                    Some(&queue_db),
                     &client_spawn,
                     &hostname_spawn,
                     &worker_name_spawn,
@@ -951,6 +953,7 @@ pub async fn run_agent(
                 let err_json = error_to_value(&err);
                 let _ = handle_non_flow_job_error(
                     db,
+                    &job_queue_db,
                     &tool_job,
                     0,
                     None,
diff --git a/backend/windmill-worker/src/result_processor.rs b/backend/windmill-worker/src/result_processor.rs
index 3b0645ef065a3..2ce9bab1d7f49 100644
--- a/backend/windmill-worker/src/result_processor.rs
+++ b/backend/windmill-worker/src/result_processor.rs
@@ -62,6 +62,7 @@ async fn process_jc(
     worker_name: &str,
     base_internal_url: &str,
     db: &DB,
+    job_queue_db: &DB,
     worker_dir: &str,
     same_worker_tx: Option<&SameWorkerSender>,
     job_completed_sender: &JobCompletedSender,
@@ -151,6 +152,7 @@ async fn process_jc(
                 jc,
                 &base_internal_url,
                 &db,
+                &job_queue_db,
                 worker_dir,
                 same_worker_tx,
                 &worker_name,
@@ -193,6 +195,7 @@ pub fn start_background_processor(
     last_processing_duration: Arc,
     base_internal_url: String,
     db: DB,
+    job_queue_db: &DB,
     worker_dir: String,
     same_worker_tx: SameWorkerSender,
     worker_name: String,
@@ -200,6 +203,7 @@ pub fn start_background_processor(
     is_dedicated_worker: bool,
     stats_map: JobStatsMap,
 ) -> JoinHandle<()> {
+    let job_queue_db = job_queue_db.clone();
     tokio::spawn(async move {
         let mut has_been_killed = false;
@@ -281,6 +285,7 @@ pub fn start_background_processor(
                         &worker_name,
                         &base_internal_url,
                         &db,
+                        &job_queue_db,
                         &worker_dir,
                         Some(&same_worker_tx),
                         &job_completed_sender,
@@ -332,6 +337,7 @@ pub fn start_background_processor(
                     tracing::info!(parent_flow = %flow, "updating flow status after job completion");
                     if let Err(e) = update_flow_status_after_job_completion(
                         &db,
+                        &job_queue_db,
                         &AuthedClient::new(
                             base_internal_url.to_string(),
                             w_id.clone(),
@@ -506,6 +512,7 @@ pub async fn handle_receive_completed_job(
     jc: JobCompleted,
     base_internal_url: &str,
     db: &DB,
+    job_queue_db: &DB,
     worker_dir: &str,
     same_worker_tx: Option<&SameWorkerSender>,
     worker_name: &str,
@@ -523,6 +530,7 @@ pub async fn handle_receive_completed_job(
         jc,
         &client,
         db,
+        job_queue_db,
         &worker_dir,
         same_worker_tx.clone(),
         worker_name,
@@ -536,6 +544,7 @@
         Err(err) => {
             handle_job_error(
                 db,
+                job_queue_db,
                 &client,
                 job.as_ref(),
                 mem_peak,
@@ -572,6 +581,7 @@ pub async fn process_completed_job(
     }: JobCompleted,
     client: &AuthedClient,
     db: &DB,
+    job_queue_db: &DB,
     worker_dir: &str,
     same_worker_tx: Option<&SameWorkerSender>,
     worker_name: &str,
@@ -601,7 +611,7 @@ pub async fn process_completed_job(
                 WHERE id = $1 AND preprocessed = FALSE"#,
                 job.id
             )
-            .execute(db)
+            .execute(job_queue_db)
            .await
            .map_err(|e| {
                Error::InternalErr(format!(
@@ -615,7 +625,7 @@
                 Json(preprocessed_args) as Json<HashMap<String, Box<RawValue>>>,
                 job.id
             )
-            .execute(db)
+            .execute(job_queue_db)
             .await?;
         }
@@ -623,6 +633,7 @@
         let (_, duration) = add_completed_job(
             db,
+            job_queue_db,
             &job,
             true,
             false,
@@ -644,6 +655,7 @@
             // tracing::info!(parent_flow = %parent_job, subflow = %job_id, "updating flow status (2)");
             let r = update_flow_status_after_job_completion(
                 db,
+                job_queue_db,
                 client,
                 parent_job,
                 &job_id,
@@ -669,6 +681,7 @@
     } else {
         let result = add_completed_job_error(
             db,
+            job_queue_db,
             &job,
             mem_peak.to_owned(),
             canceled_by,
@@ -685,6 +698,7 @@
             tracing::error!(parent_flow = %parent_job, subflow = %job.id, "process completed job error, updating flow status");
             let r = update_flow_status_after_job_completion(
                 db,
+                job_queue_db,
                 client,
                 parent_job,
                 &job.id,
@@ -715,6 +729,7 @@

 pub async fn handle_non_flow_job_error(
     db: &DB,
+    job_queue_db: &DB,
     job: &MiniPulledJob,
     mem_peak: i32,
     canceled_by: Option<CanceledBy>,
@@ -731,6 +746,7 @@ pub async fn handle_non_flow_job_error(
     .await;
     add_completed_job_error(
         db,
+        job_queue_db,
         job,
         mem_peak,
         canceled_by,
@@ -745,6 +761,7 @@ pub async fn handle_non_flow_job_error(
 #[tracing::instrument(name = "job_error", level = "info", skip_all, fields(job_id = %job.id))]
 pub async fn handle_job_error(
     db: &DB,
+    job_queue_db: &DB,
     client: &AuthedClient,
     job: &MiniPulledJob,
     mem_peak: i32,
@@ -763,6 +780,7 @@ pub async fn handle_job_error(
     let update_job_future = || async {
         handle_non_flow_job_error(
             db,
+            job_queue_db,
             job,
             mem_peak,
             canceled_by.clone(),
@@ -790,6 +808,7 @@ pub async fn handle_job_error(
         tracing::error!(parent_flow = %flow, subflow = %job_status_to_update, "handle job error, updating flow status: {err_json:?}");
         let updated_flow = update_flow_status_after_job_completion(
             db,
+            job_queue_db,
             client,
             flow,
             &job_status_to_update,
@@ -823,6 +842,7 @@ pub async fn handle_job_error(
             .await;
             let _ = add_completed_job_error(
                 db,
+                job_queue_db,
                 &MiniPulledJob::from(&parent_job),
                 mem_peak,
                 canceled_by.clone(),
diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs
index aed1f82d93b64..780b46480c804 100644
--- a/backend/windmill-worker/src/worker.rs
+++ b/backend/windmill-worker/src/worker.rs
@@ -11,6 +11,8 @@ use anyhow::anyhow;
 use futures::TryFutureExt;
+use sqlx::Pool;
+use sqlx::Postgres;
 use tokio::time::timeout;
 use windmill_common::client::AuthedClient;
 use windmill_common::scripts::hash_to_codebase_id;
@@ -731,6 +733,7 @@ fn create_span(arc_job: &Arc<MiniPulledJob>, worker_name: &str, hostname: &str)

 pub async fn handle_all_job_kind_error(
     conn: &Connection,
+    job_queue_db: Option<&DB>,
     authed_client: &AuthedClient,
     job: Arc<MiniPulledJob>,
     err: Error,
@@ -744,6 +747,7 @@ pub async fn handle_all_job_kind_error(
         Connection::Sql(db) => {
             handle_job_error(
                 db,
+                job_queue_db.expect("Job queue database connection required for error handling"),
                 authed_client,
                 job.as_ref(),
                 0,
@@ -787,6 +791,7 @@ pub async fn handle_all_job_kind_error(

 pub fn start_interactive_worker_shell(
     conn: Connection,
+    job_queue_db: Option<&DB>,
     hostname: String,
     worker_name: String,
     mut killpill_rx: tokio::sync::broadcast::Receiver<()>,
@@ -794,12 +799,12 @@ pub fn start_interactive_worker_shell(
     base_internal_url: String,
     worker_dir: String,
 ) -> JoinHandle<()> {
+    let job_queue_db = job_queue_db.cloned();
     tokio::spawn(async move {
         let mut occupancy_metrics = OccupancyMetrics::new(Instant::now());
         let mut last_executed_job: Option<Instant> =
             Instant::now().checked_sub(Duration::from_millis(2500));
-
         loop {
             if let Ok(_) = killpill_rx.try_recv() {
                 tracing::info!("Received killpill, exiting worker shell");
@@ -814,6 +819,7 @@ pub fn start_interactive_worker_shell(
                     let mut bench = windmill_common::bench::BenchmarkIter::new();
                     let job = pull(
                         &db,
+                        &db, // Use same database for both main and job queue in non-sharded mode
                         false,
                         &worker_name,
                         Some(&query),
@@ -877,6 +883,7 @@ pub fn start_interactive_worker_shell(
                         raw_flow,
                         parent_runnable_path,
                         &conn,
+                        job_queue_db.as_ref(),
                         &authed_client,
                         &hostname,
                         &worker_name,
@@ -935,6 +942,7 @@ pub async fn create_job_dir(worker_directory: &str, job_id: impl Display) -> Str

 pub async fn run_worker(
     conn: &Connection,
+    job_queue_db: Option<Pool<Postgres>>,
     hostname: &str,
     worker_name: String,
     i_worker: u64,
@@ -1254,6 +1262,9 @@ pub async fn run_worker(
             last_processing_duration.clone(),
             base_internal_url.to_string(),
             db.clone(),
+            &job_queue_db
+                .clone()
+                .expect("Job queue database connection required for background processor"),
             worker_dir.clone(),
             same_worker_tx.clone(),
             worker_name.clone(),
@@ -1269,6 +1280,7 @@ pub async fn run_worker(

     let interactive_shell = if i_worker == 1 {
         let it_shell = start_interactive_worker_shell(
            conn.clone(),
+            job_queue_db.as_ref(),
             hostname.to_owned(),
             worker_name.clone(),
             killpill_rx.resubscribe(),
@@ -1569,6 +1581,9 @@ pub async fn run_worker(
                         Duration::from_secs(10),
                         pull(
                             &db,
+                            job_queue_db.as_ref().expect(
+                                "Job queue database connection required for job pulling",
+                            ),
                             suspend_first,
                             &worker_name,
                             None,
@@ -1872,6 +1887,7 @@ pub async fn run_worker(
                         raw_flow,
                         parent_runnable_path,
                         &conn,
+                        job_queue_db.as_ref(),
                         &authed_client,
                         hostname,
                         &worker_name,
@@ -1935,6 +1951,7 @@ pub async fn run_worker(
                     }
                     handle_all_job_kind_error(
                         &conn,
+                        job_queue_db.as_ref(),
                         &authed_client,
                         arc_job.clone(),
                         err,
@@ -2312,6 +2329,7 @@ pub async fn handle_queued_job(
     raw_flow: Option<Arc<Box<RawValue>>>,
     parent_runnable_path: Option<String>,
     conn: &Connection,
+    job_queue_db: Option<&DB>,
     client: &AuthedClient,
     hostname: &str,
     worker_name: &str,
@@ -2612,6 +2630,8 @@ pub async fn handle_queued_job(
             handle_ai_agent_job(
                 conn,
                 db,
+                job_queue_db
+                    .expect("Job queue database connection required for error handling"),
                 job.as_ref(),
                 &client,
                 &mut canceled_by,
diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs
index 1e6a7befe2fde..8b27e2f21fb1f 100644
--- a/backend/windmill-worker/src/worker_flow.rs
+++ b/backend/windmill-worker/src/worker_flow.rs
@@ -74,6 +74,7 @@ use windmill_queue::{canceled_job_to_result, push};
 // #[instrument(level = "trace", skip_all)]
 pub async fn update_flow_status_after_job_completion(
     db: &DB,
+    job_queue_db: &DB,
     client: &AuthedClient,
     flow: uuid::Uuid,
     job_id_for_status: &Uuid,
@@ -106,6 +107,7 @@ pub async fn update_flow_status_after_job_completion(
         potentially_crash_for_testing();
         let nrec = match update_flow_status_after_job_completion_internal(
             db,
+            job_queue_db,
             client,
             rec.flow,
             &rec.job_id_for_status,
@@ -130,6 +132,7 @@ pub async fn update_flow_status_after_job_completion(
                 tracing::error!("Error while updating flow status of {} after completion of {}, updating flow status again with error: {e:#}", rec.flow, &rec.job_id_for_status);
                 update_flow_status_after_job_completion_internal(
                     db,
+                    job_queue_db,
                     client,
                     rec.flow,
                     &rec.job_id_for_status,
@@ -269,6 +272,7 @@ fn result_has_recover_true(nresult: Arc<Box<RawValue>>) -> bool {
 // #[instrument(level = "trace", skip_all)]
 pub async fn update_flow_status_after_job_completion_internal(
     db: &DB,
+    job_queue_db: &DB,
     client: &AuthedClient,
     flow: uuid::Uuid,
     job_id_for_status: &Uuid,
@@ -1438,6 +1442,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             if flow_job.is_canceled() {
                 add_completed_job_error(
                     db,
+                    job_queue_db,
                     &flow_job,
                     0,
                     Some(CanceledBy {
@@ -1464,6 +1469,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             let duration = if success {
                 let (_, duration) = add_completed_job(
                     db,
+                    job_queue_db,
                     &flow_job,
                     true,
                     stop_early && skip_if_stop_early,
@@ -1480,6 +1486,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             } else {
                 let (_, duration) = add_completed_job(
                     db,
+                    job_queue_db,
                     &flow_job,
                     false,
                     stop_early && skip_if_stop_early,
@@ -1528,8 +1535,18 @@ pub async fn update_flow_status_after_job_completion_internal(
                 &db.into(),
             )
             .await;
-            let _ = add_completed_job_error(db, &flow_job, 0, None, e, worker_name, true, None)
-                .await;
+            let _ = add_completed_job_error(
+                db,
+                job_queue_db,
+                &flow_job,
+                0,
+                None,
+                e,
+                worker_name,
+                true,
+                None,
+            )
+            .await;
             true
         }
         Ok(_) => false,
diff --git a/benchmarks/README.md b/benchmarks/README.md
index c358471626a9e..2f8055c06e428 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -75,11 +75,11 @@ in `benchmarks_noop.ts`
 You can build it locally with:
 ```
-deno install -A benchmarks_noop.ts
+deno install -A benchmarks_oneoff.ts
 ```
 and then
 ```
-benchmarks_noop -e admin@windmill.dev -p changeme --host YOUR_HOST
+benchmarks_oneoff -e admin@windmill.dev -p changeme --host YOUR_HOST
 ```
 By default it creates 10000 jobs in Windmill in a single batch, but this is parametrizable.
\ No newline at end of file
diff --git a/benchmarks/benchmark_sharding.ts b/benchmarks/benchmark_sharding.ts
new file mode 100644
index 0000000000000..6c121434e7f86
--- /dev/null
+++ b/benchmarks/benchmark_sharding.ts
@@ -0,0 +1,622 @@
+///
+///
+
+import { Command } from "https://deno.land/x/cliffy@v0.25.7/command/mod.ts";
+import { UpgradeCommand } from "https://deno.land/x/cliffy@v0.25.7/command/upgrade/upgrade_command.ts";
+import { DenoLandProvider } from "https://deno.land/x/cliffy@v0.25.7/command/upgrade/mod.ts";
+
+import { sleep } from "https://deno.land/x/sleep@v1.2.1/mod.ts";
+
+import * as windmill from "https://deno.land/x/windmill@v1.174.0/mod.ts";
+import * as api from "https://deno.land/x/windmill@v1.174.0/windmill-api/index.ts";
+
+import { VERSION, createBenchScript, getFlowPayload, login } from "./lib.ts";
+
+export const NON_TEST_TAGS = [
+  "deno",
+  "python",
+  "go",
+  "bash",
+  "dedicated",
+  "bun",
+  "nativets",
+  "flow",
+];
+
+interface ShardingBenchmarkResult {
+  numShards: number;
+  throughput: number;
+  totalDuration: number;
+  jobsCompleted: number;
+  avgLatency: number;
+}
+
+interface ShardingBenchmarkOptions {
+  host: string;
+  email?: string;
+  password?: string;
+  token?: string;
+  workspace: string;
+  kind: string;
+  jobs: number;
+  maxShards: number;
+  noVerify?: boolean;
+}
+
+interface BenchmarkContext {
+  config: {
+    token: string;
+    server: string;
+    workspace_id: string;
+  };
+  nStepsFlow: number;
+  bodyTemplate: any;
+  getQueueCount: (tags?: string[], num_shards?: number) => Promise<number>;
+  getCompletedJobsCount: (
+    tags?: string[],
+    baseline?: number,
+    num_shards?: number
+  ) => Promise<number>;
+}
+
+async function verifyOutputs(uuids: string[], workspace: string) {
+  console.log("Verifying outputs");
+  let incorrectResults = 0;
+  for (const uuid of uuids) {
+    try {
+      const job = await windmill.JobService.getCompletedJob({
+        workspace,
+        id: uuid,
+      });
+      if (!job.success) {
+        console.log(`Job ${uuid} did not complete`);
+        incorrectResults++;
+      }
+      if (job.result !== uuid) {
+        console.log(
+          `Job ${uuid} did not output the correct value: ${JSON.stringify(job)}`
+        );
+        incorrectResults++;
+      }
+    } catch (_) {
+      console.log(`Job ${uuid} did not complete`);
+      incorrectResults++;
+    }
+  }
+  console.log(`Incorrect results: ${incorrectResults}`);
+}
+
+async function initializeBenchmarkContext(
+  options: ShardingBenchmarkOptions
+): Promise<BenchmarkContext> {
+  const { host, workspace, kind } = options;
+
+  windmill.setClient("", host);
+
+  const config = {
+    token: "",
+    server: host,
+    workspace_id: workspace,
+  };
+
+  let final_token: string;
+  if (!options.token) {
+    if (options.email && options.password) {
+      final_token = await login(options.email, options.password);
+    } else {
+      throw new Error("Token or email with password are required.");
+    }
+  } else {
+    final_token = options.token;
+  }
+
+  config.token = final_token;
+  windmill.setClient(final_token, host);
+
+  async function getQueueCount(tags?: string[], numShards?: number) {
+    const params = new URLSearchParams();
+    if (tags && tags.length > 0) {
+      params.set("tags", tags.join(","));
+    }
+    if (numShards !== undefined) {
+      params.set("num_shards", numShards.toString());
+    }
+    const queryString = params.toString();
+    return (
+      await (
+        await fetch(
+          config.server +
+            "/api/w/" +
+            config.workspace_id +
+            "/jobs/queue/count" +
+            (queryString ? "?" + queryString : ""),
+          { headers: { ["Authorization"]: "Bearer " + config.token } }
+        )
+      ).json()
+    ).database_length;
+  }
+
+  async function getFlowStepCount(
+    workspace: string,
+    path: string
+  ): Promise<number> {
+    const response = await fetch(
+      `${config.server}/api/w/${workspace}/flows/get/${path}`,
+      { headers: { ["Authorization"]: "Bearer " + config.token } }
+    );
+
+    const data = await response.json();
+    let stepCount = 0;
+
+    for (const mod of data.value.modules) {
+      if (mod.value.type === "flow" && mod.value.path) {
+        const subFlowCount = await getFlowStepCount(workspace, mod.value.path);
+        stepCount += subFlowCount;
+      } else {
+        stepCount += 1;
+      }
+    }
+
+    return stepCount;
+  }
+
+  async function getCompletedJobsCount(
+    tags?: string[],
+    baseline: number = 0,
+    numShards?: number
+  ): Promise<number> {
+    const params = new URLSearchParams();
+    if (tags && tags.length > 0) {
+      params.set("tags", tags.join(","));
+    }
+    if (numShards !== undefined) {
+      params.set("num_shards", numShards.toString());
+    }
+    const queryString = params.toString();
+    const completedJobs = (
+      await (
+        await fetch(
+          host +
+            "/api/w/" +
+            config.workspace_id +
+            "/jobs/completed/count" +
+            (queryString ? "?" + queryString : ""),
+          { headers: { ["Authorization"]: "Bearer " + config.token } }
+        )
+      ).json()
+    ).database_length;
+    return completedJobs - baseline;
+  }
+
+  if (
+    ["deno", "python", "go", "bash", "dedicated", "bun", "nativets"].includes(
+      kind
+    )
+  ) {
+    await createBenchScript(kind, workspace);
+  }
+
+  // Determine job body template and flow steps
+  let nStepsFlow = 0;
+  let bodyTemplate: any;
+
+  if (kind === "noop") {
+    bodyTemplate = {
+      kind: "noop",
+    };
+  } else if (
+    ["deno", "python", "go", "bash", "dedicated", "bun", "nativets"].includes(
+      kind
+    )
+  ) {
+    bodyTemplate = {
+      kind: "script",
+      path: "f/benchmarks/" + kind,
+    };
+  } else if (["2steps", "bigscriptinflow"].includes(kind)) {
+    nStepsFlow = kind == "2steps" ? 2 : 1;
+    const payload = getFlowPayload(kind);
+    bodyTemplate = {
+      kind: "flow",
+      flow_value: payload.value,
+    };
+  } else if (kind.startsWith("flow:")) {
+    console.log("Detected custom flow ");
+    let flow_path = kind.substr(5);
+    nStepsFlow = await getFlowStepCount(config.workspace_id, flow_path);
+    console.log(`Total steps of flow including sub-flows: ${nStepsFlow}`);
+    bodyTemplate = {
+      kind: "flow",
+      path: flow_path,
+    };
+  } else if (kind.startsWith("script:")) {
+    console.log("Detected custom script");
+    bodyTemplate = {
+      kind: "script",
+      path: kind.substr(7),
+    };
+  } else if (kind == "bigrawscript") {
+    bodyTemplate = {
+      kind: "rawscript",
+      rawscript: {
+        language: api.RawScript.language.BASH,
+        content:
+          "# let's bloat that bash script, 3.. 2.. 1.. BOOM\n".repeat(100) +
BOOM\n".repeat(100) + + 'echo "$WM_FLOW_JOB_ID"\n', + }, + }; + } else { + throw new Error("Unknown script pattern " + kind); + } + + return { + config, + nStepsFlow, + bodyTemplate, + getQueueCount, + getCompletedJobsCount, + }; +} + +async function runShardingBenchmark( + context: BenchmarkContext, + options: ShardingBenchmarkOptions, + numShards: number +): Promise { + const { jobs, kind } = options; + const { + config, + nStepsFlow, + bodyTemplate, + getQueueCount, + getCompletedJobsCount, + } = context; + console.log(`\n=== Running benchmark with ${numShards} shard(s) ===`); + + const pastJobs = await getCompletedJobsCount(NON_TEST_TAGS, 0, numShards); + + const enc = (s: string) => new TextEncoder().encode(s); + const jobsSent = jobs; + console.log(`Bulk creating ${jobsSent} jobs`); + + const start_create = Date.now(); + + // Create request body with number_of_shards + const body = JSON.stringify({ + ...bodyTemplate, + number_of_shards: numShards, + }); + + const response = await fetch( + config.server + + "/api/w/" + + config.workspace_id + + `/jobs/add_batch_jobs/${jobsSent}`, + { + method: "POST", + headers: { + ["Authorization"]: "Bearer " + config.token, + "Content-Type": "application/json", + }, + body, + } + ); + + if (!response.ok) { + throw new Error( + "Failed to create jobs: " + + response.statusText + + " " + + (await response.text()) + ); + } + + const uuids = await response.json(); + const end_create = Date.now(); + const create_duration = end_create - start_create; + console.log( + `Jobs successfully added to the queue in ${ + create_duration / 1000 + }s. Windmill will start pulling them\n` + ); + + let start = Date.now(); + let completedJobs = 0; + let lastElapsed = 0; + let lastCompletedJobs = 0; + let totalMonitoringOverhead = 0; + + let didStart = false; + while (completedJobs < jobsSent) { + if (!didStart) { + const queueCheckStart = Date.now(); + const actual_queue = await getQueueCount(NON_TEST_TAGS, numShards); + totalMonitoringOverhead += Date.now() - queueCheckStart; + + if (actual_queue < jobsSent) { + start = Date.now(); + totalMonitoringOverhead = 0; + didStart = true; + } + } else { + const monitoringStart = Date.now(); + completedJobs = await getCompletedJobsCount( + NON_TEST_TAGS, + pastJobs, + numShards + ); + totalMonitoringOverhead += Date.now() - monitoringStart; + + const elapsed = start + ? Math.max(1, Date.now() - start - totalMonitoringOverhead) + : 1; + if (nStepsFlow > 0) { + completedJobs = Math.floor(completedJobs / (nStepsFlow + 1)); + } + const avgThr = elapsed > 0 ? ((completedJobs / elapsed) * 1000).toFixed(2) : "0.00"; + const timeDiff = elapsed - lastElapsed; + const jobsDiff = completedJobs - lastCompletedJobs; + const instThr = + lastElapsed > 0 && timeDiff > 0 && jobsDiff > 0 + ? ((jobsDiff / timeDiff) * 1000).toFixed(2) + : "0.00"; + + lastElapsed = elapsed; + lastCompletedJobs = completedJobs; + + await Deno.stdout.write( + enc( + `[${numShards} shards] elapsed: ${(elapsed / 1000).toFixed( + 2 + )} | jobs executed: ${completedJobs}/${jobsSent} (thr: inst ${instThr} - avg ${avgThr}) | remaining: ${ + jobsSent - completedJobs + } \r` + ) + ); + } + + await sleep(0.05); + } + + const total_duration_ms = Math.max(1, Date.now() - start - totalMonitoringOverhead); + const total_duration_sec = total_duration_ms / 1000.0; + const throughput = total_duration_sec > 0 ? jobsSent / total_duration_sec : 0; + const avgLatency = jobsSent > 0 ? 
+
+  console.log(`\n--- Results for ${numShards} shard(s) ---`);
+  console.log(`jobs: ${jobsSent}`);
+  console.log(`duration: ${total_duration_sec}s`);
+  console.log(
+    `monitoring overhead: ${(totalMonitoringOverhead / 1000).toFixed(2)}s`
+  );
+  console.log(`throughput: ${throughput.toFixed(2)} jobs/s`);
+  console.log(`avg latency: ${(avgLatency * 1000).toFixed(2)}ms per job`);
+  console.log(`completed jobs: ${completedJobs}`);
+
+  if (
+    !options.noVerify &&
+    kind !== "noop" &&
+    kind !== "nativets" &&
+    !kind.startsWith("flow:") &&
+    !kind.startsWith("script:")
+  ) {
+    await verifyOutputs(uuids, config.workspace_id);
+  }
+
+  return {
+    numShards,
+    throughput,
+    totalDuration: total_duration_sec,
+    jobsCompleted: completedJobs,
+    avgLatency,
+  };
+}
+
+export async function main({
+  host,
+  email,
+  password,
+  token,
+  workspace,
+  kind,
+  jobs,
+  maxShards,
+  noVerify,
+}: {
+  host: string;
+  email?: string;
+  password?: string;
+  token?: string;
+  workspace: string;
+  kind: string;
+  jobs: number;
+  maxShards: number;
+  noVerify?: boolean;
+}) {
+  if (maxShards <= 0) {
+    throw new Error("Max shards must be greater than 0");
+  }
+
+  console.log(
+    "Started sharding benchmark with options",
+    JSON.stringify(
+      {
+        host,
+        email,
+        workspace,
+        kind,
+        jobs,
+        maxShards,
+        noVerify,
+      },
+      null,
+      4
+    )
+  );
+
+  const options: ShardingBenchmarkOptions = {
+    host,
+    email,
+    password,
+    token,
+    workspace,
+    kind,
+    jobs,
+    maxShards,
+    noVerify,
+  };
+
+  console.log("Initializing benchmark context...");
+  const context = await initializeBenchmarkContext(options);
+  console.log("Context initialized successfully!");
+
+  const results: ShardingBenchmarkResult[] = [];
+
+  for (let numShards = 1; numShards <= maxShards; numShards++) {
+    try {
+      const result = await runShardingBenchmark(context, options, numShards);
+      results.push(result);
+
+      await sleep(0.3);
+    } catch (error) {
+      console.error(
+        `Failed to run benchmark with ${numShards} shard(s):`,
+        error
+      );
+      break;
+    }
+  }
+
+  console.log("\n" + "=".repeat(80));
+  console.log("SHARDING BENCHMARK RESULTS SUMMARY");
+  console.log("=".repeat(80));
+  console.log(
+    "Shards | Throughput (jobs/s) | Duration (s) | Avg Latency (ms) | Scaling Factor"
+  );
+  console.log("-".repeat(80));
+
+  const baselineThroughput = results[0]?.throughput || 1;
+
+  for (const result of results) {
+    const scalingFactor = (result.throughput / baselineThroughput).toFixed(2);
+    console.log(
+      `${result.numShards.toString().padStart(6)} | ${result.throughput
+        .toFixed(2)
+        .padStart(18)} | ${result.totalDuration.toFixed(2).padStart(11)} | ${(
+        result.avgLatency * 1000
+      )
+        .toFixed(2)
+        .padStart(15)} | ${scalingFactor.padStart(13)}`
+    );
+  }
+
+  console.log("-".repeat(80));
+
+  if (results.length > 1) {
+    const linearScalingEfficiency = results.map((result) => {
+      const expectedThroughput = baselineThroughput * result.numShards;
+      const efficiency = (result.throughput / expectedThroughput) * 100;
+      return { shards: result.numShards, efficiency: efficiency.toFixed(1) };
+    });
+
+    console.log("\nLinear Scaling Efficiency:");
+    for (const eff of linearScalingEfficiency) {
+      console.log(`${eff.shards} shard(s): ${eff.efficiency}% efficiency`);
+    }
+  }
+
+  console.log("\nBenchmark completed!");
+
+  return {
+    results,
+    summary: {
+      maxShards,
+      bestThroughput: Math.max(...results.map((r) => r.throughput)),
+      scalingEfficiency:
+        results.length > 1
+          ?
+            (results[results.length - 1].throughput /
+              (baselineThroughput * results[results.length - 1].numShards)) *
+              100
+          : 100,
+    },
+  };
+}
+
+if (import.meta.main) {
+  await new Command()
+    .name("wmillbench-sharding")
+    .description(
+      "Run Sharding Benchmark to measure linear scaling of windmill with multiple database shards."
+    )
+    .version(VERSION)
+    .option("--host <host:string>", "The windmill host to benchmark.", {
+      default: "http://127.0.0.1:8000",
+    })
+    .option("-e --email <email:string>", "The email to use to login.", {
+      default: "admin@windmill.dev",
+    })
+    .option(
+      "-p --password <password:string>",
+      "The password to use to login.",
+      {
+        default: "changeme",
+      }
+    )
+    .env(
+      "WM_TOKEN=<token:string>",
+      "The token to use when talking to the API server. Preferred over manual login."
+    )
+    .option(
+      "-t --token <token:string>",
+      "The token to use when talking to the API server. Preferred over manual login."
+    )
+    .env(
+      "WM_WORKSPACE=<workspace:string>",
+      "The workspace to spawn scripts from."
+    )
+    .option(
+      "-w --workspace <workspace:string>",
+      "The workspace to spawn scripts from.",
+      { default: "admins" }
+    )
+    .option(
+      "--kind <kind:string>",
+      "Specify the benchmark kind among: deno, identity, python, go, bash, dedicated, bun, noop, 2steps, nativets",
+      {
+        required: true,
+      }
+    )
+    .option("-j --jobs <jobs:number>", "Number of jobs to create per test.", {
+      default: 10000,
+    })
+    .env(
+      "MAX_SHARDS=<maxShards:number>",
+      "Maximum number of shards to test (will test 1, 2, 3, ... up to this number)"
+    )
+    .option(
+      "--max-shards <maxShards:number>",
+      "Maximum number of shards to test (will test 1, 2, 3, ... up to this number)",
+      {
+        required: true,
+      }
+    )
+    .option("--no-verify", "Do not verify the output of the jobs.", {
+      default: false,
+    })
+    .action(main)
+    .command(
+      "upgrade",
+      new UpgradeCommand({
+        main: "main.ts",
+        args: [
+          "--allow-net",
+          "--allow-read",
+          "--allow-write",
+          "--allow-env",
+          "--unstable",
+        ],
+        provider: new DenoLandProvider({ name: "wmillbench-sharding" }),
+      })
+    )
+    .parse();
+}
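A note on shard routing: `push` picks the shard transaction via `get_shard_db_from_job_uuid(job_id)`, but the helper itself is not part of this diff. A minimal sketch of what such a router could look like, assuming a static registry of shard pools and hash-mod routing on the job UUID (the registry name and the scheme here are assumptions, not the actual implementation):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use once_cell::sync::OnceCell;
use sqlx::{Pool, Postgres};
use uuid::Uuid;

// Hypothetical registry of shard pools, populated once at startup.
static SHARD_DBS: OnceCell<Vec<Pool<Postgres>>> = OnceCell::new();

/// Route a job UUID to one shard pool; `None` when sharding is not configured.
/// Hashing the id keeps a job pinned to the same shard for its whole lifetime,
/// so `pull` and `commit_completed_job` can find it again later.
pub fn get_shard_db_from_job_uuid(job_id: Uuid) -> Option<&'static Pool<Postgres>> {
    let shards = SHARD_DBS.get().filter(|s| !s.is_empty())?;
    let mut hasher = DefaultHasher::new();
    job_id.hash(&mut hasher);
    shards.get(hasher.finish() as usize % shards.len())
}
```

Since job ids are ULID-derived (`Ulid::new().into()`), hashing rather than taking raw bytes modulo the shard count avoids skew from the timestamp prefix.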
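A note on commit ordering: both `push` and `commit_completed_job` commit the main DB first and the shard DB second. Only a main-DB failure is still recoverable (the shard transaction is rolled back); once the main DB has committed, a shard failure can only be logged as CRITICAL and surfaced as an error, which is what the `TODO` in `commit_completed_job` is about. A compact sketch of the shared pattern, written as a hypothetical helper that is not in the diff:

```rust
use sqlx::{Postgres, Transaction};
use windmill_common::error::Error;

/// Commit `main` first, then `shard`, mirroring the ordering used in this PR.
async fn commit_pair(
    main: Transaction<'_, Postgres>,
    shard: Transaction<'_, Postgres>,
) -> Result<(), Error> {
    match main.commit().await {
        // The main DB is durable at this point: a shard failure can only be
        // reported and reconciled out of band.
        Ok(()) => shard.commit().await.map_err(|e| {
            Error::internal_err(format!("shard commit failed after main commit: {e:#}"))
        }),
        // The main commit failed while the shard tx is still open, so undo it.
        Err(e) => {
            let _ = shard.rollback().await;
            Err(Error::internal_err(format!("main commit failed: {e:#}")))
        }
    }
}
```

A full two-phase commit would close the remaining inconsistency window, but at the cost of a coordinator; the PR opts for strict ordering plus loud logging instead.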
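One detail worth noting in `runShardingBenchmark`: every poll of the count endpoints is timed and accumulated into `totalMonitoringOverhead`, which is then subtracted from the elapsed time before throughput is computed. Polling every 50 ms would otherwise charge the HTTP round-trips to the workers and understate jobs/s, presumably more so at higher shard counts where each count query fans out across databases.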
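A worked example for the summary table: the scaling factor is throughput relative to the 1-shard baseline, and linear-scaling efficiency divides measured throughput by the ideal `baseline × shards`. If 1 shard sustains 1,000 jobs/s and 3 shards sustain 2,700 jobs/s, the scaling factor is 2.70 and the efficiency is 2700 / (1000 × 3) = 90%.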
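Usage: the script sweeps shard counts from 1 up to `--max-shards`, pushing `--jobs` jobs per round through `add_batch_jobs` with a `number_of_shards` field. Against a local instance with the default credentials, an invocation would look like `deno run -A benchmark_sharding.ts --host http://127.0.0.1:8000 -e admin@windmill.dev -p changeme --kind noop --jobs 10000 --max-shards 4`.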