Skip to content

Commit f0a1db2

Browse files
committed
attempt to handle fatal error
Signed-off-by: lea konvalinka <[email protected]>
1 parent 8b7f574 commit f0a1db2

File tree

8 files changed: 60 additions (+60) and 25 deletions (-25).

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -129,7 +129,7 @@ public class FlagdOptions {
129129
* Defaults to empty list
130130
*/
131131
@Builder.Default
132-
private List<String> nonRetryableStatusCodes = new ArrayList<>();
132+
private List<String> fatalStatusCodes = new ArrayList<>();
133133

134134
/**
135135
* Selector to be used with flag sync gRPC contract.

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
66
import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver;
77
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
8+
import dev.openfeature.sdk.ErrorCode;
89
import dev.openfeature.sdk.EvaluationContext;
910
import dev.openfeature.sdk.EventProvider;
1011
import dev.openfeature.sdk.Hook;
@@ -135,7 +136,7 @@ public void initialize(EvaluationContext evaluationContext) throws Exception {
135136
public void shutdown() {
136137
synchronized (syncResources) {
137138
try {
138-
if (!syncResources.isInitialized() || syncResources.isShutDown()) {
139+
if (syncResources.isShutDown()) {
139140
return;
140141
}
141142

@@ -193,7 +194,7 @@ EvaluationContext getEnrichedContext() {
193194

194195
@SuppressWarnings("checkstyle:fallthrough")
195196
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
196-
log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
197+
log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
197198
synchronized (syncResources) {
198199
/*
199200
* We only use Error and Ready as previous states.
@@ -222,20 +223,26 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
222223
onReady();
223224
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
224225
break;
225-
226-
case PROVIDER_ERROR:
227-
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
228-
onError();
229-
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
226+
case PROVIDER_STALE:
227+
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) {
228+
onStale();
229+
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE);
230230
}
231231
break;
232-
232+
case PROVIDER_ERROR:
233+
onError();
234+
break;
233235
default:
234236
log.warn("Unknown event {}", flagdProviderEvent.getEvent());
235237
}
236238
}
237239
}
238240

241+
/**
 * Handles a {@code PROVIDER_ERROR} event: emits a provider error whose details carry
 * {@link ErrorCode#PROVIDER_FATAL}, then shuts this provider down.
 */
private void onError() {
242+
this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build());
243+
// No recovery is attempted once the fatal error has been reported; the provider is shut down right away.
shutdown();
244+
}
245+
239246
private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
240247
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
241248
.flagsChanged(flagdProviderEvent.getFlagsChanged())
@@ -255,7 +262,7 @@ private void onReady() {
255262
ProviderEventDetails.builder().message("connected to flagd").build());
256263
}
257264

