@@ -134,9 +134,9 @@ void flb_engine_reschedule_retries(struct flb_config *config)
134
134
task = mk_list_entry (t_head , struct flb_task , _head );
135
135
136
136
if (task -> users > 0 ) {
137
- flb_debug ("[engine] retry=%p for task %i already scheduled to run, "
138
- "not re-scheduling it." ,
139
- retry , task -> id );
137
+ flb_debug ("[engine] task %i already scheduled to run, not re-scheduling it." ,
138
+ task -> id
139
+ );
140
140
141
141
continue ;
142
142
}
@@ -581,7 +581,7 @@ static FLB_INLINE int flb_engine_handle_event(flb_pipefd_t fd, int mask,
581
581
return 0 ;
582
582
}
583
583
else if (config -> shutdown_fd == fd ) {
584
- flb_utils_pipe_byte_consume (fd );
584
+ flb_utils_timer_consume (fd );
585
585
return FLB_ENGINE_SHUTDOWN ;
586
586
}
587
587
else if (config -> ch_manager [0 ] == fd ) {
@@ -716,7 +716,6 @@ int flb_engine_start(struct flb_config *config)
716
716
struct flb_sched * sched ;
717
717
struct flb_net_dns dns_ctx ;
718
718
struct flb_notification * notification ;
719
- int exiting = FLB_FALSE ;
720
719
721
720
/* Initialize the networking layer */
722
721
flb_net_lib_init ();
@@ -1001,6 +1000,12 @@ int flb_engine_start(struct flb_config *config)
1001
1000
flb_event_priority_live_foreach (event , evl_bktq , evl , FLB_ENGINE_LOOP_MAX_ITER ) {
1002
1001
if (event -> type == FLB_ENGINE_EV_CORE ) {
1003
1002
ret = flb_engine_handle_event (event -> fd , event -> mask , config );
1003
+
1004
+ /*
1005
+ * This block will be called once on engine stop.
1006
+ * Will reschedule task to 1 sec. retry.
1007
+ * Also timer with shutdown event will be created.
1008
+ */
1004
1009
if (ret == FLB_ENGINE_STOP ) {
1005
1010
if (config -> grace_count == 0 ) {
1006
1011
if (config -> grace >= 0 ) {
@@ -1015,11 +1020,7 @@ int flb_engine_start(struct flb_config *config)
1015
1020
}
1016
1021
1017
1022
/* mark the runtime as the ingestion is not active and that we are in shutting down mode */
1018
- config -> is_ingestion_active = FLB_FALSE ;
1019
- config -> is_shutting_down = FLB_TRUE ;
1020
-
1021
- /* pause all input plugin instances */
1022
- flb_input_pause_all (config );
1023
+ flb_engine_stop_ingestion (config );
1023
1024
1024
1025
/*
1025
1026
* We are preparing to shutdown, we give a graceful time
@@ -1028,6 +1029,7 @@ int flb_engine_start(struct flb_config *config)
1028
1029
event = & config -> event_shutdown ;
1029
1030
event -> mask = MK_EVENT_EMPTY ;
1030
1031
event -> status = MK_EVENT_NONE ;
1032
+ event -> priority = FLB_ENGINE_PRIORITY_SHUTDOWN ;
1031
1033
1032
1034
/*
1033
1035
* Configure a timer of 1 second, on expiration the code will
@@ -1038,11 +1040,18 @@ int flb_engine_start(struct flb_config *config)
1038
1040
* If no tasks exists, there is no need to wait for the maximum
1039
1041
* grace period.
1040
1042
*/
1041
- config -> shutdown_fd = mk_event_timeout_create (evl ,
1042
- 1 ,
1043
- 0 ,
1044
- event );
1045
- event -> priority = FLB_ENGINE_PRIORITY_SHUTDOWN ;
1043
+ if (config -> shutdown_fd <= 0 ) {
1044
+ config -> shutdown_fd = mk_event_timeout_create (evl ,
1045
+ 1 ,
1046
+ 0 ,
1047
+ event );
1048
+
1049
+ if (config -> shutdown_fd == -1 ) {
1050
+ flb_error ("[engine] could not create shutdown timer" );
1051
+ /* fail early so we don't silently skip scheduled shutdown */
1052
+ return -1 ;
1053
+ }
1054
+ }
1046
1055
}
1047
1056
else if (ret == FLB_ENGINE_SHUTDOWN ) {
1048
1057
/* Increase the grace counter */
@@ -1062,10 +1071,19 @@ int flb_engine_start(struct flb_config *config)
1062
1071
fs_chunks = 0 ;
1063
1072
tasks = flb_task_running_count (config );
1064
1073
flb_storage_chunk_count (config , & mem_chunks , & fs_chunks );
1074
+
1075
+ if ((mem_chunks + fs_chunks ) > 0 ) {
1076
+ flb_info ("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d" ,
1077
+ mem_chunks , fs_chunks , config -> grace_count );
1078
+ }
1079
+
1080
+ if (tasks > 0 ) {
1081
+ flb_task_running_print (config );
1082
+ }
1083
+
1065
1084
ret = tasks + mem_chunks + fs_chunks ;
1066
1085
if (ret > 0 && (config -> grace_count < config -> grace || config -> grace == -1 )) {
1067
1086
if (config -> grace_count == 1 ) {
1068
- flb_task_running_print (config );
1069
1087
/*
1070
1088
* If storage.backlog.shutdown_flush is enabled, attempt to flush pending
1071
1089
* filesystem chunks during shutdown. This is particularly useful in scenarios
@@ -1079,32 +1097,10 @@ int flb_engine_start(struct flb_config *config)
1079
1097
}
1080
1098
}
1081
1099
}
1082
- if ((mem_chunks + fs_chunks ) > 0 ) {
1083
- flb_info ("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d" ,
1084
- mem_chunks , fs_chunks , config -> grace_count );
1085
- }
1086
-
1087
1100
/* Create new tasks for pending chunks */
1088
1101
flb_engine_flush (config , NULL );
1089
- if (config -> grace_count < config -> grace_input ) {
1090
- if (exiting == FLB_FALSE ) {
1091
- flb_engine_exit (config );
1092
- exiting = FLB_TRUE ;
1093
- }
1094
- } else {
1095
- if (config -> is_ingestion_active == FLB_TRUE ) {
1096
- flb_engine_stop_ingestion (config );
1097
- }
1098
- }
1099
1102
}
1100
1103
else {
1101
- if (tasks > 0 ) {
1102
- flb_task_running_print (config );
1103
- }
1104
- if ((mem_chunks + fs_chunks ) > 0 ) {
1105
- flb_info ("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d" ,
1106
- mem_chunks , fs_chunks , config -> grace_count );
1107
- }
1108
1104
flb_info ("[engine] service has stopped (%i pending tasks)" ,
1109
1105
tasks );
1110
1106
ret = config -> exit_status_code ;
@@ -1115,7 +1111,6 @@ int flb_engine_start(struct flb_config *config)
1115
1111
& config -> event_shutdown );
1116
1112
}
1117
1113
1118
- config = NULL ;
1119
1114
return ret ;
1120
1115
}
1121
1116
}
0 commit comments