Skip to content

Commit 7a22f26

Browse files
authored
[Hitless Upgrades] Timeouts seen during endpoint re-bind and migrate (#3426)
* [maintenace-event timeouts seen during endpoint re-bind and migrate on single proxy policy * [hitless upgrade] address review comments
1 parent c5ab6bb commit 7a22f26

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

src/main/java/io/lettuce/core/protocol/CommandHandler.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -627,13 +627,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
627627
}
628628

629629
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
630-
final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE)
631-
&& ctx.channel().attr(REBIND_ATTRIBUTE).get() != null
632-
&& ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED);
633-
if (debugEnabled && rebindInProgress) {
634-
logger.debug("{} Processing command while rebind is in progress, stack has {} more to process", logPrefix(),
635-
stack.size());
636-
}
637630

638631
if (pristine) {
639632

@@ -720,9 +713,17 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
720713
}
721714
}
722715

723-
if (rebindInProgress && stack.isEmpty()) {
724-
logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now());
725-
ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
716+
final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE)
717+
&& ctx.channel().attr(REBIND_ATTRIBUTE).get() != null
718+
&& ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED);
719+
720+
if (rebindInProgress) {
721+
if (stack.isEmpty()) {
722+
logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now());
723+
ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
724+
} else {
725+
logger.debug("{} Rebind in progress, {} commands remaining in the stack", logPrefix(), stack.size());
726+
}
726727
}
727728

728729
decodeBufferPolicy.afterDecoding(buffer);

src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,13 @@ private static final class State {
361361
this.rebindAddress = rebindAddress;
362362
}
363363

364+
public String toString() {
365+
StringBuilder sb = new StringBuilder();
366+
367+
return sb.append("State [cutoff=").append(cutoff).append(", rebindAddress=").append(rebindAddress).append("]")
368+
.toString();
369+
}
370+
364371
}
365372

366373
private final AtomicReference<State> state = new AtomicReference<>();
@@ -401,11 +408,17 @@ public void rebind(Duration duration, SocketAddress rebindAddress) {
401408
public Mono<SocketAddress> wrappedSupplier(Mono<SocketAddress> original) {
402409
return Mono.defer(() -> {
403410
State current = state.get();
411+
logger.debug("RebindAwareAddressSupplier rebind state: {}", state.get());
404412
if (current != null && current.rebindAddress != null && clock.instant().isBefore(current.cutoff)) {
405-
return Mono.just(current.rebindAddress);
413+
logger.debug("RebindAwareAddressSupplier using rebind address: {}", state.get());
414+
return Mono.just(current.rebindAddress)
415+
.doOnSubscribe(s -> logger.debug("RebindAwareAddressSupplier subscribed to rebind address"))
416+
.doOnNext(address -> logger.debug("RebindAwareAddressSupplier rebind address: {}", address));
406417
} else {
418+
logger.debug("RebindAwareAddressSupplier falling back to original.");
407419
state.compareAndSet(current, null);
408-
return original;
420+
return original.doOnSubscribe(s -> logger.debug("RebindAwareAddressSupplier original to rebind address"))
421+
.doOnNext(address -> logger.debug("RebindAwareAddressSupplier original address: {}", address));
409422
}
410423
});
411424
}

0 commit comments

Comments
 (0)