@@ -109,6 +109,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
109109 activity ( UpdateMetricsInput {
110110 client_id,
111111 flavor,
112+ draining : state. drain_timeout_ts . is_some ( ) ,
112113 clear : false ,
113114 } ) ,
114115 ) )
@@ -125,6 +126,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
125126 activity ( UpdateMetricsInput {
126127 client_id,
127128 flavor,
129+ draining : state. drain_timeout_ts . is_some ( ) ,
128130 clear : false ,
129131 } ) ,
130132 ) )
@@ -254,6 +256,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
254256 ctx. activity ( UpdateMetricsInput {
255257 client_id : input. client_id ,
256258 flavor : input. flavor ,
259+ draining : false ,
257260 clear : true ,
258261 } )
259262 . await ?;
@@ -691,6 +694,7 @@ pub async fn handle_commands(
691694 activity ( UpdateMetricsInput {
692695 client_id,
693696 flavor,
697+ draining : drain_timeout_ts. is_some ( ) ,
694698 clear : false ,
695699 } ) ,
696700 ) )
@@ -933,24 +937,75 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> GlobalRe
933937struct UpdateMetricsInput {
934938 client_id : Uuid ,
935939 flavor : ClientFlavor ,
940+ #[ serde( default ) ]
941+ draining : bool ,
936942 clear : bool ,
937943}
938944
939945#[ activity( UpdateMetrics ) ]
940946async fn update_metrics ( ctx : & ActivityCtx , input : & UpdateMetricsInput ) -> GlobalResult < ( ) > {
941947 if input. clear {
948+ metrics:: CLIENT_MEMORY_TOTAL
949+ . with_label_values ( & [
950+ & input. client_id . to_string ( ) ,
951+ & input. flavor . to_string ( ) ,
952+ "active" ,
953+ ] )
954+ . set ( 0 ) ;
955+ metrics:: CLIENT_CPU_TOTAL
956+ . with_label_values ( & [
957+ & input. client_id . to_string ( ) ,
958+ & input. flavor . to_string ( ) ,
959+ "active" ,
960+ ] )
961+ . set ( 0 ) ;
962+ metrics:: CLIENT_MEMORY_TOTAL
963+ . with_label_values ( & [
964+ & input. client_id . to_string ( ) ,
965+ & input. flavor . to_string ( ) ,
966+ "draining" ,
967+ ] )
968+ . set ( 0 ) ;
969+ metrics:: CLIENT_CPU_TOTAL
970+ . with_label_values ( & [
971+ & input. client_id . to_string ( ) ,
972+ & input. flavor . to_string ( ) ,
973+ "draining" ,
974+ ] )
975+ . set ( 0 ) ;
942976 metrics:: CLIENT_MEMORY_ALLOCATED
943- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
977+ . with_label_values ( & [
978+ & input. client_id . to_string ( ) ,
979+ & input. flavor . to_string ( ) ,
980+ "active" ,
981+ ] )
944982 . set ( 0 ) ;
945-
946983 metrics:: CLIENT_CPU_ALLOCATED
947- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
984+ . with_label_values ( & [
985+ & input. client_id . to_string ( ) ,
986+ & input. flavor . to_string ( ) ,
987+ "active" ,
988+ ] )
989+ . set ( 0 ) ;
990+ metrics:: CLIENT_MEMORY_ALLOCATED
991+ . with_label_values ( & [
992+ & input. client_id . to_string ( ) ,
993+ & input. flavor . to_string ( ) ,
994+ "draining" ,
995+ ] )
996+ . set ( 0 ) ;
997+ metrics:: CLIENT_CPU_ALLOCATED
998+ . with_label_values ( & [
999+ & input. client_id . to_string ( ) ,
1000+ & input. flavor . to_string ( ) ,
1001+ "draining" ,
1002+ ] )
9481003 . set ( 0 ) ;
9491004
9501005 return Ok ( ( ) ) ;
9511006 }
9521007
953- let ( total_mem, total_cpu , remaining_mem , remaining_cpu) =
1008+ let ( total_mem, remaining_mem , total_cpu , remaining_cpu) =
9541009 ctx. fdb ( )
9551010 . await ?
9561011 . run ( |tx, _mc| async move {
@@ -992,35 +1047,80 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global
9921047 )
9931048 . map_err ( |x| fdb:: FdbBindingError :: CustomError ( x. into ( ) ) ) ?;
9941049
995- Ok ( (
996- total_mem,
997- remaining_mem,
998- total_cpu,
999- remaining_cpu,
1000- ) )
1050+ Ok ( ( total_mem, remaining_mem, total_cpu, remaining_cpu) )
10011051 } )
10021052 . custom_instrument ( tracing:: info_span!( "client_update_metrics_tx" ) )
10031053 . await ?;
10041054
1055+ let ( state, other_state) = if input. draining {
1056+ ( "draining" , "active" )
1057+ } else {
1058+ ( "active" , "draining" )
1059+ } ;
1060+ let allocated_mem = total_mem. saturating_sub ( remaining_mem) ;
1061+ let allocated_cpu = total_cpu. saturating_sub ( remaining_cpu) ;
1062+
10051063 metrics:: CLIENT_MEMORY_TOTAL
1006- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1064+ . with_label_values ( & [
1065+ & input. client_id . to_string ( ) ,
1066+ & input. flavor . to_string ( ) ,
1067+ state,
1068+ ] )
10071069 . set ( total_mem. try_into ( ) ?) ;
1008-
10091070 metrics:: CLIENT_CPU_TOTAL
1010- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1071+ . with_label_values ( & [
1072+ & input. client_id . to_string ( ) ,
1073+ & input. flavor . to_string ( ) ,
1074+ state,
1075+ ] )
10111076 . set ( total_cpu. try_into ( ) ?) ;
10121077
1013- let allocated_mem = total_mem. saturating_sub ( remaining_mem) ;
1014- let allocated_cpu = total_cpu. saturating_sub ( remaining_cpu) ;
1015-
10161078 metrics:: CLIENT_MEMORY_ALLOCATED
1017- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1079+ . with_label_values ( & [
1080+ & input. client_id . to_string ( ) ,
1081+ & input. flavor . to_string ( ) ,
1082+ state,
1083+ ] )
10181084 . set ( allocated_mem. try_into ( ) ?) ;
1019-
10201085 metrics:: CLIENT_CPU_ALLOCATED
1021- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1086+ . with_label_values ( & [
1087+ & input. client_id . to_string ( ) ,
1088+ & input. flavor . to_string ( ) ,
1089+ state,
1090+ ] )
10221091 . set ( allocated_cpu. try_into ( ) ?) ;
10231092
1093+ // Clear other state
1094+ metrics:: CLIENT_MEMORY_TOTAL
1095+ . with_label_values ( & [
1096+ & input. client_id . to_string ( ) ,
1097+ & input. flavor . to_string ( ) ,
1098+ other_state,
1099+ ] )
1100+ . set ( 0 ) ;
1101+ metrics:: CLIENT_CPU_TOTAL
1102+ . with_label_values ( & [
1103+ & input. client_id . to_string ( ) ,
1104+ & input. flavor . to_string ( ) ,
1105+ other_state,
1106+ ] )
1107+ . set ( 0 ) ;
1108+
1109+ metrics:: CLIENT_MEMORY_ALLOCATED
1110+ . with_label_values ( & [
1111+ & input. client_id . to_string ( ) ,
1112+ & input. flavor . to_string ( ) ,
1113+ other_state,
1114+ ] )
1115+ . set ( 0 ) ;
1116+ metrics:: CLIENT_CPU_ALLOCATED
1117+ . with_label_values ( & [
1118+ & input. client_id . to_string ( ) ,
1119+ & input. flavor . to_string ( ) ,
1120+ other_state,
1121+ ] )
1122+ . set ( 0 ) ;
1123+
10241124 Ok ( ( ) )
10251125}
10261126
0 commit comments