Skip to content

Commit 738b55c

Browse files
del watcher from watchers when canceled
Signed-off-by: chaofengliu-okg <[email protected]>
1 parent 6ea9fe4 commit 738b55c

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -226,13 +226,20 @@ public void close() {
226226

227227
@Override
228228
public void cancel() {
229-
WriteStream<WatchRequest> ws = wstream.get();
230-
if (ws instanceof GrpcWriteStream<?>) {
231-
GrpcWriteStream<?> gws = (GrpcWriteStream<?>) ws;
232-
var observer = gws.streamObserver();
233-
if (observer instanceof ClientCallStreamObserver<?>) {
234-
ClientCallStreamObserver<?> callObs = (ClientCallStreamObserver<?>) observer;
235-
callObs.cancel("Watcher cancelled", null);
229+
synchronized (WatchImpl.this.lock) {
230+
if (closed.compareAndSet(false, true)) {
231+
WriteStream<WatchRequest> ws = wstream.get();
232+
if (ws instanceof GrpcWriteStream<?>) {
233+
GrpcWriteStream<?> gws = (GrpcWriteStream<?>) ws;
234+
var observer = gws.streamObserver();
235+
if (observer instanceof ClientCallStreamObserver<?>) {
236+
ClientCallStreamObserver<?> callObs = (ClientCallStreamObserver<?>) observer;
237+
callObs.cancel("Watcher cancelled", null);
238+
}
239+
}
240+
id = -1;
241+
listener.onCompleted();
242+
watchers.remove(this);
236243
}
237244
}
238245
}

0 commit comments

Comments
 (0)