From 2f0acd561c7b18defcfd75b63a9af70c84bee0f6 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 16 Jul 2025 15:36:54 +0200 Subject: [PATCH 1/5] Improve --- api/src/k8s_client.rs | 93 ++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 49 deletions(-) diff --git a/api/src/k8s_client.rs b/api/src/k8s_client.rs index 1329018b..2afa0632 100644 --- a/api/src/k8s_client.rs +++ b/api/src/k8s_client.rs @@ -332,7 +332,7 @@ impl K8sClient for HttpK8sClient { "kind": "StatefulSet", "metadata": { "name": stateful_set_name, - "namespace": NAMESPACE_NAME, + "namespace": NAMESPACE_NAME }, "spec": { "replicas": 1, @@ -348,22 +348,33 @@ impl K8sClient for HttpK8sClient { } }, "spec": { - "volumes": [ + "terminationGracePeriodSeconds": 60, + "initContainers": [ { - "name": REPLICATOR_CONFIG_FILE_VOLUME_NAME, - "configMap": { - "name": replicator_config_map_name - } - }, - { - "name": VECTOR_CONFIG_FILE_VOLUME_NAME, - "configMap": { - "name": VECTOR_CONFIG_MAP_NAME - } - }, - { - "name": LOGS_VOLUME_NAME, - "emptyDir": {} + "name": vector_container_name, + "image": VECTOR_IMAGE_NAME, + "restartPolicy": "Always", + "env": [ + { + "name": "LOGFLARE_API_KEY", + "valueFrom": { + "secretKeyRef": { + "name": LOGFLARE_SECRET_NAME, + "key": "key" + } + } + } + ], + "volumeMounts": [ + { + "name": VECTOR_CONFIG_FILE_VOLUME_NAME, + "mountPath": "/etc/vector" + }, + { + "name": LOGS_VOLUME_NAME, + "mountPath": "/var/log" + } + ] } ], "containers": [ @@ -402,42 +413,26 @@ impl K8sClient for HttpK8sClient { { "name": LOGS_VOLUME_NAME, "mountPath": "/app/logs" - }, + } ] + } + ], + "volumes": [ + { + "name": REPLICATOR_CONFIG_FILE_VOLUME_NAME, + "configMap": { + "name": replicator_config_map_name + } }, { - "name": vector_container_name, - "image": VECTOR_IMAGE_NAME, - "env": [ - { - "name": "LOGFLARE_API_KEY", - "valueFrom": { - "secretKeyRef": { - "name": LOGFLARE_SECRET_NAME, - "key": "key" - } - } - } - ], - "resources": { - "limits": { - "memory": "200Mi", - }, - "requests": { - "memory": "200Mi", - "cpu": "100m" - } - }, - "volumeMounts": [ - { - "name": VECTOR_CONFIG_FILE_VOLUME_NAME, - "mountPath": "/etc/vector" - }, - { - "name": LOGS_VOLUME_NAME, - "mountPath": "/var/log" - } - ], + "name": VECTOR_CONFIG_FILE_VOLUME_NAME, + "configMap": { + "name": VECTOR_CONFIG_MAP_NAME + } + }, + { + "name": LOGS_VOLUME_NAME, + "emptyDir": {} } ] } From 753a8f6000ea2f935a0f81c1dd5fe41ac8f0d928 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 16 Jul 2025 15:53:01 +0200 Subject: [PATCH 2/5] Improve --- etl/src/pipeline.rs | 2 ++ replicator/src/core.rs | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/etl/src/pipeline.rs b/etl/src/pipeline.rs index 2d40ee85..a999b7b5 100644 --- a/etl/src/pipeline.rs +++ b/etl/src/pipeline.rs @@ -242,6 +242,8 @@ where ))); } + info!("pipeline completed successfully"); + Ok(()) } diff --git a/replicator/src/core.rs b/replicator/src/core.rs index e1a96de8..c169505e 100644 --- a/replicator/src/core.rs +++ b/replicator/src/core.rs @@ -16,7 +16,8 @@ use std::fmt; use tracing::{info, instrument, warn}; pub async fn start_replicator() -> anyhow::Result<()> { - info!("starting replicator service"); + info!("starting replicator"); + let replicator_config = load_replicator_config()?; log_config(&replicator_config); @@ -68,7 +69,8 @@ pub async fn start_replicator() -> anyhow::Result<()> { } } - info!("replicator service completed"); + info!("replicator completed successfully"); + Ok(()) } @@ -179,8 +181,6 @@ where warn!("failed to send shutdown signal: {:?}", e); return; } - - info!("pipeline shutdown successfully") }); // Wait for the pipeline to finish (either normally or via shutdown). From 632a1aa754039b4a9673d0c1b8cf27a3abbffe09 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 16 Jul 2025 16:44:03 +0200 Subject: [PATCH 3/5] Improve --- etl/src/pipeline.rs | 16 +++++++++------- etl/src/replication/apply.rs | 2 ++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/etl/src/pipeline.rs b/etl/src/pipeline.rs index a999b7b5..1bb94270 100644 --- a/etl/src/pipeline.rs +++ b/etl/src/pipeline.rs @@ -208,18 +208,20 @@ where if let Err(err) = apply_worker_result { errors.push(err); + info!("apply worker completed with an error, shutting down table sync workers"); + // TODO: in the future we might build a system based on the `ReactiveFuture` that // automatically sends a shutdown signal to table sync workers on apply worker failure. // If there was an error in the apply worker, we want to shut down all table sync // workers, since without an apply worker they are lost. - if let Err(err) = self.shutdown_tx.shutdown() { - info!( - "shut down signal could not be delivered, likely because no workers are running: {:?}", - err - ); + // + // If shutdown fails, it means that all receivers of the shutdown signal were not active + // and in our case receivers are either apply worker or table sync workers. Since the apply + // worker is terminated, we can infer that a failure of sending a message implies that there + // are no active table sync workers. + if self.shutdown_tx.shutdown().is_err() { + info!("there are no table sync workers to shutdown"); } - - info!("apply worker completed with an error, shutting down table sync workers"); } else { info!("apply worker completed successfully"); } diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index cb3768f5..caae4735 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -356,6 +356,7 @@ where // the table sync workers to get stuck if there are no changes in the cdc stream. if !state.handling_transaction() { debug!("processing syncing tables after a period of inactivity of {} seconds", REFRESH_INTERVAL.as_secs()); + let continue_loop = hook.process_syncing_tables(state.next_status_update.flush_lsn, true).await?; if !continue_loop { break Ok(ApplyLoopResult::ApplyStopped); @@ -462,6 +463,7 @@ where "updating lsn for next status update to {}", last_commit_end_lsn ); + state .next_status_update .update_flush_lsn(last_commit_end_lsn); From 0c18a3a67857b33d2d22d610db7558a51c89a358 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 16 Jul 2025 16:47:39 +0200 Subject: [PATCH 4/5] Improve --- etl/src/replication/apply.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index caae4735..dab8ed1d 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -463,7 +463,7 @@ where "updating lsn for next status update to {}", last_commit_end_lsn ); - + state .next_status_update .update_flush_lsn(last_commit_end_lsn); From 22c02b1eb2a41e15bb28cff5c586bd32e23712fa Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 16 Jul 2025 16:49:33 +0200 Subject: [PATCH 5/5] Improve --- replicator/src/core.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/replicator/src/core.rs b/replicator/src/core.rs index c169505e..af2a7316 100644 --- a/replicator/src/core.rs +++ b/replicator/src/core.rs @@ -179,7 +179,6 @@ where if let Err(e) = shutdown_tx.shutdown() { warn!("failed to send shutdown signal: {:?}", e); - return; } });