@@ -14,7 +14,7 @@ use kube_client::{
1414 Api , Error as ClientErr ,
1515} ;
1616use serde:: de:: DeserializeOwned ;
17- use std:: { clone:: Clone , collections:: VecDeque , fmt:: Debug , time:: Duration } ;
17+ use std:: { clone:: Clone , collections:: VecDeque , fmt:: Debug , future , time:: Duration } ;
1818use thiserror:: Error ;
1919use tracing:: { debug, error, warn} ;
2020
@@ -844,18 +844,36 @@ pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'sta
844844 // filtering by object name in given scope, so there's at most one matching object
845845 // footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
846846 let fields = format ! ( "metadata.name={name}" ) ;
847- watcher ( api, Config :: default ( ) . fields ( & fields) ) . filter_map ( |event| async {
848- match event {
849- // Pass up `Some` for Found / Updated
850- Ok ( Event :: Apply ( obj) | Event :: InitApply ( obj) ) => Some ( Ok ( Some ( obj) ) ) ,
851- // Pass up `None` for Deleted
852- Ok ( Event :: Delete ( _) ) => Some ( Ok ( None ) ) ,
853- // Ignore marker events
854- Ok ( Event :: Init | Event :: InitDone ) => None ,
855- // Bubble up errors
856- Err ( err) => Some ( Err ( err) ) ,
857- }
858- } )
847+ watcher ( api, Config :: default ( ) . fields ( & fields) )
848+ // track whether the object was seen in each initial listing
849+ . scan ( false , |obj_seen, event| {
850+ if matches ! ( event, Ok ( Event :: Init ) ) {
851+ * obj_seen = false ;
852+ } else if matches ! ( event, Ok ( Event :: InitApply ( _) ) ) {
853+ * obj_seen = true ;
854+ }
855+ future:: ready ( Some ( ( * obj_seen, event) ) )
856+ } )
857+ . filter_map ( |( obj_seen, event) | async move {
858+ match event {
859+ // Pass up `Some` for Found / Updated
860+ Ok ( Event :: Apply ( obj) ) | Ok ( Event :: InitApply ( obj) ) => Some ( Ok ( Some ( obj) ) ) ,
861+ // Pass up `None` for Deleted
862+ Ok ( Event :: Delete ( _) ) => Some ( Ok ( None ) ) ,
863+ // Ignore marker event
864+ Ok ( Event :: Init ) => None ,
865+ // Pass up `None` if the object wasn't seen in any initial list
866+ Ok ( Event :: InitDone ) => {
867+ if obj_seen {
868+ None
869+ } else {
870+ Some ( Ok ( None ) )
871+ }
872+ }
873+ // Bubble up errors
874+ Err ( err) => Some ( Err ( err) ) ,
875+ }
876+ } )
859877}
860878
861879/// Default watcher backoff inspired by Kubernetes' client-go.
0 commit comments