@@ -388,7 +388,7 @@ private CompletableFuture<Void> sendMessages(final boolean cachedMessagesOnly) {
388388 Mono .fromFuture (() -> sendMessage (envelope )).timeout (sendFuturesTimeout )
389389 // Note that this will retry both for "send to client" timeouts and failures to delete messages on
390390 // acknowledgement
391- .retryWhen (Retry .backoff (4 , Duration .ofSeconds (1 ))),
391+ .retryWhen (Retry .backoff (4 , Duration .ofSeconds (1 )). filter ( throwable -> ! isConnectionClosedException ( throwable )) ),
392392 MESSAGE_SENDER_MAX_CONCURRENCY )
393393 .doOnError (this ::measureSendMessageErrors )
394394 .subscribeOn (messageDeliveryScheduler )
@@ -410,10 +410,7 @@ private void measureSendMessageErrors(final Throwable e) {
410410
411411 if (e instanceof TimeoutException ) {
412412 errorType = "timeout" ;
413- } else if (e instanceof java .nio .channels .ClosedChannelException ||
414- e == WebSocketResourceProvider .CONNECTION_CLOSED_EXCEPTION ||
415- e instanceof org .eclipse .jetty .io .EofException ||
416- (e instanceof StaticException staticException && "Closed" .equals (staticException .getMessage ()))) {
413+ } else if (isConnectionClosedException (e )) {
417414 errorType = "connectionClosed" ;
418415 } else {
419416 logger .warn ("Send message failed" , e );
@@ -427,6 +424,13 @@ private void measureSendMessageErrors(final Throwable e) {
427424 .increment ();
428425 }
429426
427+ private static boolean isConnectionClosedException (final Throwable throwable ) {
428+ return throwable instanceof java .nio .channels .ClosedChannelException ||
429+ throwable == WebSocketResourceProvider .CONNECTION_CLOSED_EXCEPTION ||
430+ throwable instanceof org .eclipse .jetty .io .EofException ||
431+ (throwable instanceof StaticException staticException && "Closed" .equals (staticException .getMessage ()));
432+ }
433+
430434 private CompletableFuture <Void > sendMessage (Envelope envelope ) {
431435 final UUID messageGuid = UUID .fromString (envelope .getServerGuid ());
432436
0 commit comments