diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 179ba8d80..797532137 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -1,14 +1,15 @@ -use crate::{reflector::ObjectRef, watcher::Error}; +use crate::watcher::Error; use core::{ pin::Pin, task::{ready, Context, Poll}, }; use futures::Stream; -use kube_client::Resource; +use kube_client::{api::ObjectMeta, Resource}; use pin_project::pin_project; use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, + marker::PhantomData, }; fn hash(t: &T) -> u64 { @@ -17,6 +18,24 @@ fn hash(t: &T) -> u64 { hasher.finish() } +/// Private cache key that includes UID in equality for predicate filtering +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct PredicateCacheKey { + name: String, + namespace: Option, + uid: Option, +} + +impl From<&ObjectMeta> for PredicateCacheKey { + fn from(meta: &ObjectMeta) -> Self { + Self { + name: meta.name.clone().unwrap_or_default(), + namespace: meta.namespace.clone(), + uid: meta.uid.clone(), + } + } +} + /// A predicate is a hasher of Kubernetes objects stream filtering pub trait Predicate { /// A predicate only needs to implement optional hashing when keys exist @@ -103,7 +122,9 @@ pub struct PredicateFilter> { #[pin] stream: St, predicate: P, - cache: HashMap, u64>, + cache: HashMap, + // K: Resource necessary to get .meta() of an object during polling + _phantom: PhantomData, } impl PredicateFilter where @@ -116,6 +137,7 @@ where stream, predicate, cache: HashMap::new(), + _phantom: PhantomData, } } } @@ -134,17 +156,9 @@ where break match ready!(me.stream.as_mut().poll_next(cx)) { Some(Ok(obj)) => { if let Some(val) = me.predicate.hash_property(&obj) { - let key = ObjectRef::from_obj(&obj); - let changed = if let Some(old) = me.cache.get(&key) { - *old != val - } else { - true - }; - if let Some(old) = me.cache.get_mut(&key) { - *old = val; - } else { - me.cache.insert(key, val); - } + let key = PredicateCacheKey::from(obj.meta()); + let changed = me.cache.get(&key) != Some(&val); + me.cache.insert(key, val); if changed { Some(Ok(obj)) } else { @@ -251,4 +265,58 @@ pub(crate) mod tests { assert_eq!(second.meta().generation, Some(2)); assert!(matches!(poll!(rx.next()), Poll::Ready(None))); } + + #[tokio::test] + async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() { + use k8s_openapi::api::core::v1::Pod; + + let mkobj = |g: i32, uid: &str| { + let p: Pod = serde_json::from_value(json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "blog", + "namespace": "default", + "generation": Some(g), + "uid": uid, + }, + "spec": { + "containers": [{ + "name": "blog", + "image": "clux/blog:0.1.0" + }], + } + })) + .unwrap(); + p + }; + + // Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete -> + // create (gen=1, uid=2) -> delete -> create (gen=2, uid=3) + let data = stream::iter([ + Ok(mkobj(1, "uid-1")), + Ok(mkobj(1, "uid-1")), + Ok(mkobj(1, "uid-2")), + Ok(mkobj(2, "uid-3")), + ]); + let mut rx = pin!(PredicateFilter::new(data, predicates::generation)); + + // mkobj(1, uid-1) passed through + let first = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(first.meta().generation, Some(1)); + assert_eq!(first.meta().uid.as_deref(), Some("uid-1")); + + // (no repeat mkobj(1, uid-1) - same UID and generation) + // mkobj(1, uid-2) next - different UID detected + let second = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(second.meta().generation, Some(1)); + assert_eq!(second.meta().uid.as_deref(), Some("uid-2")); + + // mkobj(2, uid-3) next + let third = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(third.meta().generation, Some(2)); + assert_eq!(third.meta().uid.as_deref(), Some("uid-3")); + + assert!(matches!(poll!(rx.next()), Poll::Ready(None))); + } }