@@ -45,6 +45,7 @@ export class StreamProcessor {
45
45
private readyState : number ;
46
46
private log : ConsoleLog ;
47
47
private retryAttempt = 0 ;
48
+ private retryTimeout : NodeJS . Timeout | undefined ;
48
49
49
50
constructor (
50
51
api : ClientApi ,
@@ -111,16 +112,19 @@ export class StreamProcessor {
111
112
} ;
112
113
113
114
const onFailed = ( msg : string ) => {
114
- if ( this . readyState !== StreamProcessor . CLOSED ) {
115
+ if ( this . readyState !== StreamProcessor . CLOSED && ! this . retryTimeout ) {
115
116
this . retryAttempt += 1 ;
116
117
117
118
const delayMs = this . getRandomRetryDelayMs ( ) ;
118
119
warnStreamDisconnectedWithRetry ( msg , delayMs , this . log ) ;
119
120
this . readyState = StreamProcessor . RETRYING ;
120
121
this . eventBus . emit ( StreamEvent . RETRYING ) ;
121
122
122
- setTimeout ( ( ) => {
123
- this . connect ( url , options , onConnected , onFailed ) ;
123
+ this . retryTimeout = setTimeout ( ( ) => {
124
+ this . retryTimeout = undefined ;
125
+ if ( this . readyState !== StreamProcessor . CLOSED ) {
126
+ this . connect ( url , options , onConnected , onFailed ) ;
127
+ }
124
128
} , delayMs ) ;
125
129
}
126
130
} ;
@@ -135,6 +139,14 @@ export class StreamProcessor {
135
139
return Math . min ( delayMs , 60000 ) ;
136
140
}
137
141
142
+ private cleanupConnection ( ) : void {
143
+ if ( this . request ) {
144
+ this . request . removeAllListeners ( ) ;
145
+ this . request . destroy ( ) ;
146
+ this . request = undefined ;
147
+ }
148
+ }
149
+
138
150
private connect (
139
151
url : string ,
140
152
options : RequestOptions ,
@@ -146,6 +158,8 @@ export class StreamProcessor {
146
158
return ;
147
159
}
148
160
161
+ this . cleanupConnection ( ) ;
162
+
149
163
const isSecure = url . startsWith ( 'https:' ) ;
150
164
this . log . debug ( 'SSE HTTP start request' , url ) ;
151
165
@@ -174,8 +188,8 @@ export class StreamProcessor {
174
188
. on ( 'timeout' , ( ) => {
175
189
onFailed (
176
190
'SSE request timed out after ' +
177
- StreamProcessor . SSE_TIMEOUT_MS +
178
- 'ms' ,
191
+ StreamProcessor . SSE_TIMEOUT_MS +
192
+ 'ms' ,
179
193
) ;
180
194
} )
181
195
. setTimeout ( StreamProcessor . SSE_TIMEOUT_MS ) ;
@@ -256,6 +270,8 @@ export class StreamProcessor {
256
270
return ;
257
271
}
258
272
273
+ clearTimeout ( this . retryTimeout ) ;
274
+
259
275
this . readyState = StreamProcessor . CLOSED ;
260
276
this . log . info ( 'Closing StreamProcessor' ) ;
261
277
0 commit comments