258-
private void onError() {
265+
private void onStale() {
259266
log.debug(
260267
"Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.",
261268
gracePeriod);
@@ -270,7 +277,7 @@ private void onError() {
270277
if (!errorExecutor.isShutdown()) {
271278
errorTask = errorExecutor.schedule(
272279
() -> {
273-
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
280+
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) {
274281
log.error(
275282
"Provider did not reconnect successfully within {}s. Emitting ERROR event...",
276283
gracePeriod);

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,9 @@ public void init() throws Exception {
7777
storageStateChange.getSyncMetadata()));
7878
log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
7979
break;
80+
case STALE:
81+
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE));
82+
break;
8083
case ERROR:
8184
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
8285
break;

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
138138
}
139139
break;
140140
case ERROR:
141+
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
142+
log.warn("Failed to convey STALE status, queue is full");
143+
}
144+
break;
145+
case FATAL:
141146
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
142147
log.warn("Failed to convey ERROR status, queue is full");
143148
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,5 +3,6 @@
33
/** Payload type emitted by {@link QueueSource}. */
44
public enum QueuePayloadType {
55
DATA,
6-
ERROR
6+
ERROR,
7+
FATAL
78
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 31 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
1717
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
1818
import dev.openfeature.sdk.Awaitable;
19-
import dev.openfeature.sdk.exceptions.FatalError;
2019
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2120
import io.grpc.Status;
2221
import io.grpc.StatusRuntimeException;
@@ -51,7 +50,7 @@ public class SyncStreamQueueSource implements QueueSource {
5150
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
5251
private final FlagSyncServiceStub flagSyncStub;
5352
private final FlagSyncServiceBlockingStub metadataStub;
54-
private final List<String> nonRetryableStatusCodes;
53+
private final List<String> fatalStatusCodes;
5554

5655
/**
5756
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
@@ -68,7 +67,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
6867
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
6968
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
7069
.withWaitForReady();
71-
nonRetryableStatusCodes = options.getNonRetryableStatusCodes();
70+
fatalStatusCodes = options.getFatalStatusCodes();
7271
}
7372

7473
// internal use only
@@ -86,7 +85,7 @@ protected SyncStreamQueueSource(
8685
flagSyncStub = stubMock;
8786
syncMetadataDisabled = options.isSyncMetadataDisabled();
8887
metadataStub = blockingStubMock;
89-
nonRetryableStatusCodes = options.getNonRetryableStatusCodes();
88+
fatalStatusCodes = options.getFatalStatusCodes();
9089
}
9190

9291
/** Initialize sync stream connector. */
@@ -137,12 +136,14 @@ private void observeSyncStream() {
137136
}
138137

139138
log.debug("Initializing sync stream request");
140-
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle);
139+
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle, fatalStatusCodes);
141140
try {
142141
observer.metadata = getMetadata();
143142
} catch (StatusRuntimeException metaEx) {
144-
if (nonRetryableStatusCodes.contains(metaEx.getStatus().getCode().name())) {
145-
throw new FatalError("Failed to connect for metadata request, not retrying for error " + metaEx.getStatus());
143+
if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) {
144+
//throw new FatalError("Failed to connect for metadata request, not retrying for error " + metaEx.getStatus());
145+
enqueueFatal("Fatal: Failed to connect for metadata request, not retrying for error " + metaEx.getStatus().getCode());
146+
return;
146147
}
147148
// retry for other status codes
148149
String message = metaEx.getMessage();
@@ -155,8 +156,10 @@ private void observeSyncStream() {
155156
try {
156157
syncFlags(observer);
157158
} catch (StatusRuntimeException ex) {
158-
if (nonRetryableStatusCodes.contains(ex.getStatus().toString())) {
159-
throw new FatalError("Failed to connect to sync stream, not retrying for error " + ex.getStatus());
159+
if (fatalStatusCodes.contains(ex.getStatus().getCode().toString())) {
160+
//throw new FatalError("Failed to connect for metadata request, not retrying for error " + ex.getStatus().getCode());
161+
enqueueFatal("Fatal: Failed to connect for metadata request, not retrying for error " + ex.getStatus().getCode());
162+
return;
160163
}
161164
// retry for other status codes
162165
log.error("Unexpected sync stream exception, will restart.", ex);
@@ -227,22 +230,34 @@ private void enqueueError(String message) {
227230
enqueueError(outgoingQueue, message);
228231
}
229232

233+
/**
 * Offers a {@code FATAL} payload carrying {@code message} to this source's outgoing queue
 * by delegating to the static overload.
 *
 * @param message text placed in the queued payload
 */
private void enqueueFatal(String message) {
234+
enqueueFatal(outgoingQueue, message);
235+
}
236+
230237
/**
 * Offers an {@code ERROR} payload carrying {@code message} to {@code queue}.
 *
 * <p>Uses the non-blocking {@code offer}; if the queue does not accept the payload, the
 * failure is logged and the payload is dropped.
 *
 * @param queue   queue that receives the payload
 * @param message text placed in the queued payload
 */
private static void enqueueError(BlockingQueue<QueuePayload> queue, String message) {
231238
if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) {
232239
log.error("Failed to convey ERROR status, queue is full");
233240
}
234241
}
235242

243+
/**
 * Offers a {@code FATAL} payload carrying {@code message} to {@code queue}.
 *
 * <p>Uses the non-blocking {@code offer}; if the queue does not accept the payload, the
 * failure is logged and the payload is dropped.
 *
 * @param queue   queue that receives the payload
 * @param message text placed in the queued payload
 */
private static void enqueueFatal(BlockingQueue<QueuePayload> queue, String message) {
244+
if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) {
245+
log.error("Failed to convey FATAL status, queue is full");
246+
}
247+
}
248+
236249
private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
237250
private final BlockingQueue<QueuePayload> outgoingQueue;
238251
private final AtomicBoolean shouldThrottle;
239252
private final Awaitable done = new Awaitable();
253+
private final List<String> fatalStatusCodes;
240254

241255
private Struct metadata;
242256

243-
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle) {
257+
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle, List<String> fatalStatusCodes) {
244258
this.outgoingQueue = outgoingQueue;
245259
this.shouldThrottle = shouldThrottle;
260+
this.fatalStatusCodes = fatalStatusCodes;
246261
}
247262

248263
@Override
@@ -260,9 +275,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) {
260275
@Override
261276
public void onError(Throwable throwable) {
262277
try {
278+
Status status = Status.fromThrowable(throwable);
263279
String message = throwable != null ? throwable.getMessage() : "unknown";
264280
log.debug("Stream error: {}, will restart", message, throwable);
265-
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
281+
if (fatalStatusCodes.contains(status.getCode())) {
282+
enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message));
283+
} else {
284+
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
285+
}
266286

267287
// Set throttling flag to ensure backoff before retry
268288
this.shouldThrottle.set(true);

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -202,6 +202,6 @@ public void the_flag_was_modded() {
202202

203203
@Then("the client is in {} state")
204204
public void the_client_is_in_fatal_state(String clientState) {
205-
assertThat(state.client.getProviderState()).isEqualTo(ProviderState.FATAL);
205+
assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase()));
206206
}
207207
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -121,7 +121,6 @@ private static String mapOptionNames(String option) {
121121
propertyMapper.put("keepAliveTime", "keepAlive");
122122
propertyMapper.put("retryBackoffMaxMs", "keepAlive");
123123
propertyMapper.put("cache", "cacheType");
124-
propertyMapper.put("fatalStatusCodes", "nonRetryableStatusCodes");
125124

126125
if (propertyMapper.get(option) != null) {
127126
option = propertyMapper.get(option);

0 commit comments

Comments
 (0)