Skip to content

Commit b1b1883

Browse files
committed
feat: implement the custom data source option
this enables users to specify file data source initializer
1 parent b1e0123 commit b1b1883

File tree

6 files changed

+251
-49
lines changed

6 files changed

+251
-49
lines changed

packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,3 +1004,157 @@ it('consumes cancellation tokens correctly', async () => {
10041004
// eslint-disable-next-line no-underscore-dangle
10051005
expect(underTest._cancelTokens.length).toEqual(0);
10061006
});
1007+
1008+
it('handles multiple initializers with fallback when first initializer fails and second succeeds', async () => {
1009+
const mockInitializer1Error = {
1010+
name: 'Error',
1011+
message: 'First initializer failed',
1012+
};
1013+
const mockInitializer1: DataSource = {
1014+
start: jest
1015+
.fn()
1016+
.mockImplementation(
1017+
(
1018+
_dataCallback: (basis: boolean, data: any) => void,
1019+
_statusCallback: (status: DataSourceState, err?: any) => void,
1020+
) => {
1021+
_statusCallback(DataSourceState.Initializing);
1022+
_statusCallback(DataSourceState.Closed, mockInitializer1Error);
1023+
},
1024+
),
1025+
stop: jest.fn(),
1026+
};
1027+
1028+
const mockInitializer2Data = { key: 'init2' };
1029+
const mockInitializer2: DataSource = {
1030+
start: jest
1031+
.fn()
1032+
.mockImplementation(
1033+
(
1034+
_dataCallback: (basis: boolean, data: any) => void,
1035+
_statusCallback: (status: DataSourceState, err?: any) => void,
1036+
) => {
1037+
_statusCallback(DataSourceState.Initializing);
1038+
_statusCallback(DataSourceState.Valid);
1039+
_dataCallback(true, mockInitializer2Data);
1040+
_statusCallback(DataSourceState.Closed);
1041+
},
1042+
),
1043+
stop: jest.fn(),
1044+
};
1045+
1046+
const mockSynchronizer1Data = { key: 'sync1' };
1047+
const mockSynchronizer1 = {
1048+
start: jest
1049+
.fn()
1050+
.mockImplementation(
1051+
(
1052+
_dataCallback: (basis: boolean, data: any) => void,
1053+
_statusCallback: (status: DataSourceState, err?: any) => void,
1054+
) => {
1055+
_statusCallback(DataSourceState.Initializing);
1056+
_statusCallback(DataSourceState.Valid);
1057+
_dataCallback(false, mockSynchronizer1Data);
1058+
},
1059+
),
1060+
stop: jest.fn(),
1061+
};
1062+
1063+
const underTest = new CompositeDataSource(
1064+
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
1065+
[makeDataSourceFactory(mockSynchronizer1)],
1066+
[],
1067+
undefined,
1068+
makeTestTransitionConditions(),
1069+
makeZeroBackoff(),
1070+
);
1071+
1072+
let dataCallback;
1073+
const statusCallback = jest.fn();
1074+
await new Promise<void>((resolve) => {
1075+
dataCallback = jest.fn((_: boolean, data: any) => {
1076+
if (data === mockSynchronizer1Data) {
1077+
resolve();
1078+
}
1079+
});
1080+
1081+
underTest.start(dataCallback, statusCallback);
1082+
});
1083+
1084+
expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
1085+
expect(mockInitializer2.start).toHaveBeenCalledTimes(1);
1086+
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
1087+
expect(statusCallback).toHaveBeenCalledTimes(5);
1088+
expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined);
1089+
expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Interrupted, mockInitializer1Error);
1090+
expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Valid, undefined);
1091+
expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, undefined);
1092+
expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined);
1093+
});
1094+
1095+
it('does not run second initializer when first initializer succeeds', async () => {
1096+
const mockInitializer1Data = { key: 'init1' };
1097+
const mockInitializer1: DataSource = {
1098+
start: jest
1099+
.fn()
1100+
.mockImplementation(
1101+
(
1102+
_dataCallback: (basis: boolean, data: any) => void,
1103+
_statusCallback: (status: DataSourceState, err?: any) => void,
1104+
) => {
1105+
_statusCallback(DataSourceState.Initializing);
1106+
_statusCallback(DataSourceState.Valid);
1107+
_dataCallback(true, mockInitializer1Data);
1108+
_statusCallback(DataSourceState.Closed);
1109+
},
1110+
),
1111+
stop: jest.fn(),
1112+
};
1113+
1114+
const mockInitializer2: DataSource = {
1115+
start: jest.fn(),
1116+
stop: jest.fn(),
1117+
};
1118+
1119+
const mockSynchronizer1Data = { key: 'sync1' };
1120+
const mockSynchronizer1 = {
1121+
start: jest
1122+
.fn()
1123+
.mockImplementation(
1124+
(
1125+
_dataCallback: (basis: boolean, data: any) => void,
1126+
_statusCallback: (status: DataSourceState, err?: any) => void,
1127+
) => {
1128+
_statusCallback(DataSourceState.Initializing);
1129+
_statusCallback(DataSourceState.Valid);
1130+
_dataCallback(false, mockSynchronizer1Data);
1131+
},
1132+
),
1133+
stop: jest.fn(),
1134+
};
1135+
1136+
const underTest = new CompositeDataSource(
1137+
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
1138+
[makeDataSourceFactory(mockSynchronizer1)],
1139+
[],
1140+
undefined,
1141+
makeTestTransitionConditions(),
1142+
makeZeroBackoff(),
1143+
);
1144+
1145+
let dataCallback;
1146+
const statusCallback = jest.fn();
1147+
await new Promise<void>((resolve) => {
1148+
dataCallback = jest.fn((_: boolean, data: any) => {
1149+
if (data === mockSynchronizer1Data) {
1150+
resolve();
1151+
}
1152+
});
1153+
1154+
underTest.start(dataCallback, statusCallback);
1155+
});
1156+
1157+
expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
1158+
expect(mockInitializer2.start).toHaveBeenCalledTimes(0);
1159+
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
1160+
});

