Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ public interface Watcher<T> {
* Currently only used to indicate if the Watch should ignore the watch reconnect limit
*
* @return
*
* @deprecated to be removed in a future release.
*/
@Deprecated
default boolean reconnecting() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,30 @@

public class WatcherException extends Exception {
private final String rawWatchMessage;
private final boolean reconnectionError;

public WatcherException(String message, Throwable cause) {
this(message, cause, null);
}

public WatcherException(String message) {
this(message, null, null);
}

public WatcherException(String message, Throwable cause, boolean reconnectionError) {
super(message);
rawWatchMessage = null;
this.reconnectionError = reconnectionError;
}

public WatcherException(String message, Throwable cause, String rawWatchMessage) {
super(message, cause);
this.rawWatchMessage = rawWatchMessage;
this.reconnectionError = false;
}

public KubernetesClientException asClientException() {
final Throwable cause = getCause();
final Throwable cause = Optional.ofNullable(getCause()).orElse(this);
return cause instanceof KubernetesClientException ? (KubernetesClientException) cause
: new KubernetesClientException(getMessage(), cause);
}
Expand All @@ -47,8 +54,13 @@ public boolean isHttpGone() {
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE));
}

public boolean isReconnectionError() {
return reconnectionError;
}

@SuppressWarnings("unused")
public Optional<String> getRawWatchMessage() {
return Optional.ofNullable(rawWatchMessage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ final synchronized void cancelReconnect() {
/**
* Called to reestablish the connection. Should only be called once per request.
*/
void scheduleReconnect(WatchRequestState state) {
void scheduleReconnect(WatchRequestState state, Throwable t) {
if (!state.reconnected.compareAndSet(false, true)) {
return;
}
Expand All @@ -221,7 +221,7 @@ void scheduleReconnect(WatchRequestState state) {
}

if (cannotReconnect()) {
close(new WatcherException("Exhausted reconnects"));
close(new WatcherException("Exhausted reconnects", t, true));
return;
}

Expand Down Expand Up @@ -252,7 +252,7 @@ synchronized void reconnect() {
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close(new WatcherException("Unhandled exception in reconnect attempt", e));
close(new WatcherException("Unhandled exception in reconnect attempt", e, true));
}
}

Expand Down Expand Up @@ -430,7 +430,7 @@ void watchEnded(Throwable t, WatchRequestState state) {
logEndError(t);
}
} finally {
scheduleReconnect(state);
scheduleReconnect(state, t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public void onClose(WatcherException exception) {
// start a whole new list/watch cycle
reconnect();
} else {
onException("watch", exception);
onException("watch", exception.isReconnectionError() ? exception.asClientException() : exception);
}
}

Expand All @@ -371,10 +371,6 @@ public void onClose() {
log.debug("Watch gracefully closed for {}", Reflector.this);
}

@Override
public boolean reconnecting() {
return true;
}
}

ReflectorWatcher getWatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void cancelReconnectNonNullAttempt() throws MalformedURLException {
awm.baseOperation.context = Mockito.mock(OperationContext.class);
Mockito.when(awm.baseOperation.context.getExecutor()).thenReturn(executor);

awm.scheduleReconnect(new WatchRequestState());
awm.scheduleReconnect(new WatchRequestState(), null);
// When
awm.cancelReconnect();
// Then
Expand All @@ -168,7 +168,7 @@ protected void startWatch() {
if (first) {
first = false;
// simulate failing before the call to startWatch finishes
ForkJoinPool.commonPool().execute(() -> scheduleReconnect(new WatchRequestState()));
ForkJoinPool.commonPool().execute(() -> scheduleReconnect(new WatchRequestState(), null));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ void testFailSafeReconnect() throws MalformedURLException, InterruptedException
return new WatchConnectionManager(client,
baseOperation, Mockito.mock(ListOptions.class), Mockito.mock(Watcher.class), 0, 0, 0) {
@Override
void scheduleReconnect(WatchRequestState state) {
void scheduleReconnect(WatchRequestState state, Throwable t) {
reconnect.countDown();
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ private void setupHttpWatch(CompletableFuture<HttpResponse<AsyncBody>> future, C

WatchHTTPManager<HasMetadata, KubernetesResourceList<HasMetadata>> watch = new WatchHTTPManager(client,
baseOperation, Mockito.mock(ListOptions.class), Mockito.mock(Watcher.class), 1, 0) {

@Override
void scheduleReconnect(WatchRequestState state) {
void scheduleReconnect(WatchRequestState state, Throwable t) {
reconnect.countDown();
}
};
Expand Down
Loading