Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use kube_client::{
Api, Error as ClientErr,
};
use serde::de::DeserializeOwned;
use std::{clone::Clone, collections::VecDeque, fmt::Debug, time::Duration};
use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
use thiserror::Error;
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -844,18 +844,36 @@ pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'sta
// filtering by object name in given scope, so there's at most one matching object
// footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
let fields = format!("metadata.name={name}");
watcher(api, Config::default().fields(&fields)).filter_map(|event| async {
match event {
// Pass up `Some` for Found / Updated
Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
// Pass up `None` for Deleted
Ok(Event::Delete(_)) => Some(Ok(None)),
// Ignore marker events
Ok(Event::Init | Event::InitDone) => None,
// Bubble up errors
Err(err) => Some(Err(err)),
}
})
watcher(api, Config::default().fields(&fields))
// The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
// sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
// `None` is emitted when the InitDone event is received.
//
// The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
// checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
// could happen given K8S events aren't guaranteed delivery.
.scan(false, |obj_seen, event| {
if matches!(event, Ok(Event::Init)) {
*obj_seen = false;
} else if matches!(event, Ok(Event::InitApply(_))) {
*obj_seen = true;
}
future::ready(Some((*obj_seen, event)))
})
.filter_map(|(obj_seen, event)| async move {
match event {
// Pass up `Some` for Found / Updated
Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
// Pass up `None` for Deleted
Ok(Event::Delete(_)) => Some(Ok(None)),
// Pass up `None` if the object wasn't seen in the initial list
Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
// Ignore marker events
Ok(Event::Init | Event::InitDone) => None,
// Bubble up errors
Err(err) => Some(Err(err)),
}
})
}

/// Default watcher backoff inspired by Kubernetes' client-go.
Expand Down