packages/shared/sdk-server/src/LDClientImpl.ts

Lines changed: 84 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { Hook } from './api/integrations/Hook';
3434
import { BigSegmentStoreMembership } from './api/interfaces';
3535
import { LDWaitForInitializationOptions } from './api/LDWaitForInitializationOptions';
3636
import {
37+
isCustomOptions,
3738
isPollingOnlyOptions,
3839
isStandardOptions,
3940
isStreamingOnlyOptions,
@@ -65,9 +66,10 @@ import FlagsStateBuilder from './FlagsStateBuilder';
6566
import HookRunner from './hooks/HookRunner';
6667
import MigrationOpEventToInputEvent from './MigrationOpEventConversion';
6768
import MigrationOpTracker from './MigrationOpTracker';
68-
import Configuration, { DEFAULT_POLL_INTERVAL } from './options/Configuration';
69+
import Configuration, { DEFAULT_POLL_INTERVAL, DEFAULT_STREAM_RECONNECT_DELAY } from './options/Configuration';
6970
import { ServerInternalOptions } from './options/ServerInternalOptions';
7071
import VersionedDataKinds from './store/VersionedDataKinds';
72+
import FileDataInitializerFDv2 from './data_sources/fileDataInitilizerFDv2';
7173

7274
const { ClientMessages, ErrorKinds, NullEventProcessor } = internal;
7375
enum InitState {
@@ -323,60 +325,96 @@ function constructFDv2(
323325
if (!(config.offline || config.dataSystem!.useLdd)) {
324326
// make the FDv2 composite datasource with initializers/synchronizers
325327
const initializers: subsystem.LDDataSourceFactory[] = [];
328+
const synchronizers: subsystem.LDDataSourceFactory[] = [];
329+
const fdv1FallbackSynchronizers: subsystem.LDDataSourceFactory[] = [];
330+
331+
if (isCustomOptions(dataSystem.dataSource)) {
332+
const {
333+
initializers: initializerConfigs = [],
334+
synchronizers: synchronizerConfigs = [],
335+
} = dataSystem.dataSource;
336+
337+
initializerConfigs.forEach((initializerConfig) => {
338+
switch (initializerConfig.type) {
339+
case 'file':
340+
initializers.push(() => new FileDataInitializerFDv2(initializerConfig, platform, config.logger));
341+
break;
342+
case 'polling':
343+
const { pollInterval = DEFAULT_POLL_INTERVAL } = initializerConfig;
344+
initializers.push(() => new PollingProcessorFDv2(new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), pollInterval, config.logger));
345+
break;
346+
default:
347+
throw new Error('Unsupported initializer type');
348+
}
349+
});
350+
synchronizerConfigs.forEach((synchronizerConfig) => {
351+
switch (synchronizerConfig.type) {
352+
case 'streaming':
353+
const { streamInitialReconnectDelay = DEFAULT_STREAM_RECONNECT_DELAY } = synchronizerConfig;
354+
synchronizers.push(() => new StreamingProcessorFDv2(clientContext, '/sdk/stream', [], baseHeaders, diagnosticsManager, streamInitialReconnectDelay));
355+
break;
356+
case 'polling':
357+
const { pollInterval = DEFAULT_POLL_INTERVAL } = synchronizerConfig;
358+
synchronizers.push(() => new PollingProcessorFDv2(new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), pollInterval, config.logger));
359+
break;
360+
default:
361+
throw new Error('Unsupported synchronizer type');
362+
}
363+
});
364+
} else {
365+
// use one shot initializer for performance and cost if we can do a combination of polling and streaming
366+
if (isStandardOptions(dataSystem.dataSource)) {
367+
initializers.push(
368+
() =>
369+
new OneShotInitializerFDv2(
370+
new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger),
371+
config.logger,
372+
),
373+
);
374+
}
326375

327-
// use one shot initializer for performance and cost if we can do a combination of polling and streaming
328-
if (isStandardOptions(dataSystem.dataSource)) {
329-
initializers.push(
330-
() =>
331-
new OneShotInitializerFDv2(
332-
new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger),
333-
config.logger,
334-
),
335-
);
336-
}
376+
// if streaming is configured, add streaming synchronizer
377+
if (isStandardOptions(dataSystem.dataSource) || isStreamingOnlyOptions(dataSystem.dataSource)) {
378+
const reconnectDelay = dataSystem.dataSource.streamInitialReconnectDelay;
379+
synchronizers.push(
380+
() =>
381+
new StreamingProcessorFDv2(
382+
clientContext,
383+
'/sdk/stream',
384+
[],
385+
baseHeaders,
386+
diagnosticsManager,
387+
reconnectDelay,
388+
),
389+
);
390+
}
337391

338-
const synchronizers: subsystem.LDDataSourceFactory[] = [];
339-
// if streaming is configured, add streaming synchronizer
340-
if (isStandardOptions(dataSystem.dataSource) || isStreamingOnlyOptions(dataSystem.dataSource)) {
341-
const reconnectDelay = dataSystem.dataSource.streamInitialReconnectDelay;
342-
synchronizers.push(
343-
() =>
344-
new StreamingProcessorFDv2(
345-
clientContext,
346-
'/sdk/stream',
347-
[],
348-
baseHeaders,
349-
diagnosticsManager,
350-
reconnectDelay,
351-
),
352-
);
353-
}
392+
let pollingInterval = DEFAULT_POLL_INTERVAL;
393+
// if polling is configured, add polling synchronizer
394+
if (isStandardOptions(dataSystem.dataSource) || isPollingOnlyOptions(dataSystem.dataSource)) {
395+
pollingInterval = dataSystem.dataSource.pollInterval ?? DEFAULT_POLL_INTERVAL;
396+
synchronizers.push(
397+
() =>
398+
new PollingProcessorFDv2(
399+
new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', logger),
400+
pollingInterval,
401+
logger,
402+
),
403+
);
404+
}
354405

355-
let pollingInterval = DEFAULT_POLL_INTERVAL;
356-
// if polling is configured, add polling synchronizer
357-
if (isStandardOptions(dataSystem.dataSource) || isPollingOnlyOptions(dataSystem.dataSource)) {
358-
pollingInterval = dataSystem.dataSource.pollInterval ?? DEFAULT_POLL_INTERVAL;
359-
synchronizers.push(
406+
// This is short term handling and will be removed once FDv2 adoption is sufficient.
407+
fdv1FallbackSynchronizers.push(
360408
() =>
361409
new PollingProcessorFDv2(
362-
new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', logger),
410+
new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger),
363411
pollingInterval,
364-
logger,
412+
config.logger,
413+
true,
365414
),
366-
);
415+
);
367416
}
368417

