Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions kube-runtime/src/utils/predicate.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::{reflector::ObjectRef, watcher::Error};
use crate::watcher::Error;
use core::{
pin::Pin,
task::{ready, Context, Poll},
};
use futures::Stream;
use kube_client::Resource;
use kube_client::{api::ObjectMeta, Resource};
use pin_project::pin_project;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
marker::PhantomData,
};

fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
Expand All @@ -17,6 +18,24 @@ fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
hasher.finish()
}

/// Private cache key that includes UID in equality for predicate filtering
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PredicateCacheKey {
name: String,
namespace: Option<String>,
uid: Option<String>,
}

impl From<&ObjectMeta> for PredicateCacheKey {
fn from(meta: &ObjectMeta) -> Self {
Self {
name: meta.name.clone().unwrap_or_default(),
namespace: meta.namespace.clone(),
uid: meta.uid.clone(),
}
}
}

/// A predicate is a hasher of Kubernetes objects stream filtering
pub trait Predicate<K> {
/// A predicate only needs to implement optional hashing when keys exist
Expand Down Expand Up @@ -103,7 +122,9 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
#[pin]
stream: St,
predicate: P,
cache: HashMap<ObjectRef<K>, u64>,
cache: HashMap<PredicateCacheKey, u64>,
// K: Resource necessary to get .meta() of an object during polling
_phantom: PhantomData<K>,
}
impl<St, K, P> PredicateFilter<St, K, P>
where
Expand All @@ -116,6 +137,7 @@ where
stream,
predicate,
cache: HashMap::new(),
_phantom: PhantomData,
}
}
}
Expand All @@ -134,17 +156,9 @@ where
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(obj)) => {
if let Some(val) = me.predicate.hash_property(&obj) {
let key = ObjectRef::from_obj(&obj);
let changed = if let Some(old) = me.cache.get(&key) {
*old != val
} else {
true
};
if let Some(old) = me.cache.get_mut(&key) {
*old = val;
} else {
me.cache.insert(key, val);
}
let key = PredicateCacheKey::from(obj.meta());
let changed = me.cache.get(&key) != Some(&val);
me.cache.insert(key, val);
if changed {
Some(Ok(obj))
} else {
Expand Down Expand Up @@ -251,4 +265,58 @@ pub(crate) mod tests {
assert_eq!(second.meta().generation, Some(2));
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}

#[tokio::test]
async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
use k8s_openapi::api::core::v1::Pod;

let mkobj = |g: i32, uid: &str| {
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "blog",
"namespace": "default",
"generation": Some(g),
"uid": uid,
},
"spec": {
"containers": [{
"name": "blog",
"image": "clux/blog:0.1.0"
}],
}
}))
.unwrap();
p
};

// Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete ->
// create (gen=1, uid=2) -> delete -> create (gen=2, uid=3)
let data = stream::iter([
Ok(mkobj(1, "uid-1")),
Ok(mkobj(1, "uid-1")),
Ok(mkobj(1, "uid-2")),
Ok(mkobj(2, "uid-3")),
]);
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));

// mkobj(1, uid-1) passed through
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(first.meta().generation, Some(1));
assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));

// (no repeat mkobj(1, uid-1) - same UID and generation)
// mkobj(1, uid-2) next - different UID detected
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(second.meta().generation, Some(1));
assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));

// mkobj(2, uid-3) next
let third = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(third.meta().generation, Some(2));
assert_eq!(third.meta().uid.as_deref(), Some("uid-3"));

assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}
}