@@ -7,7 +7,7 @@ pub mod store;
77
88use crate :: {
99 config:: Config ,
10- data:: { self , Application , Dependency , Host , Integration , Log , Payload , Telemetry } ,
10+ data:: { self , Application , Dependency , Endpoint , Host , Integration , Log , Payload , Telemetry } ,
1111 metrics:: { ContextKey , MetricBuckets , MetricContexts } ,
1212} ;
1313use ddcommon:: Endpoint ;
@@ -79,6 +79,7 @@ pub enum TelemetryActions {
7979 AddDependency ( Dependency ) ,
8080 AddIntegration ( Integration ) ,
8181 AddLog ( ( LogIdentifier , Log ) ) ,
82+ AddEndpoint ( Endpoint ) ,
8283 Lifecycle ( LifecycleAction ) ,
8384 #[ serde( skip) ]
8485 CollectStats ( oneshot:: Sender < TelemetryWorkerStats > ) ,
@@ -110,6 +111,7 @@ struct TelemetryWorkerData {
110111 dependencies : store:: Store < Dependency > ,
111112 configurations : store:: Store < data:: Configuration > ,
112113 integrations : store:: Store < data:: Integration > ,
114+ endpoints : store:: Store < data:: Endpoint > ,
113115 logs : store:: QueueHashMap < LogIdentifier , Log > ,
114116 metric_contexts : MetricContexts ,
115117 metric_buckets : MetricBuckets ,
@@ -181,6 +183,8 @@ pub struct TelemetryWorkerStats {
181183 pub configurations_unflushed : u32 ,
182184 pub integrations_stored : u32 ,
183185 pub integrations_unflushed : u32 ,
186+ pub endpoints_stored : u32 ,
187+ pub endpoints_unflushed : u32 ,
184188 pub logs : u32 ,
185189 pub metric_contexts : u32 ,
186190 pub metric_buckets : MetricBucketStats ,
@@ -197,6 +201,8 @@ impl Add for TelemetryWorkerStats {
197201 configurations_unflushed : self . configurations_unflushed + rhs. configurations_unflushed ,
198202 integrations_stored : self . integrations_stored + rhs. integrations_stored ,
199203 integrations_unflushed : self . integrations_unflushed + rhs. integrations_unflushed ,
204+ endpoints_stored : self . endpoints_stored + rhs. endpoints_stored ,
205+ endpoints_unflushed : self . endpoints_unflushed + rhs. endpoints_unflushed ,
200206 logs : self . logs + rhs. logs ,
201207 metric_contexts : self . metric_contexts + rhs. metric_contexts ,
202208 metric_buckets : MetricBucketStats {
@@ -315,7 +321,7 @@ impl TelemetryWorker {
315321 }
316322 }
317323 }
318- AddConfig ( _) | AddDependency ( _) | AddIntegration ( _) | Lifecycle ( ExtendedHeartbeat ) => { }
324+ AddConfig ( _) | AddDependency ( _) | AddIntegration ( _) | AddEndpoint ( _ ) | Lifecycle ( ExtendedHeartbeat ) => { }
319325 Lifecycle ( Stop ) => {
320326 if !self . data . started {
321327 return BREAK ;
@@ -372,6 +378,7 @@ impl TelemetryWorker {
372378 AddDependency ( dep) => self . data . dependencies . insert ( dep) ,
373379 AddIntegration ( integration) => self . data . integrations . insert ( integration) ,
374380 AddConfig ( cfg) => self . data . configurations . insert ( cfg) ,
381+ AddEndpoint ( endpoint) => self . data . endpoints . insert ( endpoint) ,
375382 AddLog ( ( identifier, log) ) => {
376383 let ( l, new) = self . data . logs . get_mut_or_insert ( identifier, log) ;
377384 if !new {
@@ -424,6 +431,7 @@ impl TelemetryWorker {
424431 self . data . dependencies . unflush_stored ( ) ;
425432 self . data . integrations . unflush_stored ( ) ;
426433 self . data . configurations . unflush_stored ( ) ;
434+ self . data . endpoints . unflush_stored ( ) ;
427435
428436 let app_started = data:: Payload :: AppStarted ( self . build_app_started ( ) ) ;
429437 match self . send_payload ( & app_started) . await {
@@ -516,6 +524,13 @@ impl TelemetryWorker {
516524 } ,
517525 ) )
518526 }
527+ if self . data . endpoints . flush_not_empty ( ) {
528+ payloads. push ( data:: Payload :: AppEndpointsChange (
529+ data:: AppEndpointsChange {
530+ endpoints : self . data . endpoints . unflushed ( ) . cloned ( ) . collect ( ) ,
531+ } ,
532+ ) )
533+ }
519534 payloads
520535 }
521536
@@ -618,6 +633,9 @@ impl TelemetryWorker {
618633 . data
619634 . configurations
620635 . removed_flushed ( p. configuration . len ( ) ) ,
636+ AppEndpointsChange ( p) => {
637+ self . data . endpoints . removed_flushed ( p. endpoints . len ( ) )
638+ }
621639 MessageBatch ( batch) => {
622640 for p in batch {
623641 self . payload_sent_success ( p) ;
@@ -722,6 +740,8 @@ impl TelemetryWorker {
722740 configurations_unflushed : self . data . configurations . len_unflushed ( ) as u32 ,
723741 integrations_stored : self . data . integrations . len_stored ( ) as u32 ,
724742 integrations_unflushed : self . data . integrations . len_unflushed ( ) as u32 ,
743+ endpoints_stored : self . data . endpoints . len_stored ( ) as u32 ,
744+ endpoints_unflushed : self . data . endpoints . len_unflushed ( ) as u32 ,
725745 logs : self . data . logs . len ( ) as u32 ,
726746 metric_contexts : self . data . metric_contexts . lock ( ) . len ( ) as u32 ,
727747 metric_buckets : self . data . metric_buckets . stats ( ) ,
@@ -910,7 +930,7 @@ impl TelemetryWorkerHandle {
910930 }
911931}
912932
913- /// How many dependencies/integrations/configs we keep in memory at most
933+ /// How many dependencies/integrations/configs/endpoints we keep in memory at most
914934pub const MAX_ITEMS : usize = 5000 ;
915935
916936#[ derive( Debug , Default , Clone , Copy ) ]
@@ -930,6 +950,7 @@ pub struct TelemetryWorkerBuilder {
930950 pub dependencies : store:: Store < data:: Dependency > ,
931951 pub integrations : store:: Store < data:: Integration > ,
932952 pub configurations : store:: Store < data:: Configuration > ,
953+ pub endpoints : store:: Store < data:: Endpoint > ,
933954 pub native_deps : bool ,
934955 pub rust_shared_lib_deps : bool ,
935956 pub config : Config ,
@@ -980,6 +1001,7 @@ impl TelemetryWorkerBuilder {
9801001 dependencies : store:: Store :: new ( MAX_ITEMS ) ,
9811002 integrations : store:: Store :: new ( MAX_ITEMS ) ,
9821003 configurations : store:: Store :: new ( MAX_ITEMS ) ,
1004+ endpoints : store:: Store :: new ( MAX_ITEMS ) ,
9831005 native_deps : true ,
9841006 rust_shared_lib_deps : false ,
9851007 config : Config :: default ( ) ,
@@ -1010,6 +1032,7 @@ impl TelemetryWorkerBuilder {
10101032 dependencies : self . dependencies ,
10111033 integrations : self . integrations ,
10121034 configurations : self . configurations ,
1035+ endpoints : self . endpoints ,
10131036 logs : store:: QueueHashMap :: default ( ) ,
10141037 metric_contexts : contexts. clone ( ) ,
10151038 metric_buckets : MetricBuckets :: default ( ) ,
0 commit comments