diff --git a/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts b/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts new file mode 100644 index 0000000000..478106d7ec --- /dev/null +++ b/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts @@ -0,0 +1,80 @@ +import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from '../../../src/internal/fdv2/FDv1PayloadAdaptor'; +import { PayloadProcessor } from '../../../src/internal/fdv2/payloadProcessor'; +import { Event, PutObject } from '../../../src/internal/fdv2/proto'; + +// Mock PayloadProcessor that captures events +class MockPayloadProcessor extends PayloadProcessor { + public processedEvents: Event[] = []; + + constructor() { + super({}, undefined, undefined); + } + + override processEvents(events: Event[]) { + this.processedEvents = [...this.processedEvents, ...events]; + // Don't call super.processEvents to avoid side effects in tests + } +} + +it('includes server-intent as the first event and payload-transferred as the last eventwith correct structure', () => { + const processor = new MockPayloadProcessor(); + const adaptor = FDv1PayloadAdaptor(processor); + adaptor.processFullTransfer({ flags: {}, segments: {} }); + + const serverIntentEvent = processor.processedEvents[0] as Event; + expect(serverIntentEvent.event).toBe('server-intent'); + expect(serverIntentEvent.data).toBeDefined(); + + const intentData = serverIntentEvent.data as any; + expect(intentData.payloads).toBeDefined(); + expect(intentData.payloads.length).toBe(1); + expect(intentData.payloads[0].intentCode).toBe('xfer-full'); + expect(intentData.payloads[0].id).toBe('FDv1Fallback'); + expect(intentData.payloads[0].target).toBe(1); + expect(intentData.payloads[0].reason).toBe('payload-missing'); + + const payloadTransferredEvent = processor.processedEvents[ + processor.processedEvents.length - 1 + ] as Event; + expect(payloadTransferredEvent.event).toBe('payload-transferred'); + 
expect(payloadTransferredEvent.data).toBeDefined(); + + const transferredData = payloadTransferredEvent.data as any; + expect(transferredData.state).toBe(''); + expect(transferredData.version).toBe(1); + expect(transferredData.id).toBe('FDv1Fallback'); +}); + +it('pushFdv1Payload adds put-object events for flags and segments', () => { + const processor = new MockPayloadProcessor(); + const adaptor = FDv1PayloadAdaptor(processor); + const fdv1Payload = { + flags: { + 'flag-1': { key: 'flag-1', version: 1, on: true }, + 'flag-2': { key: 'flag-2', version: 2, on: false }, + }, + segments: { + 'segment-1': { key: 'segment-1', version: 1 }, + }, + }; + + adaptor.processFullTransfer(fdv1Payload); + + const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); + expect(putObjectEvents.length).toBe(3); + + const flag1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-1'); + expect(flag1Event).toBeDefined(); + expect((flag1Event!.data as PutObject).kind).toBe('flag'); + expect((flag1Event!.data as PutObject).version).toBe(1); + + const flag2Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-2'); + expect(flag2Event).toBeDefined(); + expect((flag2Event!.data as PutObject).kind).toBe('flag'); + expect((flag2Event!.data as PutObject).version).toBe(2); + + const segment1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'segment-1'); + expect(segment1Event).toBeDefined(); + expect((segment1Event!.data as PutObject).kind).toBe('segment'); + expect((segment1Event!.data as PutObject).version).toBe(1); +}); diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 87d2c77105..79cb4e6ee7 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ 
-1004,3 +1004,161 @@ it('consumes cancellation tokens correctly', async () => { // eslint-disable-next-line no-underscore-dangle expect(underTest._cancelTokens.length).toEqual(0); }); + +it('handles multiple initializers with fallback when first initializer fails and second succeeds', async () => { + const mockInitializer1Error = { + name: 'Error', + message: 'First initializer failed', + }; + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, mockInitializer1Error); + }, + ), + stop: jest.fn(), + }; + + const mockInitializer2Data = { key: 'init2' }; + const mockInitializer2: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, mockInitializer2Data); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)], + [makeDataSourceFactory(mockSynchronizer1)], + [], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + const statusCallback = jest.fn(); + await 
new Promise((resolve) => { + dataCallback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockInitializer2.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); + expect(statusCallback).toHaveBeenCalledTimes(5); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith( + 2, + DataSourceState.Interrupted, + mockInitializer1Error, + ); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Valid, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined); +}); + +it('does not run second initializer when first initializer succeeds', async () => { + const mockInitializer1Data = { key: 'init1' }; + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, mockInitializer1Data); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockInitializer2: DataSource = { + start: jest.fn(), + stop: jest.fn(), + }; + + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; 
+ + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)], + [makeDataSourceFactory(mockSynchronizer1)], + [], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + const statusCallback = jest.fn(); + await new Promise((resolve) => { + dataCallback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockInitializer2.start).toHaveBeenCalledTimes(0); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); +}); diff --git a/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts new file mode 100644 index 0000000000..f797bfad5d --- /dev/null +++ b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts @@ -0,0 +1,109 @@ +import { PayloadProcessor } from './payloadProcessor'; +import { Event } from './proto'; + +interface fdv1Payload { + flags: { [name: string]: any }; + segments: { [name: string]: any }; +} + +const PAYLOAD_ID = 'FDv1Fallback'; + +/** + * The FDv1PayloadAdaptor is a helper class that converts FDv1 payloads into events that the PayloadProcessor can understand. + */ +export interface FDv1PayloadAdaptor { + /** + * The PayloadProcessor that will be used to process the events. + */ + readonly _processor: PayloadProcessor; + + /** + * The selector that will be used to identify the payload. + */ + _selector: string; + + /** + * The method that will be used to set a selector for the payload that is + * being processed. + * + * @remarks + * This method probably shouldn't be used in most instances as FDv1 payloads + * do not have the concept of a selector. + * + * @param selector - The selector to set for the payload. 
+ * @returns this FDv1PayloadAdaptor instance + */ + useSelector: (selector: string) => FDv1PayloadAdaptor; + + /** + * The method that will be used to process a full transfer changeset. + * + * @param data - The data to process. + */ + processFullTransfer: (data: fdv1Payload) => void; +} + +export function fdv1PayloadAdaptor(processor: PayloadProcessor): FDv1PayloadAdaptor { + return { + _processor: processor, + _selector: '', + useSelector(selector: string): FDv1PayloadAdaptor { + this._selector = selector; + return this; + }, + processFullTransfer(data) { + const events: Array = [ + { + event: 'server-intent', + data: { + payloads: [ + { + id: PAYLOAD_ID, + target: 1, + intentCode: 'xfer-full', + reason: 'payload-missing', + }, + ], + }, + }, + ]; + + Object.entries(data?.flags || []).forEach(([key, flag]) => { + events.push({ + event: 'put-object', + data: { + kind: 'flag', + key, + version: flag.version || 1, + object: flag, + }, + }); + }); + + Object.entries(data?.segments || []).forEach(([key, segment]) => { + events.push({ + event: 'put-object', + data: { + kind: 'segment', + key, + version: segment.version || 1, + object: segment, + }, + }); + }); + + events.push({ + event: 'payload-transferred', + data: { + // IMPORTANT: the selector MUST be empty or "live" data synchronizers + // will not work as it would try to resume from a bogus state. 
+ state: this._selector, + version: 1, + id: PAYLOAD_ID, + }, + }); + + this._processor.processEvents(events); + }, + }; +} diff --git a/packages/shared/common/src/internal/fdv2/index.ts b/packages/shared/common/src/internal/fdv2/index.ts index b07537ddd9..d0178d6a47 100644 --- a/packages/shared/common/src/internal/fdv2/index.ts +++ b/packages/shared/common/src/internal/fdv2/index.ts @@ -1,3 +1,4 @@ +import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from './FDv1PayloadAdaptor'; import { FDv2EventsCollection, Payload, @@ -14,4 +15,5 @@ export { PayloadProcessor, PayloadStreamReader, Update, + FDv1PayloadAdaptor, }; diff --git a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts index 1cd48cac7e..4beee20bb6 100644 --- a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts +++ b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts @@ -221,8 +221,11 @@ export class PayloadProcessor { private _processPayloadTransferred = (data: PayloadTransferred) => { // if the following properties haven't been provided by now, we should reset if ( - !this._tempId || // server intent hasn't been received yet. - !data.state || + // server intent hasn't been received yet. 
+ !this._tempId || + // selector can be an empty string if we are using a file data initilizer + data.state === null || + data.state === undefined || !data.version ) { this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload diff --git a/packages/shared/common/src/internal/fdv2/proto.ts b/packages/shared/common/src/internal/fdv2/proto.ts index cb3c04bf08..3062c7fcdd 100644 --- a/packages/shared/common/src/internal/fdv2/proto.ts +++ b/packages/shared/common/src/internal/fdv2/proto.ts @@ -1,14 +1,29 @@ +export type EventType = + | 'server-intent' + | 'put-object' + | 'delete-object' + | 'payload-transferred' + | 'goodbye' + | 'error' + | 'heart-beat'; +export type IntentCode = 'xfer-full' | 'xfer-changes' | 'none'; +export type ObjectKind = 'flag' | 'segment'; + export interface Event { - event: string; - data: any; + event: EventType; + data: + | ServerIntentData + | PutObject + | DeleteObject + | PayloadTransferred + | GoodbyeObject + | ErrorObject; } export interface ServerIntentData { payloads: PayloadIntent[]; } -export type IntentCode = 'xfer-full' | 'xfer-changes' | 'none'; - export interface PayloadIntent { id: string; target: number; @@ -17,19 +32,31 @@ export interface PayloadIntent { } export interface PutObject { - kind: string; + kind: ObjectKind; key: string; version: number; object: any; } export interface DeleteObject { - kind: string; + kind: ObjectKind; key: string; version: number; } +export interface GoodbyeObject { + reason: string; + silent: boolean; + catastrophe: boolean; +} + +export interface ErrorObject { + payload_id: string; + reason: string; +} + export interface PayloadTransferred { state: string; version: number; + id?: string; } diff --git a/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts new file mode 100644 index 0000000000..8215121867 --- /dev/null +++ 
b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts @@ -0,0 +1,377 @@ +import { + DataSourceErrorKind, + Filesystem, + LDPollingError, + Platform, + subsystem, + WatchHandle, +} from '@launchdarkly/js-sdk-common'; + +import { FileSystemDataSourceConfiguration } from '../../src/api'; +import FileDataInitializerFDv2 from '../../src/data_sources/fileDataInitilizerFDv2'; +import { createBasicPlatform } from '../createBasicPlatform'; +import TestLogger, { LogLevel } from '../Logger'; + +class MockFilesystem implements Filesystem { + public fileData: Record< + string, + { + timestamp: number; + data: string; + } + > = {}; + + async getFileTimestamp(path: string): Promise { + return this.fileData[path]?.timestamp ?? 0; + } + + async readFile(path: string): Promise { + if (!this.fileData[path]) { + throw new Error('FILE NOT FOUND'); + } + return this.fileData[path]?.data; + } + + watch(_path: string, _callback: (eventType: string, filename: string) => void): WatchHandle { + return { + close: jest.fn(), + }; + } +} + +const flag1 = { + key: 'flag1', + on: true, + fallthrough: { variation: 0 }, + variations: ['value1'], + version: 1, +}; + +const segment1 = { + key: 'segment1', + include: ['user1'], + version: 1, +}; + +const flagOnlyJson = JSON.stringify({ + flags: { + flag1, + }, +}); + +const segmentOnlyJson = JSON.stringify({ + segments: { + segment1, + }, +}); + +const allPropertiesJson = JSON.stringify({ + flags: { + flag1, + }, + segments: { + segment1, + }, +}); + +describe('FileDataInitializerFDv2', () => { + let mockFilesystem: MockFilesystem; + let logger: TestLogger; + let platform: Platform; + let mockDataCallback: jest.Mock; + let mockStatusCallback: jest.Mock; + + beforeEach(() => { + jest.useFakeTimers(); + mockFilesystem = new MockFilesystem(); + logger = new TestLogger(); + platform = { + ...createBasicPlatform(), + fileSystem: mockFilesystem as unknown as Filesystem, + }; + mockDataCallback = jest.fn(); + mockStatusCallback = 
jest.fn(); + }); + + afterEach(() => { + jest.resetAllMocks(); + jest.useRealTimers(); + }); + + it('throws error when paths are not provided', () => { + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: [], + }; + + expect(() => { + /* eslint-disable-next-line no-new */ + new FileDataInitializerFDv2(options, platform, logger); + }).toThrow('FileDataInitializerFDv2: paths are required'); + }); + + it('throws error when file system is not available', () => { + const platformWithoutFileSystem = { + ...createBasicPlatform(), + fileSystem: undefined, + }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.json'], + }; + + expect(() => { + /* eslint-disable-next-line no-new */ + new FileDataInitializerFDv2(options, platformWithoutFileSystem, logger); + }).toThrow('FileDataInitializerFDv2: file system is required'); + }); + + it('loads and processes JSON file with flags and segments', async () => { + mockFilesystem.fileData['test.json'] = { timestamp: 0, data: allPropertiesJson }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.json'], + }; + + const initializer = new FileDataInitializerFDv2(options, platform, logger); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + }); + + it('loads and processes multiple JSON files', async () => { + mockFilesystem.fileData['flags.json'] = { timestamp: 0, data: flagOnlyJson }; + mockFilesystem.fileData['segments.json'] = { timestamp: 0, data: segmentOnlyJson }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['flags.json', 'segments.json'], + }; + + const initializer = new FileDataInitializerFDv2(options, platform, logger); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + 
+ expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + }); + + it('loads and processes YAML file when parser is provided', async () => { + const yamlData = 'flags:\n flag1:\n key: flag1\n on: true\n version: 1'; + const mockYamlParser = jest.fn((data: string) => { + if (data === yamlData) { + return { + flags: { + flag1, + }, + }; + } + return {}; + }); + + mockFilesystem.fileData['test.yaml'] = { timestamp: 0, data: yamlData }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.yaml'], + yamlParser: mockYamlParser, + }; + + const initializer = new FileDataInitializerFDv2(options, platform, logger); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockYamlParser).toHaveBeenCalledWith(yamlData); + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + }); + + it('throws error when YAML file is provided without parser', async () => { + const yamlData = 'flags:\n flag1:\n key: flag1'; + mockFilesystem.fileData['test.yaml'] = { timestamp: 0, data: yamlData }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.yaml'], + }; + + const initializer = new FileDataInitializerFDv2(options, platform, logger); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith( + subsystem.DataSourceState.Closed, + expect.any(LDPollingError), + ); + expect(logger.getCount(LogLevel.Error)).toBeGreaterThan(0); + }); + + it('handles invalid JSON gracefully', async () => { + mockFilesystem.fileData['test.json'] = { timestamp: 0, data: 'invalid json {{{{' }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.json'], + }; + + const initializer = new FileDataInitializerFDv2(options, 
platform, logger); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith( + subsystem.DataSourceState.Closed, + expect.any(LDPollingError), + ); + const errorCall = mockStatusCallback.mock.calls.find( + (call) => call[0] === subsystem.DataSourceState.Closed && call[1] instanceof LDPollingError, + ); + expect(errorCall).toBeDefined(); + if (errorCall && errorCall[1] instanceof LDPollingError) { + expect(errorCall[1].kind).toBe(DataSourceErrorKind.InvalidData); + } + expect(logger.getCount(LogLevel.Error)).toBeGreaterThan(0); + }); + + it('combines data from multiple files correctly', async () => { + const flag2 = { + key: 'flag2', + on: false, + fallthrough: { variation: 0 }, + variations: ['value2'], + version: 2, + }; + const segment2 = { + key: 'segment2', + include: ['user2'], + version: 2, + }; + + const flagFile = JSON.stringify({ + flags: { + flag1, + flag2, + }, + }); + const segmentFile = JSON.stringify({ + segments: { + segment1, + segment2, + }, + }); + + mockFilesystem.fileData['flags.json'] = { timestamp: 0, data: flagFile }; + mockFilesystem.fileData['segments.json'] = { timestamp: 0, data: segmentFile }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['flags.json', 'segments.json'], + }; + + const initializer = new FileDataInitializerFDv2(options, platform, logger); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + + // Verify the combined data structure + const dataCall = mockDataCallback.mock.calls[0]; + expect(dataCall[0]).toBe(false); + expect(dataCall[1]).toHaveProperty('initMetadata'); + expect(dataCall[1]).toHaveProperty('payload'); + + const { payload } = dataCall[1]; + expect(payload).toHaveProperty('updates'); + 
expect(Array.isArray(payload.updates)).toBe(true); + + // Verify all flags are present + const flagUpdates = payload.updates.filter((update: any) => update.kind === 'flag'); + expect(flagUpdates.length).toBe(2); + + const flag1Update = flagUpdates.find((update: any) => update.key === 'flag1'); + expect(flag1Update).toBeDefined(); + expect(flag1Update.version).toBe(1); + expect(flag1Update.object).toEqual(flag1); + + const flag2Update = flagUpdates.find((update: any) => update.key === 'flag2'); + expect(flag2Update).toBeDefined(); + expect(flag2Update.version).toBe(2); + expect(flag2Update.object).toEqual(flag2); + + // Verify all segments are present + const segmentUpdates = payload.updates.filter((update: any) => update.kind === 'segment'); + expect(segmentUpdates.length).toBe(2); + + const segment1Update = segmentUpdates.find((update: any) => update.key === 'segment1'); + expect(segment1Update).toBeDefined(); + expect(segment1Update.version).toBe(1); + expect(segment1Update.object).toEqual(segment1); + + const segment2Update = segmentUpdates.find((update: any) => update.key === 'segment2'); + expect(segment2Update).toBeDefined(); + expect(segment2Update.version).toBe(2); + expect(segment2Update.object).toEqual(segment2); + }); + + it('overwrites data when the same key appears in multiple files', async () => { + // First file has flag1 with version 1 + const file1 = JSON.stringify({ + flags: { + flag1: { + ...flag1, + version: 1, + }, + }, + }); + + // Second file has flag1 with version 2 (should overwrite) + const file2 = JSON.stringify({ + flags: { + flag1: { + ...flag1, + version: 2, + on: false, // Different value to verify it's overwritten + }, + }, + }); + + mockFilesystem.fileData['file1.json'] = { timestamp: 0, data: file1 }; + mockFilesystem.fileData['file2.json'] = { timestamp: 0, data: file2 }; + + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['file1.json', 'file2.json'], + }; + + const initializer = new 
FileDataInitializerFDv2(options, platform, logger); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + + // Verify that flag1 from file2 (version 2, on: false) overwrote file1 + const dataCall = mockDataCallback.mock.calls[0]; + const { payload } = dataCall[1]; + const flagUpdates = payload.updates.filter((update: any) => update.kind === 'flag'); + + // Should only have one flag1 (not duplicated) + const flag1Updates = flagUpdates.filter((update: any) => update.key === 'flag1'); + expect(flag1Updates.length).toBe(1); + + const flag1Update = flag1Updates[0]; + expect(flag1Update.version).toBe(2); + expect(flag1Update.object.on).toBe(false); // Should be from file2 + }); +}); diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index a4f324ac99..97205d287b 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -34,6 +34,7 @@ import { Hook } from './api/integrations/Hook'; import { BigSegmentStoreMembership } from './api/interfaces'; import { LDWaitForInitializationOptions } from './api/LDWaitForInitializationOptions'; import { + isCustomOptions, isPollingOnlyOptions, isStandardOptions, isStreamingOnlyOptions, @@ -44,6 +45,7 @@ import { createPluginEnvironmentMetadata } from './createPluginEnvironmentMetada import { createPayloadListener } from './data_sources/createPayloadListenerFDv2'; import { createStreamListeners } from './data_sources/createStreamListeners'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; +import FileDataInitializerFDv2 from './data_sources/fileDataInitilizerFDv2'; import OneShotInitializerFDv2 from './data_sources/OneShotInitializerFDv2'; import PollingProcessor from './data_sources/PollingProcessor'; import PollingProcessorFDv2 from 
'./data_sources/PollingProcessorFDv2'; @@ -65,7 +67,10 @@ import FlagsStateBuilder from './FlagsStateBuilder'; import HookRunner from './hooks/HookRunner'; import MigrationOpEventToInputEvent from './MigrationOpEventConversion'; import MigrationOpTracker from './MigrationOpTracker'; -import Configuration, { DEFAULT_POLL_INTERVAL } from './options/Configuration'; +import Configuration, { + DEFAULT_POLL_INTERVAL, + DEFAULT_STREAM_RECONNECT_DELAY, +} from './options/Configuration'; import { ServerInternalOptions } from './options/ServerInternalOptions'; import VersionedDataKinds from './store/VersionedDataKinds'; @@ -323,60 +328,128 @@ function constructFDv2( if (!(config.offline || config.dataSystem!.useLdd)) { // make the FDv2 composite datasource with initializers/synchronizers const initializers: subsystem.LDDataSourceFactory[] = []; + const synchronizers: subsystem.LDDataSourceFactory[] = []; + const fdv1FallbackSynchronizers: subsystem.LDDataSourceFactory[] = []; - // use one shot initializer for performance and cost if we can do a combination of polling and streaming - if (isStandardOptions(dataSystem.dataSource)) { - initializers.push( - () => - new OneShotInitializerFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), - config.logger, - ), - ); - } + if (isCustomOptions(dataSystem.dataSource)) { + const { initializers: initializerConfigs = [], synchronizers: synchronizerConfigs = [] } = + dataSystem.dataSource; - const synchronizers: subsystem.LDDataSourceFactory[] = []; - // if streaming is configured, add streaming synchronizer - if (isStandardOptions(dataSystem.dataSource) || isStreamingOnlyOptions(dataSystem.dataSource)) { - const reconnectDelay = dataSystem.dataSource.streamInitialReconnectDelay; - synchronizers.push( - () => - new StreamingProcessorFDv2( - clientContext, - '/sdk/stream', - [], - baseHeaders, - diagnosticsManager, - reconnectDelay, - ), - ); - } + initializerConfigs.forEach((initializerConfig) => { + 
switch (initializerConfig.type) { + case 'file': { + initializers.push( + () => new FileDataInitializerFDv2(initializerConfig, platform, config.logger), + ); + break; + } + case 'polling': { + initializers.push( + () => + new OneShotInitializerFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + config.logger, + ), + ); + break; + } + default: { + throw new Error('Unsupported initializer type'); + } + } + }); + synchronizerConfigs.forEach((synchronizerConfig) => { + switch (synchronizerConfig.type) { + case 'streaming': { + const { streamInitialReconnectDelay = DEFAULT_STREAM_RECONNECT_DELAY } = + synchronizerConfig; + synchronizers.push( + () => + new StreamingProcessorFDv2( + clientContext, + '/sdk/stream', + [], + baseHeaders, + diagnosticsManager, + streamInitialReconnectDelay, + ), + ); + break; + } + case 'polling': { + const { pollInterval = DEFAULT_POLL_INTERVAL } = synchronizerConfig; + synchronizers.push( + () => + new PollingProcessorFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + pollInterval, + config.logger, + ), + ); + break; + } + default: { + throw new Error('Unsupported synchronizer type'); + } + } + }); + } else { + // use one shot initializer for performance and cost if we can do a combination of polling and streaming + if (isStandardOptions(dataSystem.dataSource)) { + initializers.push( + () => + new OneShotInitializerFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + config.logger, + ), + ); + } - let pollingInterval = DEFAULT_POLL_INTERVAL; - // if polling is configured, add polling synchronizer - if (isStandardOptions(dataSystem.dataSource) || isPollingOnlyOptions(dataSystem.dataSource)) { - pollingInterval = dataSystem.dataSource.pollInterval ?? 
DEFAULT_POLL_INTERVAL; - synchronizers.push( + // if streaming is configured, add streaming synchronizer + if ( + isStandardOptions(dataSystem.dataSource) || + isStreamingOnlyOptions(dataSystem.dataSource) + ) { + const reconnectDelay = dataSystem.dataSource.streamInitialReconnectDelay; + synchronizers.push( + () => + new StreamingProcessorFDv2( + clientContext, + '/sdk/stream', + [], + baseHeaders, + diagnosticsManager, + reconnectDelay, + ), + ); + } + + let pollingInterval = DEFAULT_POLL_INTERVAL; + // if polling is configured, add polling synchronizer + if (isStandardOptions(dataSystem.dataSource) || isPollingOnlyOptions(dataSystem.dataSource)) { + pollingInterval = dataSystem.dataSource.pollInterval ?? DEFAULT_POLL_INTERVAL; + synchronizers.push( + () => + new PollingProcessorFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', logger), + pollingInterval, + logger, + ), + ); + } + + // This is short term handling and will be removed once FDv2 adoption is sufficient. + fdv1FallbackSynchronizers.push( () => new PollingProcessorFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', logger), + new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger), pollingInterval, - logger, + config.logger, + true, ), ); } - // This is short term handling and will be removed once FDv2 adoption is sufficient. 
- const fdv1FallbackSynchronizers = [ - () => - new PollingProcessorFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger), - pollingInterval, - config.logger, - true, - ), - ]; - dataSource = new CompositeDataSource( initializers, synchronizers, diff --git a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts index cdb80f8ea5..34bc001454 100644 --- a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts +++ b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts @@ -36,7 +36,7 @@ export interface LDDataSystemOptions { /** * Configuration options for the Data Source that the SDK uses to get flags and other * data from the LaunchDarkly servers. Choose one of {@link StandardDataSourceOptions}, - * {@link StreamingDataSourceOptions}, or {@link PollingDataSourceOptions}; setting the + * {@link StreamingDataSourceOptions}, {@link PollingDataSourceOptions}, or {@link CustomDataSourceOptions}; setting the * type and the optional fields you want to customize. * * If not specified, this defaults to using the {@link StandardDataSourceOptions} which @@ -73,62 +73,108 @@ export interface LDDataSystemOptions { export type DataSourceOptions = | StandardDataSourceOptions | StreamingDataSourceOptions - | PollingDataSourceOptions; + | PollingDataSourceOptions + | CustomDataSourceOptions; -/** - * This standard data source is the recommended datasource for most customers. It will use - * a combination of streaming and polling to initialize the SDK, provide real time updates, - * and can switch between streaming and polling automatically to provide redundancy. 
- */
-export interface StandardDataSourceOptions {
-  dataSourceOptionsType: 'standard';
+export type DataSourceConfiguration =
+  | FileSystemDataSourceConfiguration
+  | StreamingDataSourceConfiguration
+  | PollingDataSourceConfiguration;
+
+export interface FileSystemDataSourceConfiguration {
+  type: 'file';
+  /**
+   * The paths to the files to read data from.
+   */
+  paths: Array<string>;
+  /**
+   * A function to parse the data from the file.
+   */
+  yamlParser?: (data: string) => any;
+}
+
+export interface StreamingDataSourceConfiguration {
+  type: 'streaming';
   /**
    * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted.
    *
    * The streaming service uses a backoff algorithm (with jitter) every time the connection needs
    * to be reestablished. The delay for the first reconnection will start near this value, and then
-   * increase exponentially for any subsequent connection failures.
+   * increase exponentially up to a maximum for any subsequent connection failures.
    *
    * The default value is 1.
    */
   streamInitialReconnectDelay?: number;
+}
 
+export interface PollingDataSourceConfiguration {
+  type: 'polling';
   /**
    * The time between polling requests, in seconds. Default if omitted.
    */
   pollInterval?: number;
 }
 
+/**
+ * This standard data source is the recommended datasource for most customers. It will use
+ * a combination of streaming and polling to initialize the SDK, provide real time updates,
+ * and can switch between streaming and polling automatically to provide redundancy.
+ */
+export interface StandardDataSourceOptions
+  extends Omit<StreamingDataSourceConfiguration, 'type'>,
+    Omit<PollingDataSourceConfiguration, 'type'> {
+  dataSourceOptionsType: 'standard';
+}
+
 /**
  * This data source will make best effort to maintain a streaming connection to LaunchDarkly services
  * to provide real time data updates.
 */
-export interface StreamingDataSourceOptions {
+export interface StreamingDataSourceOptions extends Omit<StreamingDataSourceConfiguration, 'type'> {
   dataSourceOptionsType: 'streamingOnly';
-
-  /**
-   * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted.
-   *
-   * The streaming service uses a backoff algorithm (with jitter) every time the connection needs
-   * to be reestablished. The delay for the first reconnection will start near this value, and then
-   * increase exponentially up to a maximum for any subsequent connection failures.
-   *
-   * The default value is 1.
-   */
-  streamInitialReconnectDelay?: number;
 }
 
 /**
  * This data source will periodically make a request to LaunchDarkly services to retrieve updated data.
  */
-export interface PollingDataSourceOptions {
+export interface PollingDataSourceOptions extends Omit<PollingDataSourceConfiguration, 'type'> {
   dataSourceOptionsType: 'pollingOnly';
+}
+
+/**
+ * Initializer configuration options
+ */
+export type InitializerDataSource =
+  | FileSystemDataSourceConfiguration
+  | PollingDataSourceConfiguration;
+
+/**
+ * Synchronizer configuration options
+ */
+export type SynchronizerDataSource =
+  | PollingDataSourceConfiguration
+  | StreamingDataSourceConfiguration;
+
+/**
+ * This data source will allow developers to define their own composite data source
+ */
+export interface CustomDataSourceOptions {
+  dataSourceOptionsType: 'custom';
   /**
-   * The time between polling requests, in seconds. Default if omitted.
+   * Ordered list of {@link InitializerDataSource} that will run in order. The first
+   * initializer that successfully returns a valid payload will transition the sdk
+   * out of initialization stage into the synchronization stage.
    */
-  pollInterval?: number;
+  initializers: Array<InitializerDataSource>;
+
+  /**
+   * Ordered list of {@link SynchronizerDataSource} in priority order. Datasources will
+   * failover to the next datasource in this array until there are no datasources left
+   * to run.
+   */
+  synchronizers: Array<SynchronizerDataSource>;
 }
 
 export function isStandardOptions(u: any): u is StandardDataSourceOptions {
@@ -142,3 +188,7 @@ export function isStreamingOnlyOptions(u: any): u is StreamingDataSourceOptions
 export function isPollingOnlyOptions(u: any): u is PollingDataSourceOptions {
   return u.dataSourceOptionsType === 'pollingOnly';
 }
+
+export function isCustomOptions(u: any): u is CustomDataSourceOptions {
+  return u.dataSourceOptionsType === 'custom';
+}
diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts
index 713326ad5d..3aac11c15d 100644
--- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts
+++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts
@@ -34,58 +34,9 @@ function processFDv1FlagsAndSegments(
   payloadProcessor: internal.PayloadProcessor,
   data: FlagsAndSegments,
 ) {
-  payloadProcessor.processEvents([
-    {
-      event: `server-intent`,
-      data: {
-        payloads: [
-          {
-            id: `FDv1Fallback`,
-            target: 1,
-            intentCode: `xfer-full`,
-          },
-        ],
-      },
-    },
-  ]);
-
-  Object.entries(data?.flags || []).forEach(([key, flag]) => {
-    payloadProcessor.processEvents([
-      {
-        event: `put-object`,
-        data: {
-          kind: 'flag',
-          key,
-          version: flag.version,
-          object: flag,
-        },
-      },
-    ]);
-  });
-
-  Object.entries(data?.segments || []).forEach(([key, segment]) => {
-    payloadProcessor.processEvents([
-      {
-        event: `put-object`,
-        data: {
-          kind: 'segment',
-          key,
-          version: segment.version,
-          object: segment,
-        },
-      },
-    ]);
-  });
-
-  payloadProcessor.processEvents([
-    {
-      event: `payload-transferred`,
-      data: {
-        state: `FDv1Fallback`,
-        version: 1,
-      },
-    },
-  ]);
+  const adaptor = internal.FDv1PayloadAdaptor(payloadProcessor);
+
+  adaptor.useSelector('FDv1Fallback').processFullTransfer(data);
 }
 
 /**
diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts
new file mode 100644
index 0000000000..4e40cac96e
--- /dev/null
+++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts
@@ -0,0 +1,153 @@
+import {
+  DataSourceErrorKind,
+  Filesystem,
+  internal,
+  LDLogger,
+  LDPollingError,
+  Platform,
+  subsystem as subsystemCommon,
+} from '@launchdarkly/js-sdk-common';
+
+import { FileSystemDataSourceConfiguration } from '../api';
+import { Flag } from '../evaluation/data/Flag';
+import { Segment } from '../evaluation/data/Segment';
+import { processFlag, processSegment } from '../store/serialization';
+import FileLoader from './FileLoader';
+
+/**
+ * @internal
+ */
+export default class FileDataInitializerFDv2 implements subsystemCommon.DataSource {
+  private _paths: Array<string>;
+  private _logger: LDLogger | undefined;
+  private _filesystem: Filesystem;
+  private _yamlParser?: (data: string) => any;
+  private _fileLoader?: FileLoader;
+
+  constructor(options: FileSystemDataSourceConfiguration, platform: Platform, logger?: LDLogger) {
+    this._validateInputs(options, platform);
+
+    this._paths = options.paths;
+    this._logger = logger;
+    this._filesystem = platform.fileSystem!;
+    this._yamlParser = options.yamlParser;
+  }
+
+  private _validateInputs(options: FileSystemDataSourceConfiguration, platform: Platform) {
+    if (!options.paths || options.paths.length === 0) {
+      throw new Error('FileDataInitializerFDv2: paths are required');
+    }
+
+    if (!platform.fileSystem) {
+      throw new Error('FileDataInitializerFDv2: file system is required');
+    }
+  }
+
+  start(
+    dataCallback: (basis: boolean, data: any) => void,
+    statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void,
+  ) {
+    statusCallback(subsystemCommon.DataSourceState.Initializing);
+    const initMetadata = internal.initMetadataFromHeaders(undefined);
+
+    const payloadProcessor = new internal.PayloadProcessor(
+      {
+        flag: (flag: Flag) => {
+          processFlag(flag);
+          return flag;
+        },
+        segment: (segment: Segment) => {
+          processSegment(segment);
return segment; + }, + }, + (errorKind: DataSourceErrorKind, message: string) => { + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(errorKind, message), + ); + }, + this._logger, + ); + + const adaptor = internal.FDv1PayloadAdaptor(payloadProcessor); + + this._fileLoader = new FileLoader( + this._filesystem, + this._paths, + false, // autoupdate is always false for initializer + (results: { path: string; data: string }[]) => { + try { + const parsedData = this._processFileData(results); + + payloadProcessor.addPayloadListener((payload) => { + // NOTE: file data initializer will never have a valid basis, so we always pass false + dataCallback(false, { initMetadata, payload }); + }); + + statusCallback(subsystemCommon.DataSourceState.Valid); + + adaptor.processFullTransfer(parsedData); + + statusCallback(subsystemCommon.DataSourceState.Closed); + } catch (err) { + this._logger?.error('File contained invalid data', err); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in file response'), + ); + } + }, + ); + + this._fileLoader.loadAndWatch().catch((err) => { + this._logger?.error('Error loading files', err); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError( + DataSourceErrorKind.NetworkError, + `Failed to load files: ${err instanceof Error ? err.message : String(err)}`, + ), + ); + }); + } + + private _processFileData(results: { path: string; data: string }[]) { + const combined: any = results.reduce( + (acc, curr) => { + let parsed: any; + if (curr.path.endsWith('.yml') || curr.path.endsWith('.yaml')) { + if (this._yamlParser) { + parsed = this._yamlParser(curr.data); + } else { + throw new Error(`Attempted to parse yaml file (${curr.path}) without parser.`); + } + } else { + parsed = JSON.parse(curr.data); + } + return { + segments: { + ...acc.segments, + ...(parsed.segments ?? 
{}), + }, + flags: { + ...acc.flags, + ...(parsed.flags ?? {}), + }, + }; + }, + { + segments: {}, + flags: {}, + }, + ); + + return combined; + } + + stop() { + if (this._fileLoader) { + this._fileLoader.close(); + } + } +} diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index 5eb7bbc5a3..aaec4dd3f0 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -16,6 +16,7 @@ import { LDBigSegmentsOptions, LDOptions, LDProxyOptions, LDTLSOptions } from '. import { Hook } from '../api/integrations'; import { DataSourceOptions, + isCustomOptions, isPollingOnlyOptions, isStandardOptions, isStreamingOnlyOptions, @@ -78,7 +79,7 @@ const validations: Record = { }; export const DEFAULT_POLL_INTERVAL = 30; -const DEFAULT_STREAM_RECONNECT_DELAY = 1; +export const DEFAULT_STREAM_RECONNECT_DELAY = 1; const defaultStandardDataSourceOptions: StandardDataSourceOptions = { dataSourceOptionsType: 'standard', @@ -237,6 +238,9 @@ function validateDataSystemOptions(options: Options): { options.dataSource, defaultPollingDataSourceOptions, )); + } else if (isCustomOptions(options.dataSource)) { + validatedDataSourceOptions = options.dataSource; + errors = []; } else { // provided datasource options don't fit any expected form, drop them and use defaults validatedDataSourceOptions = defaultStandardDataSourceOptions;