Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions backend/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2196,7 +2196,18 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker
}

let jobs = sqlx::query_as::<_, QueuedJob>(
"SELECT *, null as workflow_as_code_status FROM v2_as_queue WHERE id = ANY($1)",
"SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running,
j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code,
q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping,
j.kind AS job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path,
j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow,
j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until,
j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner,
r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs,
j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl,
j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status
FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id)
WHERE j.id = ANY($1)",
)
.bind(&timeouts[..])
.fetch_all(db)
Expand All @@ -2210,8 +2221,19 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker
let non_restartable_jobs = if *RESTART_ZOMBIE_JOBS {
vec![]
} else {
sqlx::query_as::<_, QueuedJob>("SELECT *, null as workflow_as_code_status FROM v2_as_queue WHERE last_ping < now() - ($1 || ' seconds')::interval
AND running = true AND job_kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND same_worker = false")
sqlx::query_as::<_, QueuedJob>("SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running,
j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code,
q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping,
j.kind AS job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path,
j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow,
j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until,
j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner,
r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs,
j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl,
j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status
FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id)
WHERE r.ping < now() - ($1 || ' seconds')::interval
AND q.running = true AND j.kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND j.same_worker = false")
.bind(ZOMBIE_JOB_TIMEOUT.as_str())
.fetch_all(db)
.await
Expand All @@ -2236,7 +2258,18 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker
}

let zombie_jobs_restart_limit_reached = sqlx::query_as::<_, QueuedJob>(
"SELECT *, null as workflow_as_code_status FROM v2_as_queue WHERE id = ANY($1)",
"SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running,
j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code,
q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping,
j.kind AS job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path,
j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow,
j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until,
j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner,
r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs,
j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl,
j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status
FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id)
WHERE j.id = ANY($1)",
)
.bind(&zombie_jobs_uuid_restart_limit_reached[..])
.fetch_all(db)
Expand Down Expand Up @@ -2431,13 +2464,13 @@ async fn handle_zombie_flows(db: &DB) -> error::Result<()> {
let flows = sqlx::query!(
r#"
SELECT
id AS "id!", workspace_id AS "workspace_id!", parent_job, is_flow_step,
flow_status AS "flow_status: Box<str>", last_ping, same_worker
FROM v2_as_queue
WHERE running = true AND suspend = 0 AND suspend_until IS null AND scheduled_for <= now()
AND (job_kind = 'flow' OR job_kind = 'flowpreview' OR job_kind = 'flownode')
AND last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval
AND canceled = false
j.id AS "id!", j.workspace_id AS "workspace_id!", j.parent_job, j.flow_step_id IS NOT NULL AS "is_flow_step?",
COALESCE(s.flow_status, s.workflow_as_code_status) AS "flow_status: Box<str>", r.ping AS last_ping, j.same_worker AS "same_worker?"
FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id)
WHERE q.running = true AND q.suspend = 0 AND q.suspend_until IS null AND q.scheduled_for <= now()
AND (j.kind = 'flow' OR j.kind = 'flowpreview' OR j.kind = 'flownode')
AND r.ping IS NOT NULL AND r.ping < NOW() - ($1 || ' seconds')::interval
AND q.canceled_by IS NULL

"#,
FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str()
Expand Down
12 changes: 11 additions & 1 deletion backend/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,17 @@ pub async fn listen_for_uuid_on(

pub async fn completed_job(uuid: Uuid, db: &Pool<Postgres>) -> CompletedJob {
sqlx::query_as::<_, CompletedJob>(
"SELECT *, result->'wm_labels' as labels FROM v2_as_completed_job WHERE id = $1",
"SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, c.duration_ms,
c.status = 'success' OR c.status = 'skipped' AS success, j.runnable_id AS script_hash, j.runnable_path AS script_path,
j.args, c.result, FALSE AS deleted, j.raw_code, c.status = 'canceled' AS canceled,
c.canceled_by, c.canceled_reason, j.kind AS job_kind,
CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path,
j.permissioned_as, COALESCE(c.flow_status, c.workflow_as_code_status) AS flow_status, j.raw_flow,
j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, c.started_at,
c.status = 'skipped' AS is_skipped, j.raw_lock, j.permissioned_as_email AS email, j.visible_to_owner,
c.memory_peak AS mem_peak, j.tag, j.priority, NULL::TEXT AS logs, c.result_columns,
j.script_entrypoint_override, j.preprocessed, c.result->'wm_labels' as labels
FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1",
)
.bind(uuid)
.fetch_one(db)
Expand Down
8 changes: 4 additions & 4 deletions backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2369,7 +2369,7 @@ async fn test_script_schedule_handlers(db: Pool<Postgres>) -> anyhow::Result<()>
let uuid = uuid.unwrap().unwrap();

let completed_job = sqlx::query!(
"SELECT script_path FROM v2_as_completed_job WHERE id = $1",
"SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1",
uuid
)
.fetch_one(&db2)
Expand Down Expand Up @@ -2440,7 +2440,7 @@ async fn test_script_schedule_handlers(db: Pool<Postgres>) -> anyhow::Result<()>
let uuid = uuid.unwrap().unwrap();

let completed_job =
sqlx::query!("SELECT script_path FROM v2_as_completed_job WHERE id = $1", uuid)
sqlx::query!("SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1", uuid)
.fetch_one(&db2)
.await
.unwrap();
Expand Down Expand Up @@ -2527,7 +2527,7 @@ async fn test_flow_schedule_handlers(db: Pool<Postgres>) -> anyhow::Result<()> {
let uuid = uuid.unwrap().unwrap();

let completed_job = sqlx::query!(
"SELECT script_path FROM v2_as_completed_job WHERE id = $1",
"SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1",
uuid
)
.fetch_one(&db2)
Expand Down Expand Up @@ -2599,7 +2599,7 @@ async fn test_flow_schedule_handlers(db: Pool<Postgres>) -> anyhow::Result<()> {
let uuid = uuid.unwrap().unwrap();

let completed_job =
sqlx::query!("SELECT script_path FROM v2_as_completed_job WHERE id = $1", uuid)
sqlx::query!("SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1", uuid)
.fetch_one(&db2)
.await
.unwrap();
Expand Down
44 changes: 22 additions & 22 deletions backend/windmill-api/src/approvals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,31 +204,31 @@ pub async fn get_approval_form_details(
"WITH job_info AS (
-- Query for Teams (running jobs)
SELECT
parent.job_kind AS \"job_kind!: JobKind\",
parent.script_hash AS \"script_hash: ScriptHash\",
parent.raw_flow AS \"raw_flow: sqlx::types::Json<Box<RawValue>>\",
child.parent_job AS \"parent_job: Uuid\",
parent.created_at AS \"created_at!: chrono::NaiveDateTime\",
parent.created_by AS \"created_by!\",
parent.script_path,
parent.args AS \"args: sqlx::types::Json<Box<RawValue>>\"
FROM v2_as_queue child
JOIN v2_as_queue parent ON parent.id = child.parent_job
WHERE child.id = $1 AND child.workspace_id = $2
parent_j.kind AS \"job_kind!: JobKind\",
parent_j.runnable_id AS \"script_hash: ScriptHash\",
parent_j.raw_flow AS \"raw_flow: sqlx::types::Json<Box<RawValue>>\",
child_j.parent_job AS \"parent_job: Uuid\",
parent_j.created_at AS \"created_at!: chrono::NaiveDateTime\",
parent_j.created_by AS \"created_by!\",
parent_j.runnable_path as script_path,
parent_j.args AS \"args: sqlx::types::Json<Box<RawValue>>\"
FROM v2_job_queue child_q JOIN v2_job child_j USING (id)
JOIN v2_job parent_j ON parent_j.id = child_j.parent_job
WHERE child_j.id = $1 AND child_j.workspace_id = $2
UNION ALL
-- Query for Slack (completed jobs)
SELECT
v2_as_queue.job_kind AS \"job_kind!: JobKind\",
v2_as_queue.script_hash AS \"script_hash: ScriptHash\",
v2_as_queue.raw_flow AS \"raw_flow: sqlx::types::Json<Box<RawValue>>\",
v2_as_completed_job.parent_job AS \"parent_job: Uuid\",
v2_as_completed_job.created_at AS \"created_at!: chrono::NaiveDateTime\",
v2_as_completed_job.created_by AS \"created_by!\",
v2_as_queue.script_path,
v2_as_queue.args AS \"args: sqlx::types::Json<Box<RawValue>>\"
FROM v2_as_queue
JOIN v2_as_completed_job ON v2_as_completed_job.parent_job = v2_as_queue.id
WHERE v2_as_completed_job.id = $1 AND v2_as_completed_job.workspace_id = $2
parent_j.kind AS \"job_kind!: JobKind\",
parent_j.runnable_id AS \"script_hash: ScriptHash\",
parent_j.raw_flow AS \"raw_flow: sqlx::types::Json<Box<RawValue>>\",
completed_j.parent_job AS \"parent_job: Uuid\",
completed_j.created_at AS \"created_at!: chrono::NaiveDateTime\",
completed_j.created_by AS \"created_by!\",
parent_j.runnable_path as script_path,
parent_j.args AS \"args: sqlx::types::Json<Box<RawValue>>\"
FROM v2_job_completed completed_c JOIN v2_job completed_j USING (id)
JOIN v2_job parent_j ON parent_j.id = completed_j.parent_job
WHERE completed_j.id = $1 AND completed_j.workspace_id = $2
)
SELECT * FROM job_info LIMIT 1",
job_id,
Expand Down
14 changes: 7 additions & 7 deletions backend/windmill-api/src/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2365,13 +2365,13 @@ async fn check_if_allowed_to_access_s3_file_from_app(
|| {
sqlx::query_scalar!(
r#"SELECT EXISTS (
SELECT 1 FROM v2_as_completed_job
WHERE workspace_id = $2
AND (job_kind = 'appscript' OR job_kind = 'preview')
AND created_by = 'anonymous'
AND started_at > now() - interval '3 hours'
AND script_path LIKE $3 || '/%'
AND result @> ('{"s3":"' || $1 || '"}')::jsonb
SELECT 1 FROM v2_job_completed c JOIN v2_job j USING (id)
WHERE j.workspace_id = $2
AND (j.kind = 'appscript' OR j.kind = 'preview')
AND j.created_by = 'anonymous'
AND c.started_at > now() - interval '3 hours'
AND j.runnable_path LIKE $3 || '/%'
AND c.result @> ('{"s3":"' || $1 || '"}')::jsonb
)"#,
file_query.s3,
w_id,
Expand Down
12 changes: 6 additions & 6 deletions backend/windmill-api/src/concurrency_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,23 @@ async fn get_concurrent_intervals(
let lq = ListCompletedQuery { order_desc: Some(true), ..lq };
let lqc = lq.clone();
let lqq: ListQueueQuery = lqc.into();
let mut sqlb_q = SqlBuilder::select_from("v2_as_queue")
let mut sqlb_q = SqlBuilder::select_from("v2_job_queue q JOIN v2_job USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id)")
.fields(UnifiedJob::queued_job_fields())
.order_by("created_at", lq.order_desc.unwrap_or(true))
.limit(row_limit)
.clone();
let mut sqlb_c = SqlBuilder::select_from("v2_as_completed_job")
let mut sqlb_c = SqlBuilder::select_from("v2_job_completed c JOIN v2_job USING (id)")
.fields(UnifiedJob::completed_job_fields())
.order_by("started_at", lq.order_desc.unwrap_or(true))
.limit(row_limit)
.clone();
let mut sqlb_q_user = SqlBuilder::select_from("v2_as_queue")
.fields(&["id"])
let mut sqlb_q_user = SqlBuilder::select_from("v2_job_queue q JOIN v2_job USING (id)")
.fields(&["v2_job.id"])
.order_by("created_at", lq.order_desc.unwrap_or(true))
.limit(row_limit)
.clone();
let mut sqlb_c_user = SqlBuilder::select_from("v2_as_completed_job")
.fields(&["id"])
let mut sqlb_c_user = SqlBuilder::select_from("v2_job_completed c JOIN v2_job USING (id)")
.fields(&["v2_job.id"])
.order_by("started_at", lq.order_desc.unwrap_or(true))
.limit(row_limit)
.clone();
Expand Down
Loading