1- use crate :: { reflector :: ObjectRef , watcher:: Error } ;
1+ use crate :: watcher:: Error ;
22use core:: {
33 pin:: Pin ,
4- task:: { ready , Context , Poll } ,
4+ task:: { Context , Poll , ready } ,
55} ;
66use futures:: Stream ;
7- use kube_client:: Resource ;
7+ use kube_client:: { Resource , api :: ObjectMeta } ;
88use pin_project:: pin_project;
99use std:: {
10- collections:: { hash_map:: DefaultHasher , HashMap } ,
10+ collections:: { HashMap , hash_map:: DefaultHasher } ,
1111 hash:: { Hash , Hasher } ,
12+ marker:: PhantomData ,
1213} ;
1314
1415fn hash < T : Hash + ?Sized > ( t : & T ) -> u64 {
@@ -17,6 +18,24 @@ fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
1718 hasher. finish ( )
1819}
1920
21+ /// Private cache key that includes UID in equality for predicate filtering
22+ #[ derive( Debug , Clone , PartialEq , Eq , Hash ) ]
23+ struct PredicateCacheKey {
24+ name : String ,
25+ namespace : Option < String > ,
26+ uid : Option < String > ,
27+ }
28+
29+ impl From < & ObjectMeta > for PredicateCacheKey {
30+ fn from ( meta : & ObjectMeta ) -> Self {
31+ Self {
32+ name : meta. name . clone ( ) . unwrap_or_default ( ) ,
33+ namespace : meta. namespace . clone ( ) ,
34+ uid : meta. uid . clone ( ) ,
35+ }
36+ }
37+ }
38+
2039/// A predicate is a hasher of Kubernetes objects stream filtering
2140pub trait Predicate < K > {
2241 /// A predicate only needs to implement optional hashing when keys exist
@@ -103,7 +122,8 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
103122 #[ pin]
104123 stream : St ,
105124 predicate : P ,
106- cache : HashMap < ObjectRef < K > , ( Option < String > , u64 ) > ,
125+ cache : HashMap < PredicateCacheKey , u64 > ,
126+ _phantom : PhantomData < K > ,
107127}
108128impl < St , K , P > PredicateFilter < St , K , P >
109129where
@@ -116,6 +136,7 @@ where
116136 stream,
117137 predicate,
118138 cache : HashMap :: new ( ) ,
139+ _phantom : PhantomData ,
119140 }
120141 }
121142}
@@ -134,14 +155,9 @@ where
134155 break match ready ! ( me. stream. as_mut( ) . poll_next( cx) ) {
135156 Some ( Ok ( obj) ) => {
136157 if let Some ( val) = me. predicate . hash_property ( & obj) {
137- let key = ObjectRef :: from_obj ( & obj) ;
138- let uid = obj. meta ( ) . uid . clone ( ) ;
139- let changed = if let Some ( ( old_uid, old_val) ) = me. cache . get ( & key) {
140- old_uid != & uid || * old_val != val
141- } else {
142- true
143- } ;
144- me. cache . insert ( key, ( uid, val) ) ;
158+ let key = PredicateCacheKey :: from ( obj. meta ( ) ) ;
159+ let changed = me. cache . get ( & key) != Some ( & val) ;
160+ me. cache . insert ( key, val) ;
145161 if changed {
146162 Some ( Ok ( obj) )
147163 } else {
@@ -199,8 +215,8 @@ pub mod predicates {
199215pub ( crate ) mod tests {
200216 use std:: { pin:: pin, task:: Poll } ;
201217
202- use super :: { predicates , Error , PredicateFilter } ;
203- use futures:: { poll , stream , FutureExt , StreamExt } ;
218+ use super :: { Error , PredicateFilter , predicates } ;
219+ use futures:: { FutureExt , StreamExt , poll , stream } ;
204220 use kube_client:: Resource ;
205221 use serde_json:: json;
206222
@@ -276,8 +292,8 @@ pub(crate) mod tests {
276292
277293 // Simulate: create (gen=1, uid=1) -> delete -> create (gen=1, uid=2)
278294 let data = stream:: iter ( [
279- Ok ( mkobj ( 1 , "uid-1" ) ) , // First resource created, generation=1
280- Ok ( mkobj ( 1 , "uid-2" ) ) , // Resource recreated with same generation but different UID
295+ Ok ( mkobj ( 1 , "uid-1" ) ) , // First resource created, generation=1
296+ Ok ( mkobj ( 1 , "uid-2" ) ) , // Resource recreated with same generation but different UID
281297 ] ) ;
282298 let mut rx = pin ! ( PredicateFilter :: new( data, predicates:: generation) ) ;
283299
0 commit comments