369-
// This is short term handling and will be removed once FDv2 adoption is sufficient.
370-
const fdv1FallbackSynchronizers = [
371-
() =>
372-
new PollingProcessorFDv2(
373-
new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger),
374-
pollingInterval,
375-
config.logger,
376-
true,
377-
),
378-
];
379-
380418
dataSource = new CompositeDataSource(
381419
initializers,
382420
synchronizers,

packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ export interface LDDataSystemOptions {
7373
export type DataSourceOptions =
7474
| StandardDataSourceOptions
7575
| StreamingDataSourceOptions
76-
| PollingDataSourceOptions;
76+
| PollingDataSourceOptions
77+
| CustomDataSourceOptions;
7778

7879
export type DataSourceConfiguration =
7980
| FileSystemDataSourceConfiguration
@@ -187,3 +188,7 @@ export function isStreamingOnlyOptions(u: any): u is StreamingDataSourceOptions
187188
export function isPollingOnlyOptions(u: any): u is PollingDataSourceOptions {
188189
return u.dataSourceOptionsType === 'pollingOnly';
189190
}
191+
192+
export function isCustomOptions(u: any): u is CustomDataSourceOptions {
193+
return u.dataSourceOptionsType === 'custom';
194+
}

packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour
2424
private _yamlParser?: (data: string) => any;
2525
private _fileLoader?: FileLoader;
2626

27-
constructor(options: FileSystemDataSourceConfiguration, platform: Platform, logger: LDLogger) {
27+
constructor(options: FileSystemDataSourceConfiguration, platform: Platform, logger?: LDLogger) {
2828
this._validateInputs(options, platform);
2929

3030
this._paths = options.paths;

packages/shared/sdk-server/src/options/Configuration.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ import { LDBigSegmentsOptions, LDOptions, LDProxyOptions, LDTLSOptions } from '.
1616
import { Hook } from '../api/integrations';
1717
import {
1818
DataSourceOptions,
19+
isCustomOptions,
1920
isPollingOnlyOptions,
2021
isStandardOptions,
2122
isStreamingOnlyOptions,
2223
LDDataSystemOptions,
2324
PollingDataSourceOptions,
2425
StandardDataSourceOptions,
2526
StreamingDataSourceOptions,
27+
CustomDataSourceOptions,
2628
} from '../api/options/LDDataSystemOptions';
2729
import {
2830
LDDataSourceUpdates,
@@ -78,7 +80,7 @@ const validations: Record<string, TypeValidator> = {
7880
};
7981

8082
export const DEFAULT_POLL_INTERVAL = 30;
81-
const DEFAULT_STREAM_RECONNECT_DELAY = 1;
83+
export const DEFAULT_STREAM_RECONNECT_DELAY = 1;
8284

8385
const defaultStandardDataSourceOptions: StandardDataSourceOptions = {
8486
dataSourceOptionsType: 'standard',
@@ -237,6 +239,9 @@ function validateDataSystemOptions(options: Options): {
237239
options.dataSource,
238240
defaultPollingDataSourceOptions,
239241
));
242+
} else if (isCustomOptions(options.dataSource)) {
243+
validatedDataSourceOptions = options.dataSource;
244+
errors = [];
240245
} else {
241246
// provided datasource options don't fit any expected form, drop them and use defaults
242247
validatedDataSourceOptions = defaultStandardDataSourceOptions;

0 commit comments

Comments
 (0)