Skip to content

Commit f7f1d97

Browse files
committed
attempt to handle fatal error
Signed-off-by: lea konvalinka <[email protected]>
1 parent 057751b commit f7f1d97

File tree

8 files changed

+68
-25
lines changed

8 files changed

+68
-25
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public class FlagdOptions {
129129
* Defaults to empty list
130130
*/
131131
@Builder.Default
132-
private List<String> nonRetryableStatusCodes = new ArrayList<>();
132+
private List<String> fatalStatusCodes = new ArrayList<>();
133133

134134
/**
135135
* Selector to be used with flag sync gRPC contract.

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
66
import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver;
77
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
8+
import dev.openfeature.sdk.ErrorCode;
89
import dev.openfeature.sdk.EvaluationContext;
910
import dev.openfeature.sdk.EventProvider;
1011
import dev.openfeature.sdk.Hook;
@@ -135,7 +136,7 @@ public void initialize(EvaluationContext evaluationContext) throws Exception {
135136
public void shutdown() {
136137
synchronized (syncResources) {
137138
try {
138-
if (!syncResources.isInitialized() || syncResources.isShutDown()) {
139+
if (syncResources.isShutDown()) {
139140
return;
140141
}
141142

@@ -193,7 +194,7 @@ EvaluationContext getEnrichedContext() {
193194

194195
@SuppressWarnings("checkstyle:fallthrough")
195196
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
196-
log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
197+
log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
197198
synchronized (syncResources) {
198199
/*
199200
* We only use Error and Ready as previous states.
@@ -222,20 +223,26 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
222223
onReady();
223224
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
224225
break;
225-
226-
case PROVIDER_ERROR:
227-
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
228-
onError();
229-
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
226+
case PROVIDER_STALE:
227+
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) {
228+
onStale();
229+
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE);
230230
}
231231
break;
232-
232+
case PROVIDER_ERROR:
233+
onError();
234+
break;
233235
default:
234236
log.warn("Unknown event {}", flagdProviderEvent.getEvent());
235237
}
236238
}
237239
}
238240

241+
private void onError() {
242+
this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build());
243+
shutdown();
244+
}
245+
239246
private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
240247
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
241248
.flagsChanged(flagdProviderEvent.getFlagsChanged())
@@ -255,7 +262,7 @@ private void onReady() {
255262
ProviderEventDetails.builder().message("connected to flagd").build());
256263
}
257264

258-
private void onError() {
265+
private void onStale() {
259266
log.debug(
260267
"Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.",
261268
gracePeriod);
@@ -270,7 +277,7 @@ private void onError() {
270277
if (!errorExecutor.isShutdown()) {
271278
errorTask = errorExecutor.schedule(
272279
() -> {
273-
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
280+
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) {
274281
log.error(
275282
"Provider did not reconnect successfully within {}s. Emitting ERROR event...",
276283
gracePeriod);

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public void init() throws Exception {
7777
storageStateChange.getSyncMetadata()));
7878
log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
7979
break;
80+
case STALE:
81+
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE));
82+
break;
8083
case ERROR:
8184
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
8285
break;

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
138138
}
139139
break;
140140
case ERROR:
141+
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
142+
log.warn("Failed to convey STALE status, queue is full");
143+
}
144+
break;
145+
case FATAL:
141146
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
142147
log.warn("Failed to convey ERROR status, queue is full");
143148
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
/** Payload type emitted by {@link QueueSource}. */
44
public enum QueuePayloadType {
55
DATA,
6-
ERROR
6+
ERROR,
7+
FATAL
78
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
1717
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
1818
import dev.openfeature.sdk.Awaitable;
19-
import dev.openfeature.sdk.exceptions.FatalError;
2019
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2120
import io.grpc.Status;
2221
import io.grpc.StatusRuntimeException;
@@ -50,7 +49,7 @@ public class SyncStreamQueueSource implements QueueSource {
5049
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
5150
private final FlagSyncServiceStub flagSyncStub;
5251
private final FlagSyncServiceBlockingStub metadataStub;
53-
private final List<String> nonRetryableStatusCodes;
52+
private final List<String> fatalStatusCodes;
5453

5554
/**
5655
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
@@ -67,7 +66,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
6766
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
6867
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
6968
.withWaitForReady();
70-
nonRetryableStatusCodes = options.getNonRetryableStatusCodes();
69+
fatalStatusCodes = options.getFatalStatusCodes();
7170
}
7271

7372
// internal use only
@@ -85,7 +84,7 @@ protected SyncStreamQueueSource(
8584
flagSyncStub = stubMock;
8685
syncMetadataDisabled = options.isSyncMetadataDisabled();
8786
metadataStub = blockingStubMock;
88-
nonRetryableStatusCodes = options.getNonRetryableStatusCodes();
87+
fatalStatusCodes = options.getFatalStatusCodes();
8988
}
9089

9190
/** Initialize sync stream connector. */
@@ -123,12 +122,14 @@ private void observeSyncStream() {
123122
while (!shutdown.get()) {
124123
try {
125124
log.debug("Initializing sync stream request");
126-
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
125+
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, fatalStatusCodes, maxBackoffMs);
127126
try {
128127
observer.metadata = getMetadata();
129128
} catch (StatusRuntimeException metaEx) {
130-
if (nonRetryableStatusCodes.contains(metaEx.getStatus().getCode().name())) {
131-
throw new FatalError("Failed to connect for metadata request, not retrying for error " + metaEx.getStatus());
129+
if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) {
130+
//throw new FatalError("Failed to connect for metadata request, not retrying for error " + metaEx.getStatus());
131+
enqueueFatal("Fatal: Failed to connect for metadata request, not retrying for error " + metaEx.getStatus().getCode());
132+
return;
132133
}
133134
// retry for other status codes
134135
String message = metaEx.getMessage();
@@ -141,8 +142,10 @@ private void observeSyncStream() {
141142
try {
142143
syncFlags(observer);
143144
} catch (StatusRuntimeException ex) {
144-
if (nonRetryableStatusCodes.contains(ex.getStatus().toString())) {
145-
throw new FatalError("Failed to connect to sync stream, not retrying for error " + ex.getStatus());
145+
if (fatalStatusCodes.contains(ex.getStatus().getCode().toString())) {
146+
//throw new FatalError("Failed to connect for metadata request, not retrying for error " + ex.getStatus().getCode());
147+
enqueueFatal("Fatal: Failed to connect for metadata request, not retrying for error " + ex.getStatus().getCode());
148+
return;
146149
}
147150
// retry for other status codes
148151
log.error("Unexpected sync stream exception, will restart.", ex);
@@ -213,20 +216,35 @@ private void enqueueError(String message) {
213216
enqueueError(outgoingQueue, message);
214217
}
215218

219+
private void enqueueFatal(String message) {
220+
enqueueFatal(outgoingQueue, message);
221+
}
222+
216223
private static void enqueueError(BlockingQueue<QueuePayload> queue, String message) {
217224
if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) {
218225
log.error("Failed to convey ERROR status, queue is full");
219226
}
220227
}
221228

229+
private static void enqueueFatal(BlockingQueue<QueuePayload> queue, String message) {
230+
if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) {
231+
log.error("Failed to convey FATAL status, queue is full");
232+
}
233+
}
234+
222235
private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
223236
private final BlockingQueue<QueuePayload> outgoingQueue;
224237
private final Awaitable done = new Awaitable();
238+
private final List<String> fatalStatusCodes;
239+
private final int maxBackoffMs;
225240

226241
private Struct metadata;
227242

228-
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue) {
243+
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, List<String> fatalStatusCodes,
244+
int maxBackoffMs) {
229245
this.outgoingQueue = outgoingQueue;
246+
this.fatalStatusCodes = fatalStatusCodes;
247+
this.maxBackoffMs = maxBackoffMs;
230248
}
231249

232250
@Override
@@ -244,10 +262,20 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) {
244262
@Override
245263
public void onError(Throwable throwable) {
246264
try {
265+
Status status = Status.fromThrowable(throwable);
247266
String message = throwable != null ? throwable.getMessage() : "unknown";
248267
log.debug("Stream error: {}, will restart", message, throwable);
249-
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
268+
if (fatalStatusCodes.contains(status.getCode())) {
269+
enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message));
270+
} else {
271+
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
272+
}
250273
} finally {
274+
try {
275+
Thread.sleep(this.maxBackoffMs);
276+
} catch (InterruptedException e) {
277+
// ignore
278+
}
251279
done.wakeup();
252280
}
253281
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,6 @@ public void the_flag_was_modded() {
202202

203203
@Then("the client is in {} state")
204204
public void the_client_is_in_fatal_state(String clientState) {
205-
assertThat(state.client.getProviderState()).isEqualTo(ProviderState.FATAL);
205+
assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase()));
206206
}
207207
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ private static String mapOptionNames(String option) {
121121
propertyMapper.put("keepAliveTime", "keepAlive");
122122
propertyMapper.put("retryBackoffMaxMs", "keepAlive");
123123
propertyMapper.put("cache", "cacheType");
124-
propertyMapper.put("fatalStatusCodes", "nonRetryableStatusCodes");
125124

126125
if (propertyMapper.get(option) != null) {
127126
option = propertyMapper.get(option);

0 commit comments

Comments
 (0)