From f10b1c932262e9e984a2a9e8e3f695543c24748b Mon Sep 17 00:00:00 2001 From: doxxx93 Date: Wed, 22 Oct 2025 09:27:49 +0900 Subject: [PATCH 1/6] fix(predicate): track resource UID to handle recreated resources Fixes #1830 Resources recreated with the same name/namespace but different UIDs were incorrectly filtered out by the predicate filter. The cache now tracks both ObjectRef and UID to distinguish between different resource instances. The bug occurred because ObjectRef deliberately excludes UID from its Hash implementation (for general-purpose use), but PredicateFilter needs to distinguish between different resource instances. Changes: - Modified PredicateFilter cache to HashMap, (Option, u64)> - Updated poll_next to check both UID and predicate value changes - Added regression test for recreated resources with same generation The fix ensures that when a resource is deleted and recreated with the same name/namespace but a different UID (and same generation), the new resource is properly reconciled by controllers using predicate filters. Signed-off-by: doxxx93 --- kube-runtime/src/utils/predicate.rs | 60 +++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 179ba8d80..4e5d934e9 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -103,7 +103,7 @@ pub struct PredicateFilter> { #[pin] stream: St, predicate: P, - cache: HashMap, u64>, + cache: HashMap, (Option, u64)>, } impl PredicateFilter where @@ -135,16 +135,13 @@ where Some(Ok(obj)) => { if let Some(val) = me.predicate.hash_property(&obj) { let key = ObjectRef::from_obj(&obj); - let changed = if let Some(old) = me.cache.get(&key) { - *old != val + let uid = obj.meta().uid.clone(); + let changed = if let Some((old_uid, old_val)) = me.cache.get(&key) { + old_uid != &uid || *old_val != val } else { true }; - if let Some(old) = me.cache.get_mut(&key) { - *old = val; - } else { - me.cache.insert(key, val); - } + me.cache.insert(key, (uid, val)); if changed { Some(Ok(obj)) } else { @@ -251,4 +248,51 @@ pub(crate) mod tests { assert_eq!(second.meta().generation, Some(2)); assert!(matches!(poll!(rx.next()), Poll::Ready(None))); } + + #[tokio::test] + async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() { + use k8s_openapi::api::core::v1::Pod; + + let mkobj = |g: i32, uid: &str| { + let p: Pod = serde_json::from_value(json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "blog", + "namespace": "default", + "generation": Some(g), + "uid": uid, + }, + "spec": { + "containers": [{ + "name": "blog", + "image": "clux/blog:0.1.0" + }], + } + })) + .unwrap(); + p + }; + + // Simulate: create (gen=1, uid=1) -> delete -> create (gen=1, uid=2) + let data = stream::iter([ + Ok(mkobj(1, "uid-1")), // First resource created, generation=1 + Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID + ]); + let mut rx = pin!(PredicateFilter::new(data, predicates::generation)); + + // First object should pass through + let first = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(first.meta().generation, Some(1)); + assert_eq!(first.meta().uid.as_deref(), Some("uid-1")); + + // Second object should also pass through because it's a different resource + // (different UID), even though it has the same generation + let second = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(second.meta().generation, Some(1)); + assert_eq!(second.meta().uid.as_deref(), Some("uid-2")); + + // Stream should be exhausted + assert!(matches!(poll!(rx.next()), Poll::Ready(None))); + } } From b683a2e0fc0e0b7e84ed801aff3365a6539a6937 Mon Sep 17 00:00:00 2001 From: doxxx93 Date: Thu, 23 Oct 2025 18:58:47 +0900 Subject: [PATCH 2/6] fix(predicate): enhance UID tracking in predicate filtering with a dedicated cache key Signed-off-by: doxxx93 --- kube-runtime/src/utils/predicate.rs | 50 +++++++++++++++++++---------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 4e5d934e9..cb37b429b 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -1,14 +1,15 @@ -use crate::{reflector::ObjectRef, watcher::Error}; +use crate::watcher::Error; use core::{ pin::Pin, - task::{ready, Context, Poll}, + task::{Context, Poll, ready}, }; use futures::Stream; -use kube_client::Resource; +use kube_client::{Resource, api::ObjectMeta}; use pin_project::pin_project; use std::{ - collections::{hash_map::DefaultHasher, HashMap}, + collections::{HashMap, hash_map::DefaultHasher}, hash::{Hash, Hasher}, + marker::PhantomData, }; fn hash(t: &T) -> u64 { @@ -17,6 +18,24 @@ fn hash(t: &T) -> u64 { hasher.finish() } +/// Private cache key that includes UID in equality for predicate filtering +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct PredicateCacheKey { + name: String, + namespace: Option, + uid: Option, +} + +impl From<&ObjectMeta> for PredicateCacheKey { + fn from(meta: &ObjectMeta) -> Self { + Self { + name: meta.name.clone().unwrap_or_default(), + namespace: meta.namespace.clone(), + uid: meta.uid.clone(), + } + } +} + /// A predicate is a hasher of Kubernetes objects stream filtering pub trait Predicate { /// A predicate only needs to implement optional hashing when keys exist @@ -103,7 +122,8 @@ pub struct PredicateFilter> { #[pin] stream: St, predicate: P, - cache: HashMap, (Option, u64)>, + cache: HashMap, + _phantom: PhantomData, } impl PredicateFilter where @@ -116,6 +136,7 @@ where stream, predicate, cache: HashMap::new(), + _phantom: PhantomData, } } } @@ -134,14 +155,9 @@ where break match ready!(me.stream.as_mut().poll_next(cx)) { Some(Ok(obj)) => { if let Some(val) = me.predicate.hash_property(&obj) { - let key = ObjectRef::from_obj(&obj); - let uid = obj.meta().uid.clone(); - let changed = if let Some((old_uid, old_val)) = me.cache.get(&key) { - old_uid != &uid || *old_val != val - } else { - true - }; - me.cache.insert(key, (uid, val)); + let key = PredicateCacheKey::from(obj.meta()); + let changed = me.cache.get(&key) != Some(&val); + me.cache.insert(key, val); if changed { Some(Ok(obj)) } else { @@ -199,8 +215,8 @@ pub mod predicates { pub(crate) mod tests { use std::{pin::pin, task::Poll}; - use super::{predicates, Error, PredicateFilter}; - use futures::{poll, stream, FutureExt, StreamExt}; + use super::{Error, PredicateFilter, predicates}; + use futures::{FutureExt, StreamExt, poll, stream}; use kube_client::Resource; use serde_json::json; @@ -276,8 +292,8 @@ pub(crate) mod tests { // Simulate: create (gen=1, uid=1) -> delete -> create (gen=1, uid=2) let data = stream::iter([ - Ok(mkobj(1, "uid-1")), // First resource created, generation=1 - Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID + Ok(mkobj(1, "uid-1")), // First resource created, generation=1 + Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID ]); let mut rx = pin!(PredicateFilter::new(data, predicates::generation)); From 8a5446bb177f4992289502b24c2b6672ee47161b Mon Sep 17 00:00:00 2001 From: doxxx93 Date: Thu, 23 Oct 2025 20:09:21 +0900 Subject: [PATCH 3/6] fix: format predicate.rs to pass rustfmt check Reorder imports alphabetically to match nightly rustfmt requirements. Signed-off-by: doxxx93 --- kube-runtime/src/utils/predicate.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index cb37b429b..cc122973d 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -1,13 +1,13 @@ use crate::watcher::Error; use core::{ pin::Pin, - task::{Context, Poll, ready}, + task::{ready, Context, Poll}, }; use futures::Stream; -use kube_client::{Resource, api::ObjectMeta}; +use kube_client::{api::ObjectMeta, Resource}; use pin_project::pin_project; use std::{ - collections::{HashMap, hash_map::DefaultHasher}, + collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, marker::PhantomData, }; @@ -215,8 +215,8 @@ pub mod predicates { pub(crate) mod tests { use std::{pin::pin, task::Poll}; - use super::{Error, PredicateFilter, predicates}; - use futures::{FutureExt, StreamExt, poll, stream}; + use super::{predicates, Error, PredicateFilter}; + use futures::{poll, stream, FutureExt, StreamExt}; use kube_client::Resource; use serde_json::json; From eaa580ebd664e426e293939ad001be62f7383f2e Mon Sep 17 00:00:00 2001 From: doxxx93 Date: Thu, 23 Oct 2025 20:48:41 +0900 Subject: [PATCH 4/6] fix(predicate): update tests to validate UID tracking for resource recreation scenarios Signed-off-by: doxxx93 --- kube-runtime/src/utils/predicate.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index cc122973d..32cca9537 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -123,6 +123,7 @@ pub struct PredicateFilter> { stream: St, predicate: P, cache: HashMap, + // K: Resource necessary to get .meta() of an object during polling _phantom: PhantomData, } impl PredicateFilter @@ -290,10 +291,11 @@ pub(crate) mod tests { p }; - // Simulate: create (gen=1, uid=1) -> delete -> create (gen=1, uid=2) + // Simulate: create (gen=1, uid=1) -> delete -> create (gen=1, uid=2) -> delete -> create (gen=2, uid=3) let data = stream::iter([ Ok(mkobj(1, "uid-1")), // First resource created, generation=1 Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID + Ok(mkobj(2, "uid-3")), // Resource recreated again with new generation and different UID ]); let mut rx = pin!(PredicateFilter::new(data, predicates::generation)); @@ -308,6 +310,12 @@ pub(crate) mod tests { assert_eq!(second.meta().generation, Some(1)); assert_eq!(second.meta().uid.as_deref(), Some("uid-2")); + // Third object should also pass through because it's a different resource + // (different UID and generation) + let third = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(third.meta().generation, Some(2)); + assert_eq!(third.meta().uid.as_deref(), Some("uid-3")); + // Stream should be exhausted assert!(matches!(poll!(rx.next()), Poll::Ready(None))); } From 73eea7f94b83eb2791d14614b388649571ec0e8e Mon Sep 17 00:00:00 2001 From: doxxx93 Date: Thu, 23 Oct 2025 20:53:49 +0900 Subject: [PATCH 5/6] fix(predicate): update filtering logic to handle same UID and generation scenarios Signed-off-by: doxxx93 --- kube-runtime/src/utils/predicate.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 32cca9537..7cccce34d 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -291,9 +291,10 @@ pub(crate) mod tests { p }; - // Simulate: create (gen=1, uid=1) -> delete -> create (gen=1, uid=2) -> delete -> create (gen=2, uid=3) + // Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete -> create (gen=1, uid=2) -> delete -> create (gen=2, uid-3) let data = stream::iter([ Ok(mkobj(1, "uid-1")), // First resource created, generation=1 + Ok(mkobj(1, "uid-1")), // Same resource, same generation (should be filtered out) Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID Ok(mkobj(2, "uid-3")), // Resource recreated again with new generation and different UID ]); @@ -304,13 +305,14 @@ pub(crate) mod tests { assert_eq!(first.meta().generation, Some(1)); assert_eq!(first.meta().uid.as_deref(), Some("uid-1")); - // Second object should also pass through because it's a different resource + // Second object (same UID, same generation) should be filtered out - no event + // Third object should pass through because it's a different resource // (different UID), even though it has the same generation let second = rx.next().now_or_never().unwrap().unwrap().unwrap(); assert_eq!(second.meta().generation, Some(1)); assert_eq!(second.meta().uid.as_deref(), Some("uid-2")); - // Third object should also pass through because it's a different resource + // Fourth object should also pass through because it's a different resource // (different UID and generation) let third = rx.next().now_or_never().unwrap().unwrap().unwrap(); assert_eq!(third.meta().generation, Some(2)); From 8b1b7c777cb5d09964615455058c12d2e594a5c4 Mon Sep 17 00:00:00 2001 From: doxxx93 Date: Thu, 23 Oct 2025 22:30:03 +0900 Subject: [PATCH 6/6] fix(predicate): refine filtering logic to ensure unique UID handling across generations Signed-off-by: doxxx93 --- kube-runtime/src/utils/predicate.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 7cccce34d..797532137 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -291,34 +291,32 @@ pub(crate) mod tests { p }; - // Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete -> create (gen=1, uid=2) -> delete -> create (gen=2, uid-3) + // Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete -> + // create (gen=1, uid=2) -> delete -> create (gen=2, uid=3) let data = stream::iter([ - Ok(mkobj(1, "uid-1")), // First resource created, generation=1 - Ok(mkobj(1, "uid-1")), // Same resource, same generation (should be filtered out) - Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID - Ok(mkobj(2, "uid-3")), // Resource recreated again with new generation and different UID + Ok(mkobj(1, "uid-1")), + Ok(mkobj(1, "uid-1")), + Ok(mkobj(1, "uid-2")), + Ok(mkobj(2, "uid-3")), ]); let mut rx = pin!(PredicateFilter::new(data, predicates::generation)); - // First object should pass through + // mkobj(1, uid-1) passed through let first = rx.next().now_or_never().unwrap().unwrap().unwrap(); assert_eq!(first.meta().generation, Some(1)); assert_eq!(first.meta().uid.as_deref(), Some("uid-1")); - // Second object (same UID, same generation) should be filtered out - no event - // Third object should pass through because it's a different resource - // (different UID), even though it has the same generation + // (no repeat mkobj(1, uid-1) - same UID and generation) + // mkobj(1, uid-2) next - different UID detected let second = rx.next().now_or_never().unwrap().unwrap().unwrap(); assert_eq!(second.meta().generation, Some(1)); assert_eq!(second.meta().uid.as_deref(), Some("uid-2")); - // Fourth object should also pass through because it's a different resource - // (different UID and generation) + // mkobj(2, uid-3) next let third = rx.next().now_or_never().unwrap().unwrap().unwrap(); assert_eq!(third.meta().generation, Some(2)); assert_eq!(third.meta().uid.as_deref(), Some("uid-3")); - // Stream should be exhausted assert!(matches!(poll!(rx.next()), Poll::Ready(None))); } }