diff --git a/coprocessor/fhevm-engine/.sqlx/query-280922cbaa3f2c2c2893da7bc015793f752df19c8940cbc2d26c788cae901d95.json b/coprocessor/fhevm-engine/.sqlx/query-280922cbaa3f2c2c2893da7bc015793f752df19c8940cbc2d26c788cae901d95.json deleted file mode 100644 index 81e75ee72..000000000 --- a/coprocessor/fhevm-engine/.sqlx/query-280922cbaa3f2c2c2893da7bc015793f752df19c8940cbc2d26c788cae901d95.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE pbs_computations\n SET is_completed = TRUE, completed_at = NOW()\n WHERE handle = $1;", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bytea" - ] - }, - "nullable": [] - }, - "hash": "280922cbaa3f2c2c2893da7bc015793f752df19c8940cbc2d26c788cae901d95" -} diff --git a/coprocessor/fhevm-engine/.sqlx/query-6652db2d435f82819a96e368230c09453a363c00285ef2444344a38a78c4a561.json b/coprocessor/fhevm-engine/.sqlx/query-6652db2d435f82819a96e368230c09453a363c00285ef2444344a38a78c4a561.json new file mode 100644 index 000000000..88f49823e --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-6652db2d435f82819a96e368230c09453a363c00285ef2444344a38a78c4a561.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE pbs_computations\n SET is_completed = TRUE, error = $2, completed_at = NOW()\n WHERE handle = $1;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea", + "Text" + ] + }, + "nullable": [] + }, + "hash": "6652db2d435f82819a96e368230c09453a363c00285ef2444344a38a78c4a561" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-6fe4873bbef6fac75d2674b5b4c88696cf131a1a8717d734a728951bc28c4fac.json b/coprocessor/fhevm-engine/.sqlx/query-6fe4873bbef6fac75d2674b5b4c88696cf131a1a8717d734a728951bc28c4fac.json new file mode 100644 index 000000000..7b8a10930 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-6fe4873bbef6fac75d2674b5b4c88696cf131a1a8717d734a728951bc28c4fac.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE pbs_computations\n SET is_completed = TRUE, error = NULL, completed_at = NOW()\n WHERE handle = $1;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "6fe4873bbef6fac75d2674b5b4c88696cf131a1a8717d734a728951bc28c4fac" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-a006d9ccf85f04ed5c9d65eb759b6e83376343cfc56ad730b15d5fa476d5db37.json b/coprocessor/fhevm-engine/.sqlx/query-a006d9ccf85f04ed5c9d65eb759b6e83376343cfc56ad730b15d5fa476d5db37.json deleted file mode 100644 index d916d4852..000000000 --- a/coprocessor/fhevm-engine/.sqlx/query-a006d9ccf85f04ed5c9d65eb759b6e83376343cfc56ad730b15d5fa476d5db37.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE ciphertexts\n SET ciphertext128 = $1\n WHERE handle = $2;", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bytea", - "Bytea" - ] - }, - "nullable": [] - }, - "hash": "a006d9ccf85f04ed5c9d65eb759b6e83376343cfc56ad730b15d5fa476d5db37" -} diff --git a/coprocessor/fhevm-engine/.sqlx/query-d61a9e4afee1d2141abdff349750feeec085f2752ea8609dce966c34cbdd5299.json b/coprocessor/fhevm-engine/.sqlx/query-d61a9e4afee1d2141abdff349750feeec085f2752ea8609dce966c34cbdd5299.json new file mode 100644 index 000000000..33bdbfb3c --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-d61a9e4afee1d2141abdff349750feeec085f2752ea8609dce966c34cbdd5299.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE ciphertexts\n SET ciphertext128 = $1\n WHERE handle = $2;", + "describe": 
{ + "columns": [], + "parameters": { + "Left": [ + "Bytea", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "d61a9e4afee1d2141abdff349750feeec085f2752ea8609dce966c34cbdd5299" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-e7d1a9d37e7ac71a112454fa89589e2d1f075a140ba0fbc79fdf403ca09da6db.json b/coprocessor/fhevm-engine/.sqlx/query-e7d1a9d37e7ac71a112454fa89589e2d1f075a140ba0fbc79fdf403ca09da6db.json new file mode 100644 index 000000000..bb58c5340 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-e7d1a9d37e7ac71a112454fa89589e2d1f075a140ba0fbc79fdf403ca09da6db.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE pbs_computations\n SET is_completed = FALSE, error = $2, schedule_order = NOW() + INTERVAL '1 minute'\n WHERE handle = $1;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea", + "Text" + ] + }, + "nullable": [] + }, + "hash": "e7d1a9d37e7ac71a112454fa89589e2d1f075a140ba0fbc79fdf403ca09da6db" +} diff --git a/coprocessor/fhevm-engine/db-migration/migrations/20251031144714_pbs_computations_retries.sql b/coprocessor/fhevm-engine/db-migration/migrations/20251031144714_pbs_computations_retries.sql new file mode 100644 index 000000000..482f35067 --- /dev/null +++ b/coprocessor/fhevm-engine/db-migration/migrations/20251031144714_pbs_computations_retries.sql @@ -0,0 +1,23 @@ +-- Add migration script here +BEGIN; +-- Lock the table to ensure the backfill of schedule_order is safe +LOCK TABLE pbs_computations IN ACCESS EXCLUSIVE MODE; + +ALTER TABLE pbs_computations + ADD COLUMN IF NOT EXISTS error TEXT, + ADD COLUMN IF NOT EXISTS schedule_order TIMESTAMPTZ DEFAULT NOW(); + +-- Backfill existing rows +UPDATE pbs_computations +SET schedule_order = created_at +WHERE schedule_order IS NULL; + +-- enforce not-null after backfill +ALTER TABLE pbs_computations + ALTER COLUMN schedule_order SET NOT NULL; +COMMIT; + +-- Partial index for unfinished rows ordered/filtered by created_at +CREATE INDEX IF NOT EXISTS idx_pbs_comp_unfinished_created_at +ON pbs_computations (schedule_order, created_at) +WHERE is_completed = FALSE; \ No newline at end of file diff --git a/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs b/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs index 7228723b1..305941575 100644 --- a/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs +++ b/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs @@ -110,3 +110,23 @@ impl Default for HeartBeat { Self::new() } } + +#[macro_export] +macro_rules! 
with_panic_guard {
+    ($body:expr) => {{
+        use std::panic::{catch_unwind, AssertUnwindSafe};
+        match catch_unwind(AssertUnwindSafe(|| $body)) {
+            Ok(v) => Ok(v),
+            Err(payload) => {
+                let msg = if let Some(s) = (&*payload).downcast_ref::<&'static str>() {
+                    s.to_string()
+                } else if let Some(s) = (&*payload).downcast_ref::<String>() {
+                    s.clone()
+                } else {
+                    "panic payload: non-string".to_string()
+                };
+                Err(msg)
+            }
+        }
+    }};
+}
diff --git a/coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs b/coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs
index 8a7ea5341..83f54dd81 100644
--- a/coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs
+++ b/coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs
@@ -1,5 +1,6 @@
 use crate::{
-    BigCiphertext, Ciphertext128Format, Config, ExecutionError, HandleItem, S3Config, UploadJob,
+    BigCiphertext, Ciphertext128Format, Config, ExecutionError, HandleItem, S3Config, TaskStatus,
+    UploadJob,
 };
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::head_bucket::HeadBucketError;
@@ -486,6 +487,7 @@ async fn fetch_pending_uploads(
                 ct128: Arc::new(ct128),
                 otel: telemetry::tracer_with_handle("recovery_task", handle, &transaction_id),
                 transaction_id,
+                status: TaskStatus::default(),
             };
 
             // Instruct the uploader to acquire DB lock when processing the item
diff --git a/coprocessor/fhevm-engine/sns-worker/src/executor.rs b/coprocessor/fhevm-engine/sns-worker/src/executor.rs
index 617545d32..3dc0032de 100644
--- a/coprocessor/fhevm-engine/sns-worker/src/executor.rs
+++ b/coprocessor/fhevm-engine/sns-worker/src/executor.rs
@@ -7,16 +7,19 @@ use crate::HandleItem;
 use crate::InternalEvents;
 use crate::KeySet;
 use crate::SchedulePolicy;
+use crate::TaskStatus;
 use crate::UploadJob;
 use crate::SNS_LATENCY_OP_HISTOGRAM;
 use crate::{Config, ExecutionError};
 use aws_sdk_s3::Client;
+use core::panic;
 use fhevm_engine_common::healthz_server::{HealthCheckService, HealthStatus, Version};
 use fhevm_engine_common::pg_pool::PostgresPoolManager;
 use fhevm_engine_common::pg_pool::ServiceError;
 use fhevm_engine_common::telemetry;
 use fhevm_engine_common::types::{get_ct_type, SupportedFheCiphertexts};
 use fhevm_engine_common::utils::compact_hex;
+use fhevm_engine_common::with_panic_guard;
 use rayon::prelude::*;
 use sqlx::postgres::PgListener;
 use sqlx::Pool;
@@ -356,15 +359,16 @@ async fn fetch_and_execute_sns_tasks(
             token.clone(),
         )?;
 
-        update_computations_status(trx, &tasks).await?;
-
         let s = t.child_span("batch_store_ciphertext128");
-        update_ciphertext128(trx, &tasks).await?;
+        update_ciphertext128(trx, &mut tasks).await?;
         notify_ciphertext128_ready(trx, &conf.db.notify_channel).await?;
 
         // Try to enqueue the tasks for upload in the DB
         // This is a best-effort attempt, as the upload worker might not be available
         enqueue_upload_tasks(trx, &tasks).await?;
+
+        update_computations_status(trx, &tasks).await?;
+
         telemetry::end_span(s);
 
         db_txn.commit().await?;
@@ -397,6 +401,7 @@ pub async fn query_sns_tasks(
             ON a.handle = c.handle
             WHERE c.ciphertext IS NOT NULL
             AND a.is_completed = FALSE
+            AND a.schedule_order <= NOW()
             ORDER BY a.created_at {}
             FOR UPDATE SKIP LOCKED
             LIMIT $1;
@@ -435,6 +440,7 @@ pub async fn query_sns_tasks(
                 ct128: Arc::new(BigCiphertext::default()), // to be computed
                 otel: telemetry::tracer_with_handle("task", handle, &transaction_id),
                 transaction_id,
+                status: TaskStatus::default(),
             })
         })
         .collect::<Result<Vec<_>, ExecutionError>>()?;
@@ -446,9 +452,10 @@ async fn enqueue_upload_tasks(
     db_txn: &mut Transaction<'_, Postgres>,
     tasks: &[HandleItem],
 ) -> Result<(), ExecutionError> {
-    for task in tasks.iter() {
+    for task in tasks.iter().filter(|t| t.is_completed()) {
         task.enqueue_upload_task(db_txn).await?;
     }
+
     Ok(())
 }
@@ -522,28 +529,42 @@ fn compute_task(
     let ct64_compressed = task.ct64_compressed.as_ref();
 
     if ct64_compressed.is_empty() {
         error!({ handle }, "Empty ciphertext64, skipping task");
+        task.status = TaskStatus::UnrecoverableErr("Empty ciphertext64".to_string());
         return; // Skip empty ciphertexts
     }
 
     let s = task.otel.child_span("decompress_ct64");
-    let ct = decompress_ct(&task.handle, ct64_compressed).unwrap(); // TODO handle error properly
-    telemetry::end_span(s);
+    let ct: SupportedFheCiphertexts = match decompress_ct_with_guard(&task.handle, ct64_compressed)
+    {
+        Ok(ct) => {
+            telemetry::end_span(s);
+            ct
+        }
+        Err(err) => {
+            error!( { handle, error = %err }, "Failed to decompress ct64");
+            telemetry::end_span_with_err(s, "failed to decompress".to_string());
+
+            task.status = TaskStatus::UnrecoverableErr(err.to_string());
+            return;
+        }
+    };
 
     let ct_type = ct.type_name().to_owned();
-    info!( { handle, ct_type }, "Converting ciphertext");
+    info!( { handle, ct_type }, "Squash_noise ct");
 
     let mut span = task.otel.child_span("squash_noise");
     telemetry::attribute(&mut span, "ct_type", ct_type);
 
-    match ct.squash_noise_and_serialize(enable_compression) {
+    match squash_noise_with_guard(&ct, enable_compression) {
         Ok(bytes) => {
             telemetry::end_span(span);
+
             info!(
                 handle = handle,
                 length = bytes.len(),
                 compressed = enable_compression,
-                "Ciphertext converted"
+                "Squash_noise completed"
             );
 
             #[cfg(feature = "test_decrypt_128")]
@@ -556,6 +577,7 @@ fn compute_task(
             };
 
             task.ct128 = Arc::new(BigCiphertext::new(bytes, format));
+            task.status = TaskStatus::Completed;
 
             // Start uploading the ciphertexts as soon as the ct128 is computed
             //
@@ -586,11 +608,25 @@ fn compute_task(
         }
         Err(err) => {
             telemetry::end_span_with_err(span, err.to_string());
+            task.status = TaskStatus::UnrecoverableErr(err.to_string());
             error!({ handle = handle, error = %err }, "Failed to convert ct");
         }
     };
 }
 
+fn squash_noise_with_guard(
+    ct: &SupportedFheCiphertexts,
+    enable_compression: bool,
+) -> Result<Vec<u8>, ExecutionError> {
+    with_panic_guard!(ct.squash_noise_and_serialize(enable_compression)).map_err(|e| {
+        // Map panic to SquashNoisePanic
+        ExecutionError::SquashNoisePanic(format!(
+            "Panic occurred while squashing noise and serializing: {}",
+            e
+        ))
+    })?
+}
+
 /// Updates the database with the computed large ciphertexts.
 ///
 /// The ct128 is temporarily stored in PostgresDB to ensure reliability.
@@ -602,48 +638,54 @@ fn compute_task(
 /// completely.
 async fn update_ciphertext128(
     db_txn: &mut Transaction<'_, Postgres>,
-    tasks: &[HandleItem],
+    tasks: &mut [HandleItem],
 ) -> Result<(), ExecutionError> {
     for task in tasks {
-        if !task.ct128.is_empty() {
-            let ciphertext128 = task.ct128.bytes();
-            let s = task.otel.child_span("ct128_db_insert");
-
-            // Insert the ciphertext128 into the database for reliability
-            // Later on, we clean up all uploaded ct128
-            let res = sqlx::query!(
-                "
-            UPDATE ciphertexts
-            SET ciphertext128 = $1
-            WHERE handle = $2;",
-                ciphertext128,
-                task.handle
-            )
-            .execute(db_txn.as_mut())
-            .await;
-
-            match res {
-                Ok(val) => {
-                    info!(
-                        handle = compact_hex(&task.handle),
-                        query_res = format!("{:?}", val),
-                        "Inserted ct128 in DB"
-                    );
-                    telemetry::end_span(s);
-                }
-                Err(err) => {
-                    error!( handle = ?task.handle, error = %err, "Failed to insert ct128 in DB");
-                    telemetry::end_span_with_err(s, err.to_string());
-                    // Although this is a single error, we drop the entire batch to be on the safe side
-                    // This will ensure we will not mark a task as completed falsely
-                    return Err(err.into());
-                }
-            }
+        if task.ct128.is_empty() {
+            error!(
+                handle = compact_hex(&task.handle),
+                "ct128 not computed for task"
+            );
+            continue;
+        }
 
-            // Notify add_ciphertexts
-        } else {
-            error!( handle = ?task.handle, "Large ciphertext not computed for task");
+        let ciphertext128 = task.ct128.bytes();
+        let s = task.otel.child_span("ct128_db_insert");
+
+        // Insert the ciphertext128 into the database for reliability
+        // Later on, we clean up all uploaded ct128
+        let res = sqlx::query!(
+            "
+            UPDATE ciphertexts
+            SET ciphertext128 = $1
+            WHERE handle = $2;",
+            ciphertext128,
+            task.handle
+        )
+        .execute(db_txn.as_mut())
+        .await;
+
+        match res {
+            Ok(val) => {
+                info!(
+                    handle = compact_hex(&task.handle),
+                    query_res = format!("{:?}", val),
+                    "Inserted ct128 in DB"
+                );
+                telemetry::end_span(s);
+            }
+            Err(err) => {
+                error!( handle = compact_hex(&task.handle), error = %err, "Failed to insert ct128 into DB");
+                telemetry::end_span_with_err(s, err.to_string());
+                // Although the S3-upload might still succeed, we consider this as a failure
+                // Worst-case scenario, the SnS-computation will be retried later.
+                // However, if both DB insertion and S3 upload fail, this guarantees that the computation
+                // will be retried and the ct128 uploaded.
+                task.status = TaskStatus::TransientErr(err.to_string());
+            }
         }
+
+        // Notify add_ciphertexts
     }
 
     Ok(())
 }
@@ -654,18 +696,49 @@ async fn update_computations_status(
     tasks: &[HandleItem],
 ) -> Result<(), ExecutionError> {
     for task in tasks {
-        if !task.ct128.is_empty() {
-            sqlx::query!(
-                "
-            UPDATE pbs_computations
-            SET is_completed = TRUE, completed_at = NOW()
-            WHERE handle = $1;",
-                task.handle
-            )
-            .execute(db_txn.as_mut())
-            .await?;
-        } else {
-            error!( handle = ?task.handle, "Large ciphertext not computed for task");
+        match &task.status {
+            TaskStatus::Completed => {
+                // Mark the computation as completed and clear transient error
+                sqlx::query!(
+                    "
+                    UPDATE pbs_computations
+                    SET is_completed = TRUE, error = NULL, completed_at = NOW()
+                    WHERE handle = $1;",
+                    task.handle
+                )
+                .execute(db_txn.as_mut())
+                .await?;
+            }
+            TaskStatus::UnrecoverableErr(err_msg) => {
+                // The computation should not be retried unless manually triggered
+                warn!( handle = compact_hex(&task.handle), error = %err_msg, "Computation failed, unrecoverable err");
+                sqlx::query!(
+                    "
+                    UPDATE pbs_computations
+                    SET is_completed = TRUE, error = $2, completed_at = NOW()
+                    WHERE handle = $1;",
+                    task.handle,
+                    err_msg
+                )
+                .execute(db_txn.as_mut())
+                .await?;
+            }
+            TaskStatus::TransientErr(err_msg) => {
+                warn!( handle = compact_hex(&task.handle), error = %err_msg, "Computation failed, transient err");
+                sqlx::query!(
+                    "
+                    UPDATE pbs_computations
+                    SET is_completed = FALSE, error = $2, schedule_order = NOW() + INTERVAL '1 minute'
+                    WHERE handle = $1;",
+                    task.handle,
+                    err_msg
+                )
+                .execute(db_txn.as_mut())
+                .await?;
+            }
+            TaskStatus::Pending => {
+                error!( handle = ?task.handle, "Unexpected task status");
+            }
         }
     }
 
     Ok(())
 }
@@ -684,13 +757,23 @@ async fn notify_ciphertext128_ready(
     }
 }
 
 /// Decompresses a ciphertext based on its type.
-fn decompress_ct(
+fn decompress_ct_with_guard(
     handle: &[u8],
     compressed_ct: &[u8],
 ) -> Result<SupportedFheCiphertexts, ExecutionError> {
     let ct_type = get_ct_type(handle)?;
-    let result = SupportedFheCiphertexts::decompress_no_memcheck(ct_type, compressed_ct)?;
+    let result = with_panic_guard!(SupportedFheCiphertexts::decompress_no_memcheck(
+        ct_type,
+        compressed_ct
+    ))
+    .map_err(|e| {
+        // Map panic to DecompressionError
+        ExecutionError::DecompressionPanic(format!(
+            "Panic occurred while decompressing ct of type {}: {}",
+            ct_type, e
+        ))
+    })??;
 
     Ok(result)
 }
 
 #[cfg(feature = "test_decrypt_128")]
diff --git a/coprocessor/fhevm-engine/sns-worker/src/lib.rs b/coprocessor/fhevm-engine/sns-worker/src/lib.rs
index c8bb63912..4954169c4 100644
--- a/coprocessor/fhevm-engine/sns-worker/src/lib.rs
+++ b/coprocessor/fhevm-engine/sns-worker/src/lib.rs
@@ -237,6 +237,9 @@ pub struct HandleItem {
     pub otel: OtelTracer,
 
     pub transaction_id: Option<Vec<u8>>,
+
+    /// Status of the processing task
+    status: TaskStatus,
 }
 
 impl HandleItem {
@@ -312,6 +315,10 @@ impl HandleItem {
 
         Ok(())
     }
+
+    fn is_completed(&self) -> bool {
+        matches!(self.status, TaskStatus::Completed)
+    }
 }
 
 impl From<ExecutionError> for ServiceError {
@@ -325,6 +332,15 @@ impl From<ExecutionError> for ServiceError {
     }
 }
 
+#[derive(Clone, Debug, Default, PartialEq, Eq)]
+enum TaskStatus {
+    #[default]
+    Pending,                  // Task is pending processing
+    TransientErr(String),     // A temporary error occurred, can retry
+    UnrecoverableErr(String), // A permanent error occurred, cannot retry
+    Completed,                // The operation was successful
+}
+
 #[derive(Error, Debug)]
 pub enum ExecutionError {
     #[error("Conversion error: {0}")]
@@ -368,6 +384,12 @@ pub enum ExecutionError {
 
     #[error("Internal send error: {0}")]
     InternalSendError(String),
+
+    #[error("Panic on decompression: {0}")]
+    DecompressionPanic(String),
+
+    #[error("Panic on squash noise: {0}")]
+    SquashNoisePanic(String),
 }
 
 #[derive(Clone)]
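Note on the new panic guard (illustrative only, not part of the diff): a minimal usage sketch, assuming only what the macro body above shows — the guarded expression's value is passed through as Ok(..), while a panic payload is downcast into an Err(String) that callers such as squash_noise_with_guard and decompress_ct_with_guard then map onto ExecutionError variants. The surrounding main function and the u32/u8 bindings are hypothetical, added just for this example.

use fhevm_engine_common::with_panic_guard;

fn main() {
    // A body that returns normally is wrapped into Ok(..) by the guard.
    let ok: Result<u32, String> = with_panic_guard!(21 * 2);
    assert_eq!(ok, Ok(42));

    // A body that panics is caught; the payload (a String here, produced by the
    // out-of-bounds indexing panic) becomes the Err value instead of unwinding
    // out of the calling task.
    let caught: Result<u8, String> = with_panic_guard!({
        let empty: Vec<u8> = Vec::new();
        empty[0] // index out of bounds -> panic
    });
    assert!(caught.unwrap_err().contains("index out of bounds"));
}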