From 7a82904823f3f2049d93906efd37bc9a0c81d3a3 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Tue, 19 Aug 2025 02:35:19 +0000 Subject: [PATCH] feat: add retry logic to get_step_of_flow_status using backon - Added backon retry with constant backoff (500ms delay, max 3 retries) - Added warning logs for retry attempts to aid debugging - Follows existing codebase patterns for retry resilience - Improves reliability when fetching flow status information Co-authored-by: windmill-internal-app[bot] --- backend/windmill-queue/src/flow_status.rs | 30 +++++++++++++++++------ 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/backend/windmill-queue/src/flow_status.rs b/backend/windmill-queue/src/flow_status.rs index 3bc6b3c81579a..36d357e8cb3d6 100644 --- a/backend/windmill-queue/src/flow_status.rs +++ b/backend/windmill-queue/src/flow_status.rs @@ -4,6 +4,7 @@ use windmill_common::{ utils::WarnAfterExt, DB, }; +use backon::{BackoffBuilder, ConstantBuilder, Retryable}; #[derive(Debug, Copy, Clone)] pub enum Step { @@ -118,14 +119,29 @@ pub async fn update_workflow_as_code_status( // TODO: merge as a CTE #[tracing::instrument(level = "trace", skip_all)] async fn get_step_of_flow_status(db: &DB, id: Uuid) -> error::Result { - let r = sqlx::query!( - "SELECT (flow_status->'step')::integer as step, jsonb_array_length(flow_status->'modules') as len - FROM v2_job_status WHERE id = $1", - id + let r = (|| async { + sqlx::query!( + "SELECT (flow_status->'step')::integer as step, jsonb_array_length(flow_status->'modules') as len + FROM v2_job_status WHERE id = $1", + id + ) + .fetch_one(db) + .await + .map_err(|e| Error::internal_err(format!("fetching step flow status: {e:#}"))) + }) + .retry( + ConstantBuilder::default() + .with_delay(std::time::Duration::from_millis(500)) + .with_max_times(3) + .build(), ) - .fetch_one(db) - .await - .map_err(|e| Error::internal_err(format!("fetching step flow status: {e:#}")))?; + .notify(|err, dur| { + tracing::warn!( + "Could not fetch step flow status for job {id}, retrying in {dur:#?}, err: {err:#?}" + ); + }) + .sleep(tokio::time::sleep) + .await?; if let Some(step) = r.step { Ok(Step::from_i32_and_len(step, r.len.unwrap_or(0) as usize))