From 66c80c7d50170cf56ea8e0e67bd5a41db789204b Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Tue, 18 Nov 2025 16:28:36 -0600 Subject: [PATCH 1/8] feat: add FDv2 file data source initiator this commit will introduce the implementation of FDv2 File initiator --- .../src/internal/fdv2/FDv2ChangeSetBuilder.ts | 96 +++++++++++ .../shared/common/src/internal/fdv2/index.ts | 2 + .../src/internal/fdv2/payloadProcessor.ts | 6 +- .../shared/common/src/internal/fdv2/proto.ts | 26 ++- .../shared/sdk-server/src/LDClientImpl.ts | 11 +- .../src/api/options/LDDataSystemOptions.ts | 25 ++- .../data_sources/fileDataInitilizerFDv2.ts | 162 ++++++++++++++++++ .../sdk-server/src/options/Configuration.ts | 9 + 8 files changed, 325 insertions(+), 12 deletions(-) create mode 100644 packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts create mode 100644 packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts diff --git a/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts b/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts new file mode 100644 index 0000000000..5df324270f --- /dev/null +++ b/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts @@ -0,0 +1,96 @@ +import { PutObject, DeleteObject, Event } from './proto'; + +// eventually this will be the same as the IntentCode type, but for now we'll use a simpler type +type supportedIntentCodes = 'xfer-full'; + +/** + * FDv2ChangeSetBuilder is a helper for constructing a change set for FDv2. + * The main use case for this builder is to help construct a change set from + * a FDv1 payload. + * + * @experimental + * This type is not stable, and not subject to any backwards + * compatibility guarantees or semantic versioning. It is not suitable for production usage. + */ +export default class FDv2ChangeSetBuilder { + private intent?: supportedIntentCodes; + private events: Event[] = []; + + /** + * Begins a new change set with a given intent. + */ + start(intent: supportedIntentCodes): this { + this.intent = intent; + this.events = []; + + return this; + } + + /** + * Returns the completed changeset. + * NOTE: currently, this builder is not designed to continuously build changesets, rather + * it is designed to construct a single changeset at a time. We can easily expand this by + * resetting some values in the future. + */ + finish(): Array { + if (this.intent === undefined) { + throw new Error('changeset: cannot complete without a server-intent'); + } + + // NOTE: currently the only use case for this builder is to + // construct a change set for a file data intializer which only supports + // FDv1 format. As such, we need to use dummy values to satisfy the FDv2 + // protocol. + const events: Array = [ + { + event: 'server-intent', + data: { + payloads: [{ + id: 'dummy-id', + target: 1, + intentCode: this.intent, + reason: 'payload-missing', + }, + ] + } + }, + ...this.events, + { + event: 'payload-transferred', + data: { + // IMPORTANT: the selector MUST be empty or "live" data synchronizers + // will not work as it would try to resume from a bogus state. + state: '', + version: 1, + id: 'dummy-id', + } + }, + ]; + + return events; + } + + /** + * Adds a new object to the changeset. + */ + putObject(obj: PutObject): this { + this.events.push({ + event: 'put-object', + data: obj, + }); + + return this + } + + /** + * Adds a deletion to the changeset. + */ + deleteObject(obj: DeleteObject): this { + this.events.push({ + event: 'delete-object', + data: obj + }); + + return this + } +} diff --git a/packages/shared/common/src/internal/fdv2/index.ts b/packages/shared/common/src/internal/fdv2/index.ts index b07537ddd9..c7e97bd6b2 100644 --- a/packages/shared/common/src/internal/fdv2/index.ts +++ b/packages/shared/common/src/internal/fdv2/index.ts @@ -6,6 +6,7 @@ import { Update, } from './payloadProcessor'; import { PayloadStreamReader } from './payloadStreamReader'; +import FDv2ChangeSetBuilder from './FDv2ChangeSetBuilder'; export { FDv2EventsCollection, @@ -14,4 +15,5 @@ export { PayloadProcessor, PayloadStreamReader, Update, + FDv2ChangeSetBuilder, }; diff --git a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts index 1cd48cac7e..e8b654693e 100644 --- a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts +++ b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts @@ -221,8 +221,10 @@ export class PayloadProcessor { private _processPayloadTransferred = (data: PayloadTransferred) => { // if the following properties haven't been provided by now, we should reset if ( - !this._tempId || // server intent hasn't been received yet. - !data.state || + // server intent hasn't been received yet. + !this._tempId || + // selector can be an empty string if we are using a file data initilizer + (data.state === null || data.state === undefined) || !data.version ) { this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload diff --git a/packages/shared/common/src/internal/fdv2/proto.ts b/packages/shared/common/src/internal/fdv2/proto.ts index cb3c04bf08..493c91f97b 100644 --- a/packages/shared/common/src/internal/fdv2/proto.ts +++ b/packages/shared/common/src/internal/fdv2/proto.ts @@ -1,14 +1,16 @@ +export type EventType = 'server-intent' | 'put-object' | 'delete-object' | 'payload-transferred' | 'goodbye' | 'error'| 'heart-beat'; +export type IntentCode = 'xfer-full' | 'xfer-changes' | 'none'; +export type ObjectKind = 'flag' | 'segment'; + export interface Event { - event: string; - data: any; + event: EventType; + data: ServerIntentData | PutObject | DeleteObject | PayloadTransferred | GoodbyeObject | ErrorObject; } export interface ServerIntentData { payloads: PayloadIntent[]; } -export type IntentCode = 'xfer-full' | 'xfer-changes' | 'none'; - export interface PayloadIntent { id: string; target: number; @@ -17,19 +19,31 @@ export interface PayloadIntent { } export interface PutObject { - kind: string; + kind: ObjectKind; key: string; version: number; object: any; } export interface DeleteObject { - kind: string; + kind: ObjectKind; key: string; version: number; } +export interface GoodbyeObject { + reason: string; + silent: boolean; + catastrophe: boolean; +} + +export interface ErrorObject { + payload_id: string; + reason: string; +} + export interface PayloadTransferred { state: string; version: number; + id?: string; } diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index a4f324ac99..2c7bd61e57 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -43,6 +43,7 @@ import BigSegmentStoreStatusProvider from './BigSegmentStatusProviderImpl'; import { createPluginEnvironmentMetadata } from './createPluginEnvironmentMetadata'; import { createPayloadListener } from './data_sources/createPayloadListenerFDv2'; import { createStreamListeners } from './data_sources/createStreamListeners'; +import FileDataInitializerFDv2 from './data_sources/fileDataInitilizerFDv2'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; import OneShotInitializerFDv2 from './data_sources/OneShotInitializerFDv2'; import PollingProcessor from './data_sources/PollingProcessor'; @@ -325,7 +326,7 @@ function constructFDv2( const initializers: subsystem.LDDataSourceFactory[] = []; // use one shot initializer for performance and cost if we can do a combination of polling and streaming - if (isStandardOptions(dataSystem.dataSource)) { + if (dataSystem.dataSource?.initializerOptions?.type === 'polling') { initializers.push( () => new OneShotInitializerFDv2( @@ -333,6 +334,14 @@ function constructFDv2( config.logger, ), ); + } else if (dataSystem.dataSource?.initializerOptions?.type === 'file') { + initializers.push( + () => + new FileDataInitializerFDv2( + config, + platform, + ), + ); } const synchronizers: subsystem.LDDataSourceFactory[] = []; diff --git a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts index cdb80f8ea5..1cd026d2c3 100644 --- a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts +++ b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts @@ -75,6 +75,25 @@ export type DataSourceOptions = | StreamingDataSourceOptions | PollingDataSourceOptions; +/** + * Initializer option to read data from a file. + * + * NOTE: right now we only support data sources that are in FDv1 format. + */ +export interface FileDataInitializerOptions { + type: 'file'; + paths: Array; + yamlParser?: (data: string) => any; +}; + +/** + * Initializer option to initilize the SDK from doing a one time full payload transfer. + * This will be the default initializer used by the standard data source type. + */ +export interface PollingDataInitializerOptions { + type: 'polling'; +}; + /** * This standard data source is the recommended datasource for most customers. It will use * a combination of streaming and polling to initialize the SDK, provide real time updates, @@ -82,7 +101,7 @@ export type DataSourceOptions = */ export interface StandardDataSourceOptions { dataSourceOptionsType: 'standard'; - + initializerOptions?: FileDataInitializerOptions | PollingDataInitializerOptions; /** * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted. * @@ -106,7 +125,7 @@ export interface StandardDataSourceOptions { */ export interface StreamingDataSourceOptions { dataSourceOptionsType: 'streamingOnly'; - + initializerOptions?: FileDataInitializerOptions | PollingDataInitializerOptions; /** * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted. * @@ -124,7 +143,7 @@ export interface StreamingDataSourceOptions { */ export interface PollingDataSourceOptions { dataSourceOptionsType: 'pollingOnly'; - + initializerOptions?: FileDataInitializerOptions | PollingDataInitializerOptions; /** * The time between polling requests, in seconds. Default if omitted. */ diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts new file mode 100644 index 0000000000..91b14a7054 --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -0,0 +1,162 @@ +import { + DataSourceErrorKind, + Filesystem, + internal, + LDLogger, + LDPollingError, + Platform, + subsystem as subsystemCommon, +} from '@launchdarkly/js-sdk-common'; + +import { Flag } from '../evaluation/data/Flag'; +import { Segment } from '../evaluation/data/Segment'; +import { processFlag, processSegment } from '../store/serialization'; +import Configuration from '../options/Configuration'; +import { FileDataInitializerOptions } from '../api'; +import FileLoader from './FileLoader'; + +/** + * @internal + */ +export default class FileDataInitializerFDv2 implements subsystemCommon.DataSource { + private _paths: Array; + private _logger: LDLogger | undefined; + private _filesystem: Filesystem; + private _yamlParser?: (data: string) => any; + private _fileLoader?: FileLoader; + + // TODO: do a options check here + constructor( + config: Configuration, + platform: Platform + ) { + const options = config.dataSystem?.dataSource?.initializerOptions as FileDataInitializerOptions + this._validateInputs(options, platform); + + this._paths = options.paths; + this._logger = config.logger; + this._filesystem = platform.fileSystem!; + this._yamlParser = options.yamlParser; + } + + private _validateInputs(options: FileDataInitializerOptions, platform: Platform) { + if (!options.paths || options.paths.length === 0) { + throw new Error('FileDataInitializerFDv2: paths are required'); + } + + if (!platform.fileSystem) { + throw new Error('FileDataInitializerFDv2: file system is required'); + } + } + + start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + statusCallback(subsystemCommon.DataSourceState.Initializing); + const initMetadata = internal.initMetadataFromHeaders(undefined); + + const payloadProcessor = new internal.PayloadProcessor({ + flag: (flag: Flag) => { + processFlag(flag); + return flag; + }, + segment: (segment: Segment) => { + processSegment(segment); + return segment; + }, + }, + (errorKind: DataSourceErrorKind, message: string) => { + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(errorKind, message), + ); + }, + this._logger, + ); + + this._fileLoader = new FileLoader( + this._filesystem, + this._paths, + false, // autoupdate is always false for initializer + (results: { path: string; data: string }[]) => { + // Whenever changes are detected we re-process all of the data. + // The FileLoader will have handled debouncing for us. + try { + const parsedData = this._processFileData(results); + + statusCallback(subsystemCommon.DataSourceState.Valid); + + payloadProcessor.addPayloadListener((payload) => { + dataCallback(payload.basis, { initMetadata, payload }); + }); + + payloadProcessor.processEvents(parsedData.events); + + statusCallback(subsystemCommon.DataSourceState.Closed); + } catch (err) { + this._logger?.error('File contained invalid data', err); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in file response'), + ); + } + }, + ) + + this._fileLoader.loadAndWatch() + } + + _processFileData(results: { path: string; data: string }[]) { + const combined: any = results.reduce((acc, curr) => { + let parsed: any; + if (curr.path.endsWith('.yml') || curr.path.endsWith('.yaml')) { + if (this._yamlParser) { + parsed = this._yamlParser(curr.data); + } else { + throw new Error(`Attempted to parse yaml file (${curr.path}) without parser.`); + } + } else { + parsed = JSON.parse(curr.data); + } + return { + segments: { + ...acc.segments, + ...parsed.segments, + }, + flags: { + ...acc.flags, + ...parsed.flags, + } + } + }, { + segments: {}, + flags: {}, + }); + + const changeSetBuilder = new internal.FDv2ChangeSetBuilder() + changeSetBuilder.start('xfer-full') + + Object.keys(combined).forEach((kind : string) => { + Object.entries(combined[kind]).forEach(([k, v]) => { + changeSetBuilder.putObject({ + // strong assumption here that we only have segments and flags. + kind: kind === 'segments' ? 'segment' : 'flag', + key: k, + version: v.version || 1, + object: v + }) + }) + }); + + return { + events: changeSetBuilder.finish() + } + } + + stop() { + if (this._fileLoader) { + this._fileLoader.close(); + } + } +} diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index 5eb7bbc5a3..1a74f6e787 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -82,6 +82,9 @@ const DEFAULT_STREAM_RECONNECT_DELAY = 1; const defaultStandardDataSourceOptions: StandardDataSourceOptions = { dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'polling', + }, streamInitialReconnectDelay: DEFAULT_STREAM_RECONNECT_DELAY, pollInterval: DEFAULT_POLL_INTERVAL, }; @@ -248,6 +251,12 @@ function validateDataSystemOptions(options: Options): { ), ]; } + // Preserve initializer options if it was provided, since it's not validated by validateTypesAndNames. + // Currently, setting this option is most commonly used as an override of default initializer options. + if (options.dataSource && 'initializerOptions' in options.dataSource) { + validatedDataSourceOptions.initializerOptions = options.dataSource.initializerOptions; + } + validatedOptions.dataSource = validatedDataSourceOptions; allErrors.push(...errors); } else { From 678e42feb4c5d36230f817d898de687d32ffba35 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Tue, 18 Nov 2025 16:40:12 -0600 Subject: [PATCH 2/8] style: [sdk-1577] fixing lint issues --- .../src/internal/fdv2/FDv2ChangeSetBuilder.ts | 35 +++---- .../shared/common/src/internal/fdv2/index.ts | 2 +- .../src/internal/fdv2/payloadProcessor.ts | 3 +- .../shared/common/src/internal/fdv2/proto.ts | 17 +++- .../shared/sdk-server/src/LDClientImpl.ts | 10 +- .../src/api/options/LDDataSystemOptions.ts | 4 +- .../data_sources/fileDataInitilizerFDv2.ts | 95 ++++++++++--------- 7 files changed, 88 insertions(+), 78 deletions(-) diff --git a/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts b/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts index 5df324270f..f7b4a3ced2 100644 --- a/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts +++ b/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts @@ -1,4 +1,4 @@ -import { PutObject, DeleteObject, Event } from './proto'; +import { DeleteObject, Event, PutObject } from './proto'; // eventually this will be the same as the IntentCode type, but for now we'll use a simpler type type supportedIntentCodes = 'xfer-full'; @@ -13,15 +13,15 @@ type supportedIntentCodes = 'xfer-full'; * compatibility guarantees or semantic versioning. It is not suitable for production usage. */ export default class FDv2ChangeSetBuilder { - private intent?: supportedIntentCodes; - private events: Event[] = []; + private _intent?: supportedIntentCodes; + private _events: Event[] = []; /** * Begins a new change set with a given intent. */ start(intent: supportedIntentCodes): this { - this.intent = intent; - this.events = []; + this._intent = intent; + this._events = []; return this; } @@ -33,7 +33,7 @@ export default class FDv2ChangeSetBuilder { * resetting some values in the future. */ finish(): Array { - if (this.intent === undefined) { + if (this._intent === undefined) { throw new Error('changeset: cannot complete without a server-intent'); } @@ -45,16 +45,17 @@ export default class FDv2ChangeSetBuilder { { event: 'server-intent', data: { - payloads: [{ + payloads: [ + { id: 'dummy-id', target: 1, - intentCode: this.intent, + intentCode: this._intent!, reason: 'payload-missing', }, - ] - } + ], + }, }, - ...this.events, + ...this._events, { event: 'payload-transferred', data: { @@ -63,7 +64,7 @@ export default class FDv2ChangeSetBuilder { state: '', version: 1, id: 'dummy-id', - } + }, }, ]; @@ -74,23 +75,23 @@ export default class FDv2ChangeSetBuilder { * Adds a new object to the changeset. */ putObject(obj: PutObject): this { - this.events.push({ + this._events.push({ event: 'put-object', data: obj, }); - return this + return this; } /** * Adds a deletion to the changeset. */ deleteObject(obj: DeleteObject): this { - this.events.push({ + this._events.push({ event: 'delete-object', - data: obj + data: obj, }); - return this + return this; } } diff --git a/packages/shared/common/src/internal/fdv2/index.ts b/packages/shared/common/src/internal/fdv2/index.ts index c7e97bd6b2..113602ea5e 100644 --- a/packages/shared/common/src/internal/fdv2/index.ts +++ b/packages/shared/common/src/internal/fdv2/index.ts @@ -1,3 +1,4 @@ +import FDv2ChangeSetBuilder from './FDv2ChangeSetBuilder'; import { FDv2EventsCollection, Payload, @@ -6,7 +7,6 @@ import { Update, } from './payloadProcessor'; import { PayloadStreamReader } from './payloadStreamReader'; -import FDv2ChangeSetBuilder from './FDv2ChangeSetBuilder'; export { FDv2EventsCollection, diff --git a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts index e8b654693e..4beee20bb6 100644 --- a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts +++ b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts @@ -224,7 +224,8 @@ export class PayloadProcessor { // server intent hasn't been received yet. !this._tempId || // selector can be an empty string if we are using a file data initilizer - (data.state === null || data.state === undefined) || + data.state === null || + data.state === undefined || !data.version ) { this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload diff --git a/packages/shared/common/src/internal/fdv2/proto.ts b/packages/shared/common/src/internal/fdv2/proto.ts index 493c91f97b..3062c7fcdd 100644 --- a/packages/shared/common/src/internal/fdv2/proto.ts +++ b/packages/shared/common/src/internal/fdv2/proto.ts @@ -1,10 +1,23 @@ -export type EventType = 'server-intent' | 'put-object' | 'delete-object' | 'payload-transferred' | 'goodbye' | 'error'| 'heart-beat'; +export type EventType = + | 'server-intent' + | 'put-object' + | 'delete-object' + | 'payload-transferred' + | 'goodbye' + | 'error' + | 'heart-beat'; export type IntentCode = 'xfer-full' | 'xfer-changes' | 'none'; export type ObjectKind = 'flag' | 'segment'; export interface Event { event: EventType; - data: ServerIntentData | PutObject | DeleteObject | PayloadTransferred | GoodbyeObject | ErrorObject; + data: + | ServerIntentData + | PutObject + | DeleteObject + | PayloadTransferred + | GoodbyeObject + | ErrorObject; } export interface ServerIntentData { diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index 2c7bd61e57..966f712d39 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -43,8 +43,8 @@ import BigSegmentStoreStatusProvider from './BigSegmentStatusProviderImpl'; import { createPluginEnvironmentMetadata } from './createPluginEnvironmentMetadata'; import { createPayloadListener } from './data_sources/createPayloadListenerFDv2'; import { createStreamListeners } from './data_sources/createStreamListeners'; -import FileDataInitializerFDv2 from './data_sources/fileDataInitilizerFDv2'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; +import FileDataInitializerFDv2 from './data_sources/fileDataInitilizerFDv2'; import OneShotInitializerFDv2 from './data_sources/OneShotInitializerFDv2'; import PollingProcessor from './data_sources/PollingProcessor'; import PollingProcessorFDv2 from './data_sources/PollingProcessorFDv2'; @@ -335,13 +335,7 @@ function constructFDv2( ), ); } else if (dataSystem.dataSource?.initializerOptions?.type === 'file') { - initializers.push( - () => - new FileDataInitializerFDv2( - config, - platform, - ), - ); + initializers.push(() => new FileDataInitializerFDv2(config, platform)); } const synchronizers: subsystem.LDDataSourceFactory[] = []; diff --git a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts index 1cd026d2c3..81903c80da 100644 --- a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts +++ b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts @@ -84,7 +84,7 @@ export interface FileDataInitializerOptions { type: 'file'; paths: Array; yamlParser?: (data: string) => any; -}; +} /** * Initializer option to initilize the SDK from doing a one time full payload transfer. @@ -92,7 +92,7 @@ export interface FileDataInitializerOptions { */ export interface PollingDataInitializerOptions { type: 'polling'; -}; +} /** * This standard data source is the recommended datasource for most customers. It will use diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts index 91b14a7054..1ad7e0bc7a 100644 --- a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -8,11 +8,11 @@ import { subsystem as subsystemCommon, } from '@launchdarkly/js-sdk-common'; +import { FileDataInitializerOptions } from '../api'; import { Flag } from '../evaluation/data/Flag'; import { Segment } from '../evaluation/data/Segment'; -import { processFlag, processSegment } from '../store/serialization'; import Configuration from '../options/Configuration'; -import { FileDataInitializerOptions } from '../api'; +import { processFlag, processSegment } from '../store/serialization'; import FileLoader from './FileLoader'; /** @@ -26,11 +26,8 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour private _fileLoader?: FileLoader; // TODO: do a options check here - constructor( - config: Configuration, - platform: Platform - ) { - const options = config.dataSystem?.dataSource?.initializerOptions as FileDataInitializerOptions + constructor(config: Configuration, platform: Platform) { + const options = config.dataSystem?.dataSource?.initializerOptions as FileDataInitializerOptions; this._validateInputs(options, platform); this._paths = options.paths; @@ -56,7 +53,8 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour statusCallback(subsystemCommon.DataSourceState.Initializing); const initMetadata = internal.initMetadataFromHeaders(undefined); - const payloadProcessor = new internal.PayloadProcessor({ + const payloadProcessor = new internal.PayloadProcessor( + { flag: (flag: Flag) => { processFlag(flag); return flag; @@ -102,56 +100,59 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour ); } }, - ) + ); - this._fileLoader.loadAndWatch() + this._fileLoader.loadAndWatch(); } - _processFileData(results: { path: string; data: string }[]) { - const combined: any = results.reduce((acc, curr) => { - let parsed: any; - if (curr.path.endsWith('.yml') || curr.path.endsWith('.yaml')) { - if (this._yamlParser) { - parsed = this._yamlParser(curr.data); + private _processFileData(results: { path: string; data: string }[]) { + const combined: any = results.reduce( + (acc, curr) => { + let parsed: any; + if (curr.path.endsWith('.yml') || curr.path.endsWith('.yaml')) { + if (this._yamlParser) { + parsed = this._yamlParser(curr.data); + } else { + throw new Error(`Attempted to parse yaml file (${curr.path}) without parser.`); + } } else { - throw new Error(`Attempted to parse yaml file (${curr.path}) without parser.`); - } - } else { - parsed = JSON.parse(curr.data); - } - return { - segments: { - ...acc.segments, - ...parsed.segments, - }, - flags: { - ...acc.flags, - ...parsed.flags, + parsed = JSON.parse(curr.data); } - } - }, { - segments: {}, - flags: {}, - }); + return { + segments: { + ...acc.segments, + ...parsed.segments, + }, + flags: { + ...acc.flags, + ...parsed.flags, + }, + }; + }, + { + segments: {}, + flags: {}, + }, + ); - const changeSetBuilder = new internal.FDv2ChangeSetBuilder() - changeSetBuilder.start('xfer-full') + const changeSetBuilder = new internal.FDv2ChangeSetBuilder(); + changeSetBuilder.start('xfer-full'); - Object.keys(combined).forEach((kind : string) => { + Object.keys(combined).forEach((kind: string) => { Object.entries(combined[kind]).forEach(([k, v]) => { - changeSetBuilder.putObject({ - // strong assumption here that we only have segments and flags. - kind: kind === 'segments' ? 'segment' : 'flag', - key: k, - version: v.version || 1, - object: v - }) - }) + changeSetBuilder.putObject({ + // strong assumption here that we only have segments and flags. + kind: kind === 'segments' ? 'segment' : 'flag', + key: k, + version: v.version || 1, + object: v, + }); }); + }); return { - events: changeSetBuilder.finish() - } + events: changeSetBuilder.finish(), + }; } stop() { From 15d65d7df8e5ce6156b28b846d96b660527309ee Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Tue, 18 Nov 2025 17:04:32 -0600 Subject: [PATCH 3/8] test: [sdk-1577] add unit tests for file data initializer --- .../fdv2/FDv2ChangeSetBuilder.test.ts | 99 ++++ .../fileDataInitilizerFDv2.test.ts | 450 ++++++++++++++++++ .../data_sources/fileDataInitilizerFDv2.ts | 4 +- 3 files changed, 551 insertions(+), 2 deletions(-) create mode 100644 packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts create mode 100644 packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts diff --git a/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts b/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts new file mode 100644 index 0000000000..c28af646ce --- /dev/null +++ b/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts @@ -0,0 +1,99 @@ +import FDv2ChangeSetBuilder from '../../../src/internal/fdv2/FDv2ChangeSetBuilder'; +import { DeleteObject, Event, PutObject } from '../../../src/internal/fdv2/proto'; + +it('throws an error when finishing without starting', () => { + const builder = new FDv2ChangeSetBuilder(); + expect(() => builder.finish()).toThrow('changeset: cannot complete without a server-intent'); +}); + +it('starts a new changeset with the given intent', () => { + const builder = new FDv2ChangeSetBuilder(); + builder.start('xfer-full'); + const result = builder.finish(); + + expect(result).toBeDefined(); + expect(result.length).toBeGreaterThan(0); + expect(result[0].event).toBe('server-intent'); +}); + +it('resets events when starting a new changeset', () => { + const builder = new FDv2ChangeSetBuilder(); + builder.start('xfer-full'); + builder.putObject({ kind: 'flag', key: 'test-flag', version: 1, object: {} }); + builder.start('xfer-full'); + const result = builder.finish(); + + // Should only have server-intent and payload-transferred, no put-object events + const putObjectEvents = result.filter((e) => e.event === 'put-object'); + expect(putObjectEvents.length).toBe(0); +}); + +it('includes server-intent as the first event with correct structure', () => { + const builder = new FDv2ChangeSetBuilder(); + builder.start('xfer-full'); + const result = builder.finish(); + + const serverIntentEvent = result[0] as Event; + expect(serverIntentEvent.event).toBe('server-intent'); + expect(serverIntentEvent.data).toBeDefined(); + + const intentData = serverIntentEvent.data as any; + expect(intentData.payloads).toBeDefined(); + expect(intentData.payloads.length).toBe(1); + expect(intentData.payloads[0].intentCode).toBe('xfer-full'); + expect(intentData.payloads[0].id).toBe('dummy-id'); + expect(intentData.payloads[0].target).toBe(1); + expect(intentData.payloads[0].reason).toBe('payload-missing'); +}); + +it('includes payload-transferred as the last event with empty state', () => { + const builder = new FDv2ChangeSetBuilder(); + builder.start('xfer-full'); + const result = builder.finish(); + + const payloadTransferredEvent = result[result.length - 1] as Event; + expect(payloadTransferredEvent.event).toBe('payload-transferred'); + expect(payloadTransferredEvent.data).toBeDefined(); + + const transferredData = payloadTransferredEvent.data as any; + expect(transferredData.state).toBe(''); + expect(transferredData.version).toBe(1); + expect(transferredData.id).toBe('dummy-id'); +}); + +it('includes all put and delete events between server-intent and payload-transferred', () => { + const builder = new FDv2ChangeSetBuilder(); + const putObj1: PutObject = { + kind: 'flag', + key: 'flag-1', + version: 1, + object: { key: 'flag-1', on: true }, + }; + const deleteObj: DeleteObject = { + kind: 'segment', + key: 'segment-1', + version: 2, + }; + const putObj2: PutObject = { + kind: 'flag', + key: 'flag-2', + version: 3, + object: { key: 'flag-2', on: false }, + }; + + builder.start('xfer-full'); + builder.putObject(putObj1); + builder.deleteObject(deleteObj); + builder.putObject(putObj2); + const result = builder.finish(); + + expect(result.length).toBe(5); // server-intent + 3 events + payload-transferred + expect(result[0].event).toBe('server-intent'); + expect(result[1].event).toBe('put-object'); + expect((result[1].data as PutObject).key).toBe('flag-1'); + expect(result[2].event).toBe('delete-object'); + expect((result[2].data as DeleteObject).key).toBe('segment-1'); + expect(result[3].event).toBe('put-object'); + expect((result[3].data as PutObject).key).toBe('flag-2'); + expect(result[4].event).toBe('payload-transferred'); +}); diff --git a/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts new file mode 100644 index 0000000000..bfa0955941 --- /dev/null +++ b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts @@ -0,0 +1,450 @@ +import { + DataSourceErrorKind, + Filesystem, + LDPollingError, + Platform, + subsystem, + WatchHandle, +} from '@launchdarkly/js-sdk-common'; + +import FileDataInitializerFDv2 from '../../src/data_sources/fileDataInitilizerFDv2'; +import Configuration from '../../src/options/Configuration'; +import { createBasicPlatform } from '../createBasicPlatform'; +import TestLogger, { LogLevel } from '../Logger'; + +class MockFilesystem implements Filesystem { + public fileData: Record< + string, + { + timestamp: number; + data: string; + } + > = {}; + + async getFileTimestamp(path: string): Promise { + return this.fileData[path]?.timestamp ?? 0; + } + + async readFile(path: string): Promise { + if (!this.fileData[path]) { + throw new Error('FILE NOT FOUND'); + } + return this.fileData[path]?.data; + } + + watch(_path: string, _callback: (eventType: string, filename: string) => void): WatchHandle { + return { + close: jest.fn(), + }; + } +} + +const flag1 = { + key: 'flag1', + on: true, + fallthrough: { variation: 0 }, + variations: ['value1'], + version: 1, +}; + +const segment1 = { + key: 'segment1', + include: ['user1'], + version: 1, +}; + +const flagOnlyJson = JSON.stringify({ + flags: { + flag1, + }, +}); + +const segmentOnlyJson = JSON.stringify({ + segments: { + segment1, + }, +}); + +const allPropertiesJson = JSON.stringify({ + flags: { + flag1, + }, + segments: { + segment1, + }, +}); + +describe('FileDataInitializerFDv2', () => { + let mockFilesystem: MockFilesystem; + let logger: TestLogger; + let platform: Platform; + let config: Configuration; + let mockDataCallback: jest.Mock; + let mockStatusCallback: jest.Mock; + + beforeEach(() => { + jest.useFakeTimers(); + mockFilesystem = new MockFilesystem(); + logger = new TestLogger(); + platform = { + ...createBasicPlatform(), + fileSystem: mockFilesystem as unknown as Filesystem, + }; + mockDataCallback = jest.fn(); + mockStatusCallback = jest.fn(); + }); + + afterEach(() => { + jest.resetAllMocks(); + jest.useRealTimers(); + }); + + it('throws error when paths are not provided', () => { + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: [], + }, + }, + }, + logger, + }); + + expect(() => { + /* eslint-disable-next-line no-new */ + new FileDataInitializerFDv2(config, platform); + }).toThrow('FileDataInitializerFDv2: paths are required'); + }); + + it('throws error when file system is not available', () => { + const platformWithoutFileSystem = { + ...createBasicPlatform(), + fileSystem: undefined, + }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['test.json'], + }, + }, + }, + logger, + }); + + expect(() => { + /* eslint-disable-next-line no-new */ + new FileDataInitializerFDv2(config, platformWithoutFileSystem); + }).toThrow('FileDataInitializerFDv2: file system is required'); + }); + + it('loads and processes JSON file with flags and segments', async () => { + mockFilesystem.fileData['test.json'] = { timestamp: 0, data: allPropertiesJson }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['test.json'], + }, + }, + }, + logger, + }); + + const initializer = new FileDataInitializerFDv2(config, platform); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + }); + + it('loads and processes multiple JSON files', async () => { + mockFilesystem.fileData['flags.json'] = { timestamp: 0, data: flagOnlyJson }; + mockFilesystem.fileData['segments.json'] = { timestamp: 0, data: segmentOnlyJson }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['flags.json', 'segments.json'], + }, + }, + }, + logger, + }); + + const initializer = new FileDataInitializerFDv2(config, platform); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + }); + + it('loads and processes YAML file when parser is provided', async () => { + const yamlData = 'flags:\n flag1:\n key: flag1\n on: true\n version: 1'; + const mockYamlParser = jest.fn((data: string) => { + if (data === yamlData) { + return { + flags: { + flag1, + }, + }; + } + return {}; + }); + + mockFilesystem.fileData['test.yaml'] = { timestamp: 0, data: yamlData }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['test.yaml'], + yamlParser: mockYamlParser, + }, + }, + }, + logger, + }); + + const initializer = new FileDataInitializerFDv2(config, platform); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockYamlParser).toHaveBeenCalledWith(yamlData); + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + }); + + it('throws error when YAML file is provided without parser', async () => { + const yamlData = 'flags:\n flag1:\n key: flag1'; + mockFilesystem.fileData['test.yaml'] = { timestamp: 0, data: yamlData }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['test.yaml'], + }, + }, + }, + logger, + }); + + const initializer = new FileDataInitializerFDv2(config, platform); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith( + subsystem.DataSourceState.Closed, + expect.any(LDPollingError), + ); + expect(logger.getCount(LogLevel.Error)).toBeGreaterThan(0); + }); + + it('handles invalid JSON gracefully', async () => { + mockFilesystem.fileData['test.json'] = { timestamp: 0, data: 'invalid json {{{{' }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['test.json'], + }, + }, + }, + logger, + }); + + const initializer = new FileDataInitializerFDv2(config, platform); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith( + subsystem.DataSourceState.Closed, + expect.any(LDPollingError), + ); + const errorCall = mockStatusCallback.mock.calls.find( + (call) => call[0] === subsystem.DataSourceState.Closed && call[1] instanceof LDPollingError, + ); + expect(errorCall).toBeDefined(); + if (errorCall && errorCall[1] instanceof LDPollingError) { + expect(errorCall[1].kind).toBe(DataSourceErrorKind.InvalidData); + } + expect(logger.getCount(LogLevel.Error)).toBeGreaterThan(0); + }); + + it('combines data from multiple files correctly', async () => { + const flag2 = { + key: 'flag2', + on: false, + fallthrough: { variation: 0 }, + variations: ['value2'], + version: 2, + }; + const segment2 = { + key: 'segment2', + include: ['user2'], + version: 2, + }; + + const flagFile = JSON.stringify({ + flags: { + flag1, + flag2, + }, + }); + const segmentFile = JSON.stringify({ + segments: { + segment1, + segment2, + }, + }); + + mockFilesystem.fileData['flags.json'] = { timestamp: 0, data: flagFile }; + mockFilesystem.fileData['segments.json'] = { timestamp: 0, data: segmentFile }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['flags.json', 'segments.json'], + }, + }, + }, + logger, + }); + + const initializer = new FileDataInitializerFDv2(config, platform); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + + // Verify the combined data structure + const dataCall = mockDataCallback.mock.calls[0]; + expect(dataCall[0]).toBe(true); // basis + expect(dataCall[1]).toHaveProperty('initMetadata'); + expect(dataCall[1]).toHaveProperty('payload'); + + const { payload } = dataCall[1]; + expect(payload).toHaveProperty('updates'); + expect(Array.isArray(payload.updates)).toBe(true); + + // Verify all flags are present + const flagUpdates = payload.updates.filter((update: any) => update.kind === 'flag'); + expect(flagUpdates.length).toBe(2); + + const flag1Update = flagUpdates.find((update: any) => update.key === 'flag1'); + expect(flag1Update).toBeDefined(); + expect(flag1Update.version).toBe(1); + expect(flag1Update.object).toEqual(flag1); + + const flag2Update = flagUpdates.find((update: any) => update.key === 'flag2'); + expect(flag2Update).toBeDefined(); + expect(flag2Update.version).toBe(2); + expect(flag2Update.object).toEqual(flag2); + + // Verify all segments are present + const segmentUpdates = payload.updates.filter((update: any) => update.kind === 'segment'); + expect(segmentUpdates.length).toBe(2); + + const segment1Update = segmentUpdates.find((update: any) => update.key === 'segment1'); + expect(segment1Update).toBeDefined(); + expect(segment1Update.version).toBe(1); + expect(segment1Update.object).toEqual(segment1); + + const segment2Update = segmentUpdates.find((update: any) => update.key === 'segment2'); + expect(segment2Update).toBeDefined(); + expect(segment2Update.version).toBe(2); + expect(segment2Update.object).toEqual(segment2); + }); + + it('overwrites data when the same key appears in multiple files', async () => { + // First file has flag1 with version 1 + const file1 = JSON.stringify({ + flags: { + flag1: { + ...flag1, + version: 1, + }, + }, + }); + + // Second file has flag1 with version 2 (should overwrite) + const file2 = JSON.stringify({ + flags: { + flag1: { + ...flag1, + version: 2, + on: false, // Different value to verify it's overwritten + }, + }, + }); + + mockFilesystem.fileData['file1.json'] = { timestamp: 0, data: file1 }; + mockFilesystem.fileData['file2.json'] = { timestamp: 0, data: file2 }; + + config = new Configuration({ + dataSystem: { + dataSource: { + dataSourceOptionsType: 'standard', + initializerOptions: { + type: 'file', + paths: ['file1.json', 'file2.json'], + }, + }, + }, + logger, + }); + + const initializer = new FileDataInitializerFDv2(config, platform); + initializer.start(mockDataCallback, mockStatusCallback); + + await jest.runAllTimersAsync(); + + expect(mockStatusCallback).toHaveBeenCalledWith(subsystem.DataSourceState.Valid); + expect(mockDataCallback).toHaveBeenCalled(); + + // Verify that flag1 from file2 (version 2, on: false) overwrote file1 + const dataCall = mockDataCallback.mock.calls[0]; + const { payload } = dataCall[1]; + const flagUpdates = payload.updates.filter((update: any) => update.kind === 'flag'); + + // Should only have one flag1 (not duplicated) + const flag1Updates = flagUpdates.filter((update: any) => update.key === 'flag1'); + expect(flag1Updates.length).toBe(1); + + const flag1Update = flag1Updates[0]; + expect(flag1Update.version).toBe(2); + expect(flag1Update.object.on).toBe(false); // Should be from file2 + }); +}); diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts index 1ad7e0bc7a..c477eff4a3 100644 --- a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -121,11 +121,11 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour return { segments: { ...acc.segments, - ...parsed.segments, + ...(parsed.segments ?? {}), }, flags: { ...acc.flags, - ...parsed.flags, + ...(parsed.flags ?? {}), }, }; }, From 10363a9cf9c567a0de15c8939d0154f06ccfe7b6 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Wed, 19 Nov 2025 14:50:13 -0600 Subject: [PATCH 4/8] chore: addressing PR feedback --- .../src/data_sources/fileDataInitilizerFDv2.ts | 18 ++++++++++++------ .../sdk-server/src/options/Configuration.ts | 7 ++++++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts index c477eff4a3..920440f77f 100644 --- a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -25,7 +25,6 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour private _yamlParser?: (data: string) => any; private _fileLoader?: FileLoader; - // TODO: do a options check here constructor(config: Configuration, platform: Platform) { const options = config.dataSystem?.dataSource?.initializerOptions as FileDataInitializerOptions; this._validateInputs(options, platform); @@ -78,17 +77,15 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour this._paths, false, // autoupdate is always false for initializer (results: { path: string; data: string }[]) => { - // Whenever changes are detected we re-process all of the data. - // The FileLoader will have handled debouncing for us. try { const parsedData = this._processFileData(results); - statusCallback(subsystemCommon.DataSourceState.Valid); - payloadProcessor.addPayloadListener((payload) => { dataCallback(payload.basis, { initMetadata, payload }); }); + statusCallback(subsystemCommon.DataSourceState.Valid); + payloadProcessor.processEvents(parsedData.events); statusCallback(subsystemCommon.DataSourceState.Closed); @@ -102,7 +99,16 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour }, ); - this._fileLoader.loadAndWatch(); + this._fileLoader.loadAndWatch().catch((err) => { + this._logger?.error('Error loading files', err); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError( + DataSourceErrorKind.NetworkError, + `Failed to load files: ${err instanceof Error ? err.message : String(err)}`, + ), + ); + }); } private _processFileData(results: { path: string; data: string }[]) { diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index 1a74f6e787..1fa4329893 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -253,7 +253,12 @@ function validateDataSystemOptions(options: Options): { } // Preserve initializer options if it was provided, since it's not validated by validateTypesAndNames. // Currently, setting this option is most commonly used as an override of default initializer options. - if (options.dataSource && 'initializerOptions' in options.dataSource) { + // Check that the value is not undefined to avoid overwriting defaults when explicitly set to undefined. + if ( + options.dataSource && + 'initializerOptions' in options.dataSource && + options.dataSource.initializerOptions !== undefined + ) { validatedDataSourceOptions.initializerOptions = options.dataSource.initializerOptions; } From b02abe90529c35c88bdbeca75b1461c175d09a62 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Thu, 20 Nov 2025 14:25:40 -0600 Subject: [PATCH 5/8] feat: FDv1 payload adaptor this commit adds a fdv1 payload adaptor to process fdv1 data sets --- .../fdv2/FDv2ChangeSetBuilder.test.ts | 201 ++++++++++++++---- .../src/internal/fdv2/FDv1PayloadAdaptor.ts | 156 ++++++++++++++ .../src/internal/fdv2/FDv2ChangeSetBuilder.ts | 97 --------- .../shared/common/src/internal/fdv2/index.ts | 4 +- .../fileDataInitilizerFDv2.test.ts | 58 +++-- .../shared/sdk-server/src/LDClientImpl.ts | 12 +- .../src/api/options/LDDataSystemOptions.ts | 59 ++++- .../src/data_sources/PollingProcessorFDv2.ts | 58 +---- .../data_sources/fileDataInitilizerFDv2.ts | 30 +-- .../sdk-server/src/options/Configuration.ts | 4 +- 10 files changed, 431 insertions(+), 248 deletions(-) create mode 100644 packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts delete mode 100644 packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts diff --git a/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts b/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts index c28af646ce..f4ef82d1d9 100644 --- a/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts +++ b/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts @@ -1,39 +1,58 @@ -import FDv2ChangeSetBuilder from '../../../src/internal/fdv2/FDv2ChangeSetBuilder'; +import FDv1PayloadAdaptor from '../../../src/internal/fdv2/FDv1PayloadAdaptor'; +import { PayloadProcessor } from '../../../src/internal/fdv2/payloadProcessor'; import { DeleteObject, Event, PutObject } from '../../../src/internal/fdv2/proto'; -it('throws an error when finishing without starting', () => { - const builder = new FDv2ChangeSetBuilder(); - expect(() => builder.finish()).toThrow('changeset: cannot complete without a server-intent'); +// Mock PayloadProcessor that captures events +class MockPayloadProcessor extends PayloadProcessor { + public processedEvents: Event[] = []; + + constructor() { + super({}, undefined, undefined); + } + + processEvents(events: Event[]) { + this.processedEvents = [...this.processedEvents, ...events]; + // Don't call super.processEvents to avoid side effects in tests + } +} + +it('throws an error when using unsupported intent', () => { + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + // @ts-ignore - testing invalid intent + expect(() => adaptor.start('invalid-intent')).toThrow('intent: only xfer-full is supported'); }); it('starts a new changeset with the given intent', () => { - const builder = new FDv2ChangeSetBuilder(); - builder.start('xfer-full'); - const result = builder.finish(); + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + adaptor.start('xfer-full'); + adaptor.finish(); - expect(result).toBeDefined(); - expect(result.length).toBeGreaterThan(0); - expect(result[0].event).toBe('server-intent'); + expect(processor.processedEvents.length).toBeGreaterThan(0); + expect(processor.processedEvents[0].event).toBe('server-intent'); }); it('resets events when starting a new changeset', () => { - const builder = new FDv2ChangeSetBuilder(); - builder.start('xfer-full'); - builder.putObject({ kind: 'flag', key: 'test-flag', version: 1, object: {} }); - builder.start('xfer-full'); - const result = builder.finish(); + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + adaptor.start('xfer-full'); + adaptor.putObject({ kind: 'flag', key: 'test-flag', version: 1, object: {} }); + adaptor.start('xfer-full'); + adaptor.finish(); // Should only have server-intent and payload-transferred, no put-object events - const putObjectEvents = result.filter((e) => e.event === 'put-object'); + const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); expect(putObjectEvents.length).toBe(0); }); it('includes server-intent as the first event with correct structure', () => { - const builder = new FDv2ChangeSetBuilder(); - builder.start('xfer-full'); - const result = builder.finish(); + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + adaptor.start('xfer-full'); + adaptor.finish(); - const serverIntentEvent = result[0] as Event; + const serverIntentEvent = processor.processedEvents[0] as Event; expect(serverIntentEvent.event).toBe('server-intent'); expect(serverIntentEvent.data).toBeDefined(); @@ -41,28 +60,30 @@ it('includes server-intent as the first event with correct structure', () => { expect(intentData.payloads).toBeDefined(); expect(intentData.payloads.length).toBe(1); expect(intentData.payloads[0].intentCode).toBe('xfer-full'); - expect(intentData.payloads[0].id).toBe('dummy-id'); + expect(intentData.payloads[0].id).toBe('FDv1Fallback'); expect(intentData.payloads[0].target).toBe(1); expect(intentData.payloads[0].reason).toBe('payload-missing'); }); it('includes payload-transferred as the last event with empty state', () => { - const builder = new FDv2ChangeSetBuilder(); - builder.start('xfer-full'); - const result = builder.finish(); + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + adaptor.start('xfer-full'); + adaptor.finish(); - const payloadTransferredEvent = result[result.length - 1] as Event; + const payloadTransferredEvent = processor.processedEvents[processor.processedEvents.length - 1] as Event; expect(payloadTransferredEvent.event).toBe('payload-transferred'); expect(payloadTransferredEvent.data).toBeDefined(); const transferredData = payloadTransferredEvent.data as any; expect(transferredData.state).toBe(''); expect(transferredData.version).toBe(1); - expect(transferredData.id).toBe('dummy-id'); + expect(transferredData.id).toBe('FDv1Fallback'); }); it('includes all put and delete events between server-intent and payload-transferred', () => { - const builder = new FDv2ChangeSetBuilder(); + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); const putObj1: PutObject = { kind: 'flag', key: 'flag-1', @@ -81,19 +102,115 @@ it('includes all put and delete events between server-intent and payload-transfe object: { key: 'flag-2', on: false }, }; - builder.start('xfer-full'); - builder.putObject(putObj1); - builder.deleteObject(deleteObj); - builder.putObject(putObj2); - const result = builder.finish(); - - expect(result.length).toBe(5); // server-intent + 3 events + payload-transferred - expect(result[0].event).toBe('server-intent'); - expect(result[1].event).toBe('put-object'); - expect((result[1].data as PutObject).key).toBe('flag-1'); - expect(result[2].event).toBe('delete-object'); - expect((result[2].data as DeleteObject).key).toBe('segment-1'); - expect(result[3].event).toBe('put-object'); - expect((result[3].data as PutObject).key).toBe('flag-2'); - expect(result[4].event).toBe('payload-transferred'); + adaptor.start('xfer-full'); + adaptor.putObject(putObj1); + adaptor.deleteObject(deleteObj); + adaptor.putObject(putObj2); + adaptor.finish(); + + expect(processor.processedEvents.length).toBe(5); // server-intent + 3 events + payload-transferred + expect(processor.processedEvents[0].event).toBe('server-intent'); + expect(processor.processedEvents[1].event).toBe('put-object'); + expect((processor.processedEvents[1].data as PutObject).key).toBe('flag-1'); + expect(processor.processedEvents[2].event).toBe('delete-object'); + expect((processor.processedEvents[2].data as DeleteObject).key).toBe('segment-1'); + expect(processor.processedEvents[3].event).toBe('put-object'); + expect((processor.processedEvents[3].data as PutObject).key).toBe('flag-2'); + expect(processor.processedEvents[4].event).toBe('payload-transferred'); +}); + +it('clears events after finish is called', () => { + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + adaptor.start('xfer-full'); + adaptor.putObject({ kind: 'flag', key: 'test-flag', version: 1, object: {} }); + adaptor.finish(); + + const firstFinishEventCount = processor.processedEvents.length; + expect(firstFinishEventCount).toBe(3); // server-intent + put-object + payload-transferred + + // Start a new changeset + adaptor.start('xfer-full'); + adaptor.finish(); + + // Should have processed 2 more events (server-intent + payload-transferred) + // but the adaptor's internal events should be cleared + expect(processor.processedEvents.length).toBe(firstFinishEventCount + 2); +}); + +it('pushFdv1Payload adds put-object events for flags and segments', () => { + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + const fdv1Payload = { + flags: { + 'flag-1': { key: 'flag-1', version: 1, on: true }, + 'flag-2': { key: 'flag-2', version: 2, on: false }, + }, + segments: { + 'segment-1': { key: 'segment-1', version: 1 }, + }, + }; + + adaptor.start('xfer-full'); + adaptor.pushFdv1Payload(fdv1Payload); + adaptor.finish(); + + const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); + expect(putObjectEvents.length).toBe(3); + + const flag1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-1'); + expect(flag1Event).toBeDefined(); + expect((flag1Event!.data as PutObject).kind).toBe('flag'); + expect((flag1Event!.data as PutObject).version).toBe(1); + + const flag2Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-2'); + expect(flag2Event).toBeDefined(); + expect((flag2Event!.data as PutObject).kind).toBe('flag'); + expect((flag2Event!.data as PutObject).version).toBe(2); + + const segment1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'segment-1'); + expect(segment1Event).toBeDefined(); + expect((segment1Event!.data as PutObject).kind).toBe('segment'); + expect((segment1Event!.data as PutObject).version).toBe(1); +}); + +it('pushFdv1Payload handles empty or missing flags and segments', () => { + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + + adaptor.start('xfer-full'); + adaptor.pushFdv1Payload({ flags: {}, segments: {} }); + adaptor.finish(); + + const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); + expect(putObjectEvents.length).toBe(0); + + // Test with missing properties + const processor2 = new MockPayloadProcessor(); + const adaptor2 = new FDv1PayloadAdaptor(processor2); + adaptor2.start('xfer-full'); + adaptor2.pushFdv1Payload({} as any); + adaptor2.finish(); + + const putObjectEvents2 = processor2.processedEvents.filter((e) => e.event === 'put-object'); + expect(putObjectEvents2.length).toBe(0); +}); + +it('pushFdv1Payload uses default version of 1 when version is missing', () => { + const processor = new MockPayloadProcessor(); + const adaptor = new FDv1PayloadAdaptor(processor); + const fdv1Payload = { + flags: { + 'flag-1': { key: 'flag-1', on: true }, // no version + }, + segments: {}, + }; + + adaptor.start('xfer-full'); + adaptor.pushFdv1Payload(fdv1Payload); + adaptor.finish(); + + const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); + expect(putObjectEvents.length).toBe(1); + expect((putObjectEvents[0].data as PutObject).version).toBe(1); }); diff --git a/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts new file mode 100644 index 0000000000..2a9eb2cd0e --- /dev/null +++ b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts @@ -0,0 +1,156 @@ +import { PayloadProcessor } from './payloadProcessor'; +import { DeleteObject, Event, PutObject, EventType } from './proto'; + +// eventually this will be the same as the IntentCode type, but for now we'll use a simpler type +type supportedIntentCodes = 'xfer-full'; + +interface fdv1Payload { + flags: { [name: string]: any }; + segments: { [name: string]: any }; +} + +const PAYLOAD_ID = 'FDv1Fallback'; + +/** + * FDv1PayloadAdaptor is a helper for constructing a change set for FDv2. + * The main use case for this adaptor is to help construct a change set from + * a FDv1 payload. + * + * @experimental + * This type is not stable, and not subject to any backwards + * compatibility guarantees or semantic versioning. It is not suitable for production usage. + */ +export default class FDv1PayloadAdaptor { + private _events: Event[] = []; + private _processor: PayloadProcessor; + private _selector: string = ''; + private _intent: supportedIntentCodes = 'xfer-full'; + + constructor(processor: PayloadProcessor) { + this._processor = processor + } + + /** + * Begins a new change set with a given intent. + */ + start(intent: supportedIntentCodes): this { + if (intent !== 'xfer-full') { + throw new Error('intent: only xfer-full is supported'); + } + + this._events = [] + this._intent = intent; + + return this; + } + + /** + * Customizes the selector to use for the change set. + * + * NOTE: you probably only need this method for a synchronizer + * fallback scenario. + * + * @param selector - the selector to use for the change set + * @returns {this} - the adaptor instance + */ + useSelector(selector: string): this { + this._selector = selector; + return this; + } + + /** + * Returns the completed changeset. + * NOTE: currently, this adaptor is not designed to continuously build changesets, rather + * it is designed to construct a single changeset at a time. We can easily expand this by + * resetting some values in the future. + */ + finish(): this { + // NOTE: currently the only use case for this adaptor is to + // construct a change set for a file data intializer which only supports + // FDv1 format. As such, we need to use dummy values to satisfy the FDv2 + // protocol. + const serverIntentEvent: Event = { + event: 'server-intent', + data: { + payloads: [{ + id: PAYLOAD_ID, + target: 1, + intentCode: this._intent, + reason: 'payload-missing' + }], + }, + }; + + const finishEvent: Event = { + event: 'payload-transferred', + data: { + // IMPORTANT: the selector MUST be empty or "live" data synchronizers + // will not work as it would try to resume from a bogus state. + state: this._selector, + version: 1, + id: PAYLOAD_ID, + }, + }; + + this._processor.processEvents([ + serverIntentEvent, + ...this._events, + finishEvent, + ]); + this._events = [] + + return this; + } + + /** + * + * @param data - FDv1 payload from a fdv1 poll + */ + pushFdv1Payload(data: fdv1Payload): this { + Object.entries(data?.flags || []).forEach(([key, flag]) => { + this.putObject({ + // strong assumption here that we only have segments and flags. + kind: 'flag', + key: key, + version: flag.version || 1, + object: flag, + }); + }); + + Object.entries(data?.segments || []).forEach(([key, segment]) => { + this.putObject({ + // strong assumption here that we only have segments and flags. + kind: 'segment', + key: key, + version: segment.version || 1, + object: segment, + }); + }); + + return this + } + + /** + * Adds a new object to the changeset. + */ + putObject(obj: PutObject): this { + this._events.push({ + event: 'put-object', + data: obj, + }); + + return this; + } + + /** + * Adds a deletion to the changeset. + */ + deleteObject(obj: DeleteObject): this { + this._events.push({ + event: 'delete-object', + data: obj, + }); + + return this; + } +} diff --git a/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts b/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts deleted file mode 100644 index f7b4a3ced2..0000000000 --- a/packages/shared/common/src/internal/fdv2/FDv2ChangeSetBuilder.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { DeleteObject, Event, PutObject } from './proto'; - -// eventually this will be the same as the IntentCode type, but for now we'll use a simpler type -type supportedIntentCodes = 'xfer-full'; - -/** - * FDv2ChangeSetBuilder is a helper for constructing a change set for FDv2. - * The main use case for this builder is to help construct a change set from - * a FDv1 payload. - * - * @experimental - * This type is not stable, and not subject to any backwards - * compatibility guarantees or semantic versioning. It is not suitable for production usage. - */ -export default class FDv2ChangeSetBuilder { - private _intent?: supportedIntentCodes; - private _events: Event[] = []; - - /** - * Begins a new change set with a given intent. - */ - start(intent: supportedIntentCodes): this { - this._intent = intent; - this._events = []; - - return this; - } - - /** - * Returns the completed changeset. - * NOTE: currently, this builder is not designed to continuously build changesets, rather - * it is designed to construct a single changeset at a time. We can easily expand this by - * resetting some values in the future. - */ - finish(): Array { - if (this._intent === undefined) { - throw new Error('changeset: cannot complete without a server-intent'); - } - - // NOTE: currently the only use case for this builder is to - // construct a change set for a file data intializer which only supports - // FDv1 format. As such, we need to use dummy values to satisfy the FDv2 - // protocol. - const events: Array = [ - { - event: 'server-intent', - data: { - payloads: [ - { - id: 'dummy-id', - target: 1, - intentCode: this._intent!, - reason: 'payload-missing', - }, - ], - }, - }, - ...this._events, - { - event: 'payload-transferred', - data: { - // IMPORTANT: the selector MUST be empty or "live" data synchronizers - // will not work as it would try to resume from a bogus state. - state: '', - version: 1, - id: 'dummy-id', - }, - }, - ]; - - return events; - } - - /** - * Adds a new object to the changeset. - */ - putObject(obj: PutObject): this { - this._events.push({ - event: 'put-object', - data: obj, - }); - - return this; - } - - /** - * Adds a deletion to the changeset. - */ - deleteObject(obj: DeleteObject): this { - this._events.push({ - event: 'delete-object', - data: obj, - }); - - return this; - } -} diff --git a/packages/shared/common/src/internal/fdv2/index.ts b/packages/shared/common/src/internal/fdv2/index.ts index 113602ea5e..be62fbcd2f 100644 --- a/packages/shared/common/src/internal/fdv2/index.ts +++ b/packages/shared/common/src/internal/fdv2/index.ts @@ -1,4 +1,4 @@ -import FDv2ChangeSetBuilder from './FDv2ChangeSetBuilder'; +import FDv1PayloadAdaptor from './FDv1PayloadAdaptor'; import { FDv2EventsCollection, Payload, @@ -15,5 +15,5 @@ export { PayloadProcessor, PayloadStreamReader, Update, - FDv2ChangeSetBuilder, + FDv1PayloadAdaptor, }; diff --git a/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts index bfa0955941..36cfe7af62 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts @@ -105,8 +105,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: [], + file: { + enabled: true, + paths: [], + }, }, }, }, @@ -130,8 +132,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['test.json'], + file: { + enabled: true, + paths: ['test.json'], + }, }, }, }, @@ -152,8 +156,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['test.json'], + file: { + enabled: true, + paths: ['test.json'], + }, }, }, }, @@ -178,8 +184,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['flags.json', 'segments.json'], + file: { + enabled: true, + paths: ['flags.json', 'segments.json'], + }, }, }, }, @@ -215,9 +223,11 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['test.yaml'], - yamlParser: mockYamlParser, + file: { + enabled: true, + paths: ['test.yaml'], + yamlParser: mockYamlParser, + }, }, }, }, @@ -243,8 +253,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['test.yaml'], + file: { + enabled: true, + paths: ['test.yaml'], + } }, }, }, @@ -271,8 +283,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['test.json'], + file: { + enabled: true, + paths: ['test.json'], + }, }, }, }, @@ -333,8 +347,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['flags.json', 'segments.json'], + file: { + enabled: true, + paths: ['flags.json', 'segments.json'], + } }, }, }, @@ -351,7 +367,7 @@ describe('FileDataInitializerFDv2', () => { // Verify the combined data structure const dataCall = mockDataCallback.mock.calls[0]; - expect(dataCall[0]).toBe(true); // basis + expect(dataCall[0]).toBe(false); expect(dataCall[1]).toHaveProperty('initMetadata'); expect(dataCall[1]).toHaveProperty('payload'); @@ -418,8 +434,10 @@ describe('FileDataInitializerFDv2', () => { dataSource: { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'file', - paths: ['file1.json', 'file2.json'], + file: { + enabled: true, + paths: ['file1.json', 'file2.json'], + }, }, }, }, diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index 966f712d39..8e0046f837 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -324,9 +324,10 @@ function constructFDv2( if (!(config.offline || config.dataSystem!.useLdd)) { // make the FDv2 composite datasource with initializers/synchronizers const initializers: subsystem.LDDataSourceFactory[] = []; + const initializerOptions = dataSystem.dataSource?.initializerOptions ?? {}; - // use one shot initializer for performance and cost if we can do a combination of polling and streaming - if (dataSystem.dataSource?.initializerOptions?.type === 'polling') { + if (initializerOptions.polling?.enabled) { + console.log('adding one shot initializer'); initializers.push( () => new OneShotInitializerFDv2( @@ -334,7 +335,12 @@ function constructFDv2( config.logger, ), ); - } else if (dataSystem.dataSource?.initializerOptions?.type === 'file') { + } + + // If a file intializer is configured, then we will add it as a fallback to the + // polling initializer. + if (initializerOptions.file?.enabled) { + console.log('adding file data initializer'); initializers.push(() => new FileDataInitializerFDv2(config, platform)); } diff --git a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts index 81903c80da..13d0409976 100644 --- a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts +++ b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts @@ -75,14 +75,29 @@ export type DataSourceOptions = | StreamingDataSourceOptions | PollingDataSourceOptions; + +interface initializerOptionsBase { + /** + * whether the initializer is enabled. This value defaults to false. + * + * @default false + */ + enabled: boolean; +} + /** * Initializer option to read data from a file. * * NOTE: right now we only support data sources that are in FDv1 format. */ -export interface FileDataInitializerOptions { - type: 'file'; +export interface FileDataInitializerOptions extends initializerOptionsBase { + /** + * The paths to the files to read data from. + */ paths: Array; + /** + * A function to parse the data from the file. + */ yamlParser?: (data: string) => any; } @@ -90,8 +105,35 @@ export interface FileDataInitializerOptions { * Initializer option to initilize the SDK from doing a one time full payload transfer. * This will be the default initializer used by the standard data source type. */ -export interface PollingDataInitializerOptions { - type: 'polling'; +export interface PollingDataInitializerOptions extends initializerOptionsBase { + /** + * whether the initializer is enabled. This value defaults to true. + * + * @default true + */ + enabled: boolean; +} + +export interface InitializerOptions { + file?: FileDataInitializerOptions; + polling?: PollingDataInitializerOptions; +} + +interface dataSourceOptionsBase { + /** + * This is the preconfigured data source type that the SDK will use. + * + * standard: a combination of streaming and polling + * streamingOnly: only streaming + * pollingOnly: only polling + * + * @default 'standard' + */ + dataSourceOptionsType: 'standard' | 'streamingOnly' | 'pollingOnly'; + /** + * Initializer options for the data source. + */ + initializerOptions?: InitializerOptions; } /** @@ -99,9 +141,8 @@ export interface PollingDataInitializerOptions { * a combination of streaming and polling to initialize the SDK, provide real time updates, * and can switch between streaming and polling automatically to provide redundancy. */ -export interface StandardDataSourceOptions { +export interface StandardDataSourceOptions extends dataSourceOptionsBase { dataSourceOptionsType: 'standard'; - initializerOptions?: FileDataInitializerOptions | PollingDataInitializerOptions; /** * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted. * @@ -123,9 +164,8 @@ export interface StandardDataSourceOptions { * This data source will make best effort to maintain a streaming connection to LaunchDarkly services * to provide real time data updates. */ -export interface StreamingDataSourceOptions { +export interface StreamingDataSourceOptions extends dataSourceOptionsBase { dataSourceOptionsType: 'streamingOnly'; - initializerOptions?: FileDataInitializerOptions | PollingDataInitializerOptions; /** * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted. * @@ -141,9 +181,8 @@ export interface StreamingDataSourceOptions { /** * This data source will periodically make a request to LaunchDarkly services to retrieve updated data. */ -export interface PollingDataSourceOptions { +export interface PollingDataSourceOptions extends dataSourceOptionsBase { dataSourceOptionsType: 'pollingOnly'; - initializerOptions?: FileDataInitializerOptions | PollingDataInitializerOptions; /** * The time between polling requests, in seconds. Default if omitted. */ diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index 713326ad5d..1528e04dfa 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -34,58 +34,12 @@ function processFDv1FlagsAndSegments( payloadProcessor: internal.PayloadProcessor, data: FlagsAndSegments, ) { - payloadProcessor.processEvents([ - { - event: `server-intent`, - data: { - payloads: [ - { - id: `FDv1Fallback`, - target: 1, - intentCode: `xfer-full`, - }, - ], - }, - }, - ]); - - Object.entries(data?.flags || []).forEach(([key, flag]) => { - payloadProcessor.processEvents([ - { - event: `put-object`, - data: { - kind: 'flag', - key, - version: flag.version, - object: flag, - }, - }, - ]); - }); - - Object.entries(data?.segments || []).forEach(([key, segment]) => { - payloadProcessor.processEvents([ - { - event: `put-object`, - data: { - kind: 'segment', - key, - version: segment.version, - object: segment, - }, - }, - ]); - }); - - payloadProcessor.processEvents([ - { - event: `payload-transferred`, - data: { - state: `FDv1Fallback`, - version: 1, - }, - }, - ]); + const adaptor = new internal.FDv1PayloadAdaptor(payloadProcessor); + + adaptor.start('xfer-full') + .useSelector('FDv1Fallback') + .pushFdv1Payload(data) + .finish(); } /** diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts index 920440f77f..811d15b485 100644 --- a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -26,7 +26,7 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour private _fileLoader?: FileLoader; constructor(config: Configuration, platform: Platform) { - const options = config.dataSystem?.dataSource?.initializerOptions as FileDataInitializerOptions; + const options = config.dataSystem?.dataSource?.initializerOptions?.file as FileDataInitializerOptions; this._validateInputs(options, platform); this._paths = options.paths; @@ -72,6 +72,8 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour this._logger, ); + const adaptor = new internal.FDv1PayloadAdaptor(payloadProcessor); + this._fileLoader = new FileLoader( this._filesystem, this._paths, @@ -81,12 +83,15 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour const parsedData = this._processFileData(results); payloadProcessor.addPayloadListener((payload) => { - dataCallback(payload.basis, { initMetadata, payload }); + // NOTE: file data initializer will never have a valid basis, so we always pass false + dataCallback(false, { initMetadata, payload }); }); statusCallback(subsystemCommon.DataSourceState.Valid); - payloadProcessor.processEvents(parsedData.events); + adaptor.start('xfer-full') + .pushFdv1Payload(parsedData) + .finish(); statusCallback(subsystemCommon.DataSourceState.Closed); } catch (err) { @@ -141,24 +146,7 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour }, ); - const changeSetBuilder = new internal.FDv2ChangeSetBuilder(); - changeSetBuilder.start('xfer-full'); - - Object.keys(combined).forEach((kind: string) => { - Object.entries(combined[kind]).forEach(([k, v]) => { - changeSetBuilder.putObject({ - // strong assumption here that we only have segments and flags. - kind: kind === 'segments' ? 'segment' : 'flag', - key: k, - version: v.version || 1, - object: v, - }); - }); - }); - - return { - events: changeSetBuilder.finish(), - }; + return combined; } stop() { diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index 1fa4329893..d464977681 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -83,7 +83,9 @@ const DEFAULT_STREAM_RECONNECT_DELAY = 1; const defaultStandardDataSourceOptions: StandardDataSourceOptions = { dataSourceOptionsType: 'standard', initializerOptions: { - type: 'polling', + polling: { + enabled: true, + } }, streamInitialReconnectDelay: DEFAULT_STREAM_RECONNECT_DELAY, pollInterval: DEFAULT_POLL_INTERVAL, From b1e01236361692f720488531bb9b6f7b51222f8d Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Thu, 20 Nov 2025 15:19:27 -0600 Subject: [PATCH 6/8] chore: revert datasource config and add custom datasource option --- .../fileDataInitilizerFDv2.test.ts | 185 +++++------------- .../shared/sdk-server/src/LDClientImpl.ts | 13 +- .../src/api/options/LDDataSystemOptions.ts | 131 ++++++------- .../src/data_sources/PollingProcessorFDv2.ts | 5 +- .../data_sources/fileDataInitilizerFDv2.ts | 14 +- .../sdk-server/src/options/Configuration.ts | 16 -- 6 files changed, 114 insertions(+), 250 deletions(-) diff --git a/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts index 36cfe7af62..8215121867 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/fileDataInitilizerFDv2.test.ts @@ -7,8 +7,8 @@ import { WatchHandle, } from '@launchdarkly/js-sdk-common'; +import { FileSystemDataSourceConfiguration } from '../../src/api'; import FileDataInitializerFDv2 from '../../src/data_sources/fileDataInitilizerFDv2'; -import Configuration from '../../src/options/Configuration'; import { createBasicPlatform } from '../createBasicPlatform'; import TestLogger, { LogLevel } from '../Logger'; @@ -78,7 +78,6 @@ describe('FileDataInitializerFDv2', () => { let mockFilesystem: MockFilesystem; let logger: TestLogger; let platform: Platform; - let config: Configuration; let mockDataCallback: jest.Mock; let mockStatusCallback: jest.Mock; @@ -100,24 +99,14 @@ describe('FileDataInitializerFDv2', () => { }); it('throws error when paths are not provided', () => { - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: [], - }, - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: [], + }; expect(() => { /* eslint-disable-next-line no-new */ - new FileDataInitializerFDv2(config, platform); + new FileDataInitializerFDv2(options, platform, logger); }).toThrow('FileDataInitializerFDv2: paths are required'); }); @@ -127,46 +116,26 @@ describe('FileDataInitializerFDv2', () => { fileSystem: undefined, }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['test.json'], - }, - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.json'], + }; expect(() => { /* eslint-disable-next-line no-new */ - new FileDataInitializerFDv2(config, platformWithoutFileSystem); + new FileDataInitializerFDv2(options, platformWithoutFileSystem, logger); }).toThrow('FileDataInitializerFDv2: file system is required'); }); it('loads and processes JSON file with flags and segments', async () => { mockFilesystem.fileData['test.json'] = { timestamp: 0, data: allPropertiesJson }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['test.json'], - }, - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.json'], + }; - const initializer = new FileDataInitializerFDv2(config, platform); + const initializer = new FileDataInitializerFDv2(options, platform, logger); initializer.start(mockDataCallback, mockStatusCallback); await jest.runAllTimersAsync(); @@ -179,22 +148,12 @@ describe('FileDataInitializerFDv2', () => { mockFilesystem.fileData['flags.json'] = { timestamp: 0, data: flagOnlyJson }; mockFilesystem.fileData['segments.json'] = { timestamp: 0, data: segmentOnlyJson }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['flags.json', 'segments.json'], - }, - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['flags.json', 'segments.json'], + }; - const initializer = new FileDataInitializerFDv2(config, platform); + const initializer = new FileDataInitializerFDv2(options, platform, logger); initializer.start(mockDataCallback, mockStatusCallback); await jest.runAllTimersAsync(); @@ -218,23 +177,13 @@ describe('FileDataInitializerFDv2', () => { mockFilesystem.fileData['test.yaml'] = { timestamp: 0, data: yamlData }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['test.yaml'], - yamlParser: mockYamlParser, - }, - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.yaml'], + yamlParser: mockYamlParser, + }; - const initializer = new FileDataInitializerFDv2(config, platform); + const initializer = new FileDataInitializerFDv2(options, platform, logger); initializer.start(mockDataCallback, mockStatusCallback); await jest.runAllTimersAsync(); @@ -248,22 +197,12 @@ describe('FileDataInitializerFDv2', () => { const yamlData = 'flags:\n flag1:\n key: flag1'; mockFilesystem.fileData['test.yaml'] = { timestamp: 0, data: yamlData }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['test.yaml'], - } - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.yaml'], + }; - const initializer = new FileDataInitializerFDv2(config, platform); + const initializer = new FileDataInitializerFDv2(options, platform, logger); initializer.start(mockDataCallback, mockStatusCallback); await jest.runAllTimersAsync(); @@ -278,22 +217,12 @@ describe('FileDataInitializerFDv2', () => { it('handles invalid JSON gracefully', async () => { mockFilesystem.fileData['test.json'] = { timestamp: 0, data: 'invalid json {{{{' }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['test.json'], - }, - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['test.json'], + }; - const initializer = new FileDataInitializerFDv2(config, platform); + const initializer = new FileDataInitializerFDv2(options, platform, logger); initializer.start(mockDataCallback, mockStatusCallback); await jest.runAllTimersAsync(); @@ -342,22 +271,12 @@ describe('FileDataInitializerFDv2', () => { mockFilesystem.fileData['flags.json'] = { timestamp: 0, data: flagFile }; mockFilesystem.fileData['segments.json'] = { timestamp: 0, data: segmentFile }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['flags.json', 'segments.json'], - } - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['flags.json', 'segments.json'], + }; - const initializer = new FileDataInitializerFDv2(config, platform); + const initializer = new FileDataInitializerFDv2(options, platform, logger); initializer.start(mockDataCallback, mockStatusCallback); await jest.runAllTimersAsync(); @@ -429,22 +348,12 @@ describe('FileDataInitializerFDv2', () => { mockFilesystem.fileData['file1.json'] = { timestamp: 0, data: file1 }; mockFilesystem.fileData['file2.json'] = { timestamp: 0, data: file2 }; - config = new Configuration({ - dataSystem: { - dataSource: { - dataSourceOptionsType: 'standard', - initializerOptions: { - file: { - enabled: true, - paths: ['file1.json', 'file2.json'], - }, - }, - }, - }, - logger, - }); + const options: FileSystemDataSourceConfiguration = { + type: 'file', + paths: ['file1.json', 'file2.json'], + }; - const initializer = new FileDataInitializerFDv2(config, platform); + const initializer = new FileDataInitializerFDv2(options, platform, logger); initializer.start(mockDataCallback, mockStatusCallback); await jest.runAllTimersAsync(); diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index 8e0046f837..a4f324ac99 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -44,7 +44,6 @@ import { createPluginEnvironmentMetadata } from './createPluginEnvironmentMetada import { createPayloadListener } from './data_sources/createPayloadListenerFDv2'; import { createStreamListeners } from './data_sources/createStreamListeners'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; -import FileDataInitializerFDv2 from './data_sources/fileDataInitilizerFDv2'; import OneShotInitializerFDv2 from './data_sources/OneShotInitializerFDv2'; import PollingProcessor from './data_sources/PollingProcessor'; import PollingProcessorFDv2 from './data_sources/PollingProcessorFDv2'; @@ -324,10 +323,9 @@ function constructFDv2( if (!(config.offline || config.dataSystem!.useLdd)) { // make the FDv2 composite datasource with initializers/synchronizers const initializers: subsystem.LDDataSourceFactory[] = []; - const initializerOptions = dataSystem.dataSource?.initializerOptions ?? {}; - if (initializerOptions.polling?.enabled) { - console.log('adding one shot initializer'); + // use one shot initializer for performance and cost if we can do a combination of polling and streaming + if (isStandardOptions(dataSystem.dataSource)) { initializers.push( () => new OneShotInitializerFDv2( @@ -337,13 +335,6 @@ function constructFDv2( ); } - // If a file intializer is configured, then we will add it as a fallback to the - // polling initializer. - if (initializerOptions.file?.enabled) { - console.log('adding file data initializer'); - initializers.push(() => new FileDataInitializerFDv2(config, platform)); - } - const synchronizers: subsystem.LDDataSourceFactory[] = []; // if streaming is configured, add streaming synchronizer if (isStandardOptions(dataSystem.dataSource) || isStreamingOnlyOptions(dataSystem.dataSource)) { diff --git a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts index 13d0409976..a3b22894ea 100644 --- a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts +++ b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts @@ -36,7 +36,7 @@ export interface LDDataSystemOptions { /** * Configuration options for the Data Source that the SDK uses to get flags and other * data from the LaunchDarkly servers. Choose one of {@link StandardDataSourceOptions}, - * {@link StreamingDataSourceOptions}, or {@link PollingDataSourceOptions}; setting the + * {@link StreamingDataSourceOptions}, {@link PollingDataSourceOptions}, or {@link CustomDataSourceOptions}; setting the * type and the optional fields you want to customize. * * If not specified, this defaults to using the {@link StandardDataSourceOptions} which @@ -75,22 +75,13 @@ export type DataSourceOptions = | StreamingDataSourceOptions | PollingDataSourceOptions; +export type DataSourceConfiguration = + | FileSystemDataSourceConfiguration + | StreamingDataSourceConfiguration + | PollingDataSourceConfiguration; -interface initializerOptionsBase { - /** - * whether the initializer is enabled. This value defaults to false. - * - * @default false - */ - enabled: boolean; -} - -/** - * Initializer option to read data from a file. - * - * NOTE: right now we only support data sources that are in FDv1 format. - */ -export interface FileDataInitializerOptions extends initializerOptionsBase { +export interface FileSystemDataSourceConfiguration { + type: 'file'; /** * The paths to the files to read data from. */ @@ -101,92 +92,88 @@ export interface FileDataInitializerOptions extends initializerOptionsBase { yamlParser?: (data: string) => any; } -/** - * Initializer option to initilize the SDK from doing a one time full payload transfer. - * This will be the default initializer used by the standard data source type. - */ -export interface PollingDataInitializerOptions extends initializerOptionsBase { - /** - * whether the initializer is enabled. This value defaults to true. - * - * @default true - */ - enabled: boolean; -} +export interface StreamingDataSourceConfiguration { + type: 'streaming'; -export interface InitializerOptions { - file?: FileDataInitializerOptions; - polling?: PollingDataInitializerOptions; -} - -interface dataSourceOptionsBase { - /** - * This is the preconfigured data source type that the SDK will use. - * - * standard: a combination of streaming and polling - * streamingOnly: only streaming - * pollingOnly: only polling - * - * @default 'standard' - */ - dataSourceOptionsType: 'standard' | 'streamingOnly' | 'pollingOnly'; - /** - * Initializer options for the data source. - */ - initializerOptions?: InitializerOptions; -} - -/** - * This standard data source is the recommended datasource for most customers. It will use - * a combination of streaming and polling to initialize the SDK, provide real time updates, - * and can switch between streaming and polling automatically to provide redundancy. - */ -export interface StandardDataSourceOptions extends dataSourceOptionsBase { - dataSourceOptionsType: 'standard'; /** * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted. * * The streaming service uses a backoff algorithm (with jitter) every time the connection needs * to be reestablished. The delay for the first reconnection will start near this value, and then - * increase exponentially for any subsequent connection failures. + * increase exponentially up to a maximum for any subsequent connection failures. * * The default value is 1. */ streamInitialReconnectDelay?: number; +} +export interface PollingDataSourceConfiguration { + type: 'polling'; /** * The time between polling requests, in seconds. Default if omitted. */ pollInterval?: number; } +/** + * This standard data source is the recommended datasource for most customers. It will use + * a combination of streaming and polling to initialize the SDK, provide real time updates, + * and can switch between streaming and polling automatically to provide redundancy. + */ +export interface StandardDataSourceOptions + extends Omit, + Omit { + dataSourceOptionsType: 'standard'; +} + /** * This data source will make best effort to maintain a streaming connection to LaunchDarkly services * to provide real time data updates. */ -export interface StreamingDataSourceOptions extends dataSourceOptionsBase { +export interface StreamingDataSourceOptions extends Omit { dataSourceOptionsType: 'streamingOnly'; - /** - * Sets the initial reconnect delay for the streaming connection, in seconds. Default if omitted. - * - * The streaming service uses a backoff algorithm (with jitter) every time the connection needs - * to be reestablished. The delay for the first reconnection will start near this value, and then - * increase exponentially up to a maximum for any subsequent connection failures. - * - * The default value is 1. - */ - streamInitialReconnectDelay?: number; } /** * This data source will periodically make a request to LaunchDarkly services to retrieve updated data. */ -export interface PollingDataSourceOptions extends dataSourceOptionsBase { +export interface PollingDataSourceOptions extends Omit { dataSourceOptionsType: 'pollingOnly'; +} + +/** + * Initializer configuration options + */ +export type InitializerDataSource = + | FileSystemDataSourceConfiguration + | PollingDataSourceConfiguration; + +/** + * Synchronizer configuration options + */ +export type SynchronizerDataSource = + | PollingDataSourceConfiguration + | StreamingDataSourceConfiguration; + +/** + * This data source will allow developers to define their own composite data source + */ +export interface CustomDataSourceOptions { + dataSourceOptionsType: 'custom'; + /** - * The time between polling requests, in seconds. Default if omitted. + * Ordered list of {@link InitializerDataSource} that will run in order. The first + * initializer that successfully returns a valid payload will transition the sdk + * out of intialization stage into the synchronization stage. */ - pollInterval?: number; + initializers: Array; + + /** + * Order list of {@link SynchronizerDataSource} in priority order. Datasources will + * failover to the next datasource in this array until there are no datasources left + * to run. + */ + synchronizers: Array; } export function isStandardOptions(u: any): u is StandardDataSourceOptions { diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index 1528e04dfa..9ae4802825 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -36,10 +36,7 @@ function processFDv1FlagsAndSegments( ) { const adaptor = new internal.FDv1PayloadAdaptor(payloadProcessor); - adaptor.start('xfer-full') - .useSelector('FDv1Fallback') - .pushFdv1Payload(data) - .finish(); + adaptor.start('xfer-full').useSelector('FDv1Fallback').pushFdv1Payload(data).finish(); } /** diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts index 811d15b485..d837a10818 100644 --- a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -8,10 +8,9 @@ import { subsystem as subsystemCommon, } from '@launchdarkly/js-sdk-common'; -import { FileDataInitializerOptions } from '../api'; +import { FileSystemDataSourceConfiguration } from '../api'; import { Flag } from '../evaluation/data/Flag'; import { Segment } from '../evaluation/data/Segment'; -import Configuration from '../options/Configuration'; import { processFlag, processSegment } from '../store/serialization'; import FileLoader from './FileLoader'; @@ -25,17 +24,16 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour private _yamlParser?: (data: string) => any; private _fileLoader?: FileLoader; - constructor(config: Configuration, platform: Platform) { - const options = config.dataSystem?.dataSource?.initializerOptions?.file as FileDataInitializerOptions; + constructor(options: FileSystemDataSourceConfiguration, platform: Platform, logger: LDLogger) { this._validateInputs(options, platform); this._paths = options.paths; - this._logger = config.logger; + this._logger = logger; this._filesystem = platform.fileSystem!; this._yamlParser = options.yamlParser; } - private _validateInputs(options: FileDataInitializerOptions, platform: Platform) { + private _validateInputs(options: FileSystemDataSourceConfiguration, platform: Platform) { if (!options.paths || options.paths.length === 0) { throw new Error('FileDataInitializerFDv2: paths are required'); } @@ -89,9 +87,7 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour statusCallback(subsystemCommon.DataSourceState.Valid); - adaptor.start('xfer-full') - .pushFdv1Payload(parsedData) - .finish(); + adaptor.start('xfer-full').pushFdv1Payload(parsedData).finish(); statusCallback(subsystemCommon.DataSourceState.Closed); } catch (err) { diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index d464977681..5eb7bbc5a3 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -82,11 +82,6 @@ const DEFAULT_STREAM_RECONNECT_DELAY = 1; const defaultStandardDataSourceOptions: StandardDataSourceOptions = { dataSourceOptionsType: 'standard', - initializerOptions: { - polling: { - enabled: true, - } - }, streamInitialReconnectDelay: DEFAULT_STREAM_RECONNECT_DELAY, pollInterval: DEFAULT_POLL_INTERVAL, }; @@ -253,17 +248,6 @@ function validateDataSystemOptions(options: Options): { ), ]; } - // Preserve initializer options if it was provided, since it's not validated by validateTypesAndNames. - // Currently, setting this option is most commonly used as an override of default initializer options. - // Check that the value is not undefined to avoid overwriting defaults when explicitly set to undefined. - if ( - options.dataSource && - 'initializerOptions' in options.dataSource && - options.dataSource.initializerOptions !== undefined - ) { - validatedDataSourceOptions.initializerOptions = options.dataSource.initializerOptions; - } - validatedOptions.dataSource = validatedDataSourceOptions; allErrors.push(...errors); } else { From 718b11af0baddf39bf933d4679afa9b4b748622a Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Thu, 20 Nov 2025 16:50:33 -0600 Subject: [PATCH 7/8] feat: implement the custom data source option this enables users to specify file data source initializer --- ...der.test.ts => FDv1PayloadAdaptor.test.ts} | 6 +- .../DataSystem/CompositeDataSource.test.ts | 158 +++++++++++++++++ .../src/internal/fdv2/FDv1PayloadAdaptor.ts | 56 +++--- .../shared/sdk-server/src/LDClientImpl.ts | 165 +++++++++++++----- .../src/api/options/LDDataSystemOptions.ts | 7 +- .../data_sources/fileDataInitilizerFDv2.ts | 2 +- .../sdk-server/src/options/Configuration.ts | 6 +- 7 files changed, 321 insertions(+), 79 deletions(-) rename packages/shared/common/__tests__/internal/fdv2/{FDv2ChangeSetBuilder.test.ts => FDv1PayloadAdaptor.test.ts} (98%) diff --git a/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts b/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts similarity index 98% rename from packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts rename to packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts index f4ef82d1d9..8b6bca71b4 100644 --- a/packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts +++ b/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts @@ -10,7 +10,7 @@ class MockPayloadProcessor extends PayloadProcessor { super({}, undefined, undefined); } - processEvents(events: Event[]) { + override processEvents(events: Event[]) { this.processedEvents = [...this.processedEvents, ...events]; // Don't call super.processEvents to avoid side effects in tests } @@ -71,7 +71,9 @@ it('includes payload-transferred as the last event with empty state', () => { adaptor.start('xfer-full'); adaptor.finish(); - const payloadTransferredEvent = processor.processedEvents[processor.processedEvents.length - 1] as Event; + const payloadTransferredEvent = processor.processedEvents[ + processor.processedEvents.length - 1 + ] as Event; expect(payloadTransferredEvent.event).toBe('payload-transferred'); expect(payloadTransferredEvent.data).toBeDefined(); diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 87d2c77105..79cb4e6ee7 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -1004,3 +1004,161 @@ it('consumes cancellation tokens correctly', async () => { // eslint-disable-next-line no-underscore-dangle expect(underTest._cancelTokens.length).toEqual(0); }); + +it('handles multiple initializers with fallback when first initializer fails and second succeeds', async () => { + const mockInitializer1Error = { + name: 'Error', + message: 'First initializer failed', + }; + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, mockInitializer1Error); + }, + ), + stop: jest.fn(), + }; + + const mockInitializer2Data = { key: 'init2' }; + const mockInitializer2: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, mockInitializer2Data); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)], + [makeDataSourceFactory(mockSynchronizer1)], + [], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + const statusCallback = jest.fn(); + await new Promise((resolve) => { + dataCallback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockInitializer2.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); + expect(statusCallback).toHaveBeenCalledTimes(5); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith( + 2, + DataSourceState.Interrupted, + mockInitializer1Error, + ); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Valid, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined); +}); + +it('does not run second initializer when first initializer succeeds', async () => { + const mockInitializer1Data = { key: 'init1' }; + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, mockInitializer1Data); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockInitializer2: DataSource = { + start: jest.fn(), + stop: jest.fn(), + }; + + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)], + [makeDataSourceFactory(mockSynchronizer1)], + [], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + const statusCallback = jest.fn(); + await new Promise((resolve) => { + dataCallback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockInitializer2.start).toHaveBeenCalledTimes(0); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); +}); diff --git a/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts index 2a9eb2cd0e..5dd27f8b7e 100644 --- a/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts +++ b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts @@ -1,5 +1,5 @@ import { PayloadProcessor } from './payloadProcessor'; -import { DeleteObject, Event, PutObject, EventType } from './proto'; +import { DeleteObject, Event, PutObject } from './proto'; // eventually this will be the same as the IntentCode type, but for now we'll use a simpler type type supportedIntentCodes = 'xfer-full'; @@ -27,7 +27,7 @@ export default class FDv1PayloadAdaptor { private _intent: supportedIntentCodes = 'xfer-full'; constructor(processor: PayloadProcessor) { - this._processor = processor + this._processor = processor; } /** @@ -38,7 +38,7 @@ export default class FDv1PayloadAdaptor { throw new Error('intent: only xfer-full is supported'); } - this._events = [] + this._events = []; this._intent = intent; return this; @@ -72,12 +72,14 @@ export default class FDv1PayloadAdaptor { const serverIntentEvent: Event = { event: 'server-intent', data: { - payloads: [{ - id: PAYLOAD_ID, - target: 1, - intentCode: this._intent, - reason: 'payload-missing' - }], + payloads: [ + { + id: PAYLOAD_ID, + target: 1, + intentCode: this._intent, + reason: 'payload-missing', + }, + ], }, }; @@ -92,42 +94,38 @@ export default class FDv1PayloadAdaptor { }, }; - this._processor.processEvents([ - serverIntentEvent, - ...this._events, - finishEvent, - ]); - this._events = [] + this._processor.processEvents([serverIntentEvent, ...this._events, finishEvent]); + this._events = []; return this; } /** - * + * * @param data - FDv1 payload from a fdv1 poll */ pushFdv1Payload(data: fdv1Payload): this { Object.entries(data?.flags || []).forEach(([key, flag]) => { this.putObject({ - // strong assumption here that we only have segments and flags. - kind: 'flag', - key: key, - version: flag.version || 1, - object: flag, - }); + // strong assumption here that we only have segments and flags. + kind: 'flag', + key, + version: flag.version || 1, + object: flag, + }); }); Object.entries(data?.segments || []).forEach(([key, segment]) => { this.putObject({ - // strong assumption here that we only have segments and flags. - kind: 'segment', - key: key, - version: segment.version || 1, - object: segment, - }); + // strong assumption here that we only have segments and flags. + kind: 'segment', + key, + version: segment.version || 1, + object: segment, + }); }); - return this + return this; } /** diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index a4f324ac99..9a3a55098d 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -34,6 +34,7 @@ import { Hook } from './api/integrations/Hook'; import { BigSegmentStoreMembership } from './api/interfaces'; import { LDWaitForInitializationOptions } from './api/LDWaitForInitializationOptions'; import { + isCustomOptions, isPollingOnlyOptions, isStandardOptions, isStreamingOnlyOptions, @@ -44,6 +45,7 @@ import { createPluginEnvironmentMetadata } from './createPluginEnvironmentMetada import { createPayloadListener } from './data_sources/createPayloadListenerFDv2'; import { createStreamListeners } from './data_sources/createStreamListeners'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; +import FileDataInitializerFDv2 from './data_sources/fileDataInitilizerFDv2'; import OneShotInitializerFDv2 from './data_sources/OneShotInitializerFDv2'; import PollingProcessor from './data_sources/PollingProcessor'; import PollingProcessorFDv2 from './data_sources/PollingProcessorFDv2'; @@ -65,7 +67,10 @@ import FlagsStateBuilder from './FlagsStateBuilder'; import HookRunner from './hooks/HookRunner'; import MigrationOpEventToInputEvent from './MigrationOpEventConversion'; import MigrationOpTracker from './MigrationOpTracker'; -import Configuration, { DEFAULT_POLL_INTERVAL } from './options/Configuration'; +import Configuration, { + DEFAULT_POLL_INTERVAL, + DEFAULT_STREAM_RECONNECT_DELAY, +} from './options/Configuration'; import { ServerInternalOptions } from './options/ServerInternalOptions'; import VersionedDataKinds from './store/VersionedDataKinds'; @@ -323,60 +328,130 @@ function constructFDv2( if (!(config.offline || config.dataSystem!.useLdd)) { // make the FDv2 composite datasource with initializers/synchronizers const initializers: subsystem.LDDataSourceFactory[] = []; + const synchronizers: subsystem.LDDataSourceFactory[] = []; + const fdv1FallbackSynchronizers: subsystem.LDDataSourceFactory[] = []; - // use one shot initializer for performance and cost if we can do a combination of polling and streaming - if (isStandardOptions(dataSystem.dataSource)) { - initializers.push( - () => - new OneShotInitializerFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), - config.logger, - ), - ); - } + if (isCustomOptions(dataSystem.dataSource)) { + const { initializers: initializerConfigs = [], synchronizers: synchronizerConfigs = [] } = + dataSystem.dataSource; - const synchronizers: subsystem.LDDataSourceFactory[] = []; - // if streaming is configured, add streaming synchronizer - if (isStandardOptions(dataSystem.dataSource) || isStreamingOnlyOptions(dataSystem.dataSource)) { - const reconnectDelay = dataSystem.dataSource.streamInitialReconnectDelay; - synchronizers.push( - () => - new StreamingProcessorFDv2( - clientContext, - '/sdk/stream', - [], - baseHeaders, - diagnosticsManager, - reconnectDelay, - ), - ); - } + initializerConfigs.forEach((initializerConfig) => { + switch (initializerConfig.type) { + case 'file': { + initializers.push( + () => new FileDataInitializerFDv2(initializerConfig, platform, config.logger), + ); + break; + } + case 'polling': { + const { pollInterval = DEFAULT_POLL_INTERVAL } = initializerConfig; + initializers.push( + () => + new PollingProcessorFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + pollInterval, + config.logger, + ), + ); + break; + } + default: { + throw new Error('Unsupported initializer type'); + } + } + }); + synchronizerConfigs.forEach((synchronizerConfig) => { + switch (synchronizerConfig.type) { + case 'streaming': { + const { streamInitialReconnectDelay = DEFAULT_STREAM_RECONNECT_DELAY } = + synchronizerConfig; + synchronizers.push( + () => + new StreamingProcessorFDv2( + clientContext, + '/sdk/stream', + [], + baseHeaders, + diagnosticsManager, + streamInitialReconnectDelay, + ), + ); + break; + } + case 'polling': { + const { pollInterval = DEFAULT_POLL_INTERVAL } = synchronizerConfig; + synchronizers.push( + () => + new PollingProcessorFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + pollInterval, + config.logger, + ), + ); + break; + } + default: { + throw new Error('Unsupported synchronizer type'); + } + } + }); + } else { + // use one shot initializer for performance and cost if we can do a combination of polling and streaming + if (isStandardOptions(dataSystem.dataSource)) { + initializers.push( + () => + new OneShotInitializerFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + config.logger, + ), + ); + } - let pollingInterval = DEFAULT_POLL_INTERVAL; - // if polling is configured, add polling synchronizer - if (isStandardOptions(dataSystem.dataSource) || isPollingOnlyOptions(dataSystem.dataSource)) { - pollingInterval = dataSystem.dataSource.pollInterval ?? DEFAULT_POLL_INTERVAL; - synchronizers.push( + // if streaming is configured, add streaming synchronizer + if ( + isStandardOptions(dataSystem.dataSource) || + isStreamingOnlyOptions(dataSystem.dataSource) + ) { + const reconnectDelay = dataSystem.dataSource.streamInitialReconnectDelay; + synchronizers.push( + () => + new StreamingProcessorFDv2( + clientContext, + '/sdk/stream', + [], + baseHeaders, + diagnosticsManager, + reconnectDelay, + ), + ); + } + + let pollingInterval = DEFAULT_POLL_INTERVAL; + // if polling is configured, add polling synchronizer + if (isStandardOptions(dataSystem.dataSource) || isPollingOnlyOptions(dataSystem.dataSource)) { + pollingInterval = dataSystem.dataSource.pollInterval ?? DEFAULT_POLL_INTERVAL; + synchronizers.push( + () => + new PollingProcessorFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', logger), + pollingInterval, + logger, + ), + ); + } + + // This is short term handling and will be removed once FDv2 adoption is sufficient. + fdv1FallbackSynchronizers.push( () => new PollingProcessorFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', logger), + new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger), pollingInterval, - logger, + config.logger, + true, ), ); } - // This is short term handling and will be removed once FDv2 adoption is sufficient. - const fdv1FallbackSynchronizers = [ - () => - new PollingProcessorFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger), - pollingInterval, - config.logger, - true, - ), - ]; - dataSource = new CompositeDataSource( initializers, synchronizers, diff --git a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts index a3b22894ea..34bc001454 100644 --- a/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts +++ b/packages/shared/sdk-server/src/api/options/LDDataSystemOptions.ts @@ -73,7 +73,8 @@ export interface LDDataSystemOptions { export type DataSourceOptions = | StandardDataSourceOptions | StreamingDataSourceOptions - | PollingDataSourceOptions; + | PollingDataSourceOptions + | CustomDataSourceOptions; export type DataSourceConfiguration = | FileSystemDataSourceConfiguration @@ -187,3 +188,7 @@ export function isStreamingOnlyOptions(u: any): u is StreamingDataSourceOptions export function isPollingOnlyOptions(u: any): u is PollingDataSourceOptions { return u.dataSourceOptionsType === 'pollingOnly'; } + +export function isCustomOptions(u: any): u is CustomDataSourceOptions { + return u.dataSourceOptionsType === 'custom'; +} diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts index d837a10818..22dc19edfd 100644 --- a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -24,7 +24,7 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour private _yamlParser?: (data: string) => any; private _fileLoader?: FileLoader; - constructor(options: FileSystemDataSourceConfiguration, platform: Platform, logger: LDLogger) { + constructor(options: FileSystemDataSourceConfiguration, platform: Platform, logger?: LDLogger) { this._validateInputs(options, platform); this._paths = options.paths; diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index 5eb7bbc5a3..aaec4dd3f0 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -16,6 +16,7 @@ import { LDBigSegmentsOptions, LDOptions, LDProxyOptions, LDTLSOptions } from '. import { Hook } from '../api/integrations'; import { DataSourceOptions, + isCustomOptions, isPollingOnlyOptions, isStandardOptions, isStreamingOnlyOptions, @@ -78,7 +79,7 @@ const validations: Record = { }; export const DEFAULT_POLL_INTERVAL = 30; -const DEFAULT_STREAM_RECONNECT_DELAY = 1; +export const DEFAULT_STREAM_RECONNECT_DELAY = 1; const defaultStandardDataSourceOptions: StandardDataSourceOptions = { dataSourceOptionsType: 'standard', @@ -237,6 +238,9 @@ function validateDataSystemOptions(options: Options): { options.dataSource, defaultPollingDataSourceOptions, )); + } else if (isCustomOptions(options.dataSource)) { + validatedDataSourceOptions = options.dataSource; + errors = []; } else { // provided datasource options don't fit any expected form, drop them and use defaults validatedDataSourceOptions = defaultStandardDataSourceOptions; From c42e4f4c3ba2c2d89e1f92ebab01cde976d00d8f Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Fri, 21 Nov 2025 10:05:02 -0600 Subject: [PATCH 8/8] chore: changed payload adaptor implementation this commit will change the payload adaptor from a class to function so it could be minified better --- .../internal/fdv2/FDv1PayloadAdaptor.test.ts | 152 +------------ .../src/internal/fdv2/FDv1PayloadAdaptor.ts | 205 +++++++----------- .../shared/common/src/internal/fdv2/index.ts | 2 +- .../shared/sdk-server/src/LDClientImpl.ts | 4 +- .../src/data_sources/PollingProcessorFDv2.ts | 4 +- .../data_sources/fileDataInitilizerFDv2.ts | 4 +- 6 files changed, 93 insertions(+), 278 deletions(-) diff --git a/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts b/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts index 8b6bca71b4..478106d7ec 100644 --- a/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts +++ b/packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts @@ -1,6 +1,6 @@ -import FDv1PayloadAdaptor from '../../../src/internal/fdv2/FDv1PayloadAdaptor'; +import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from '../../../src/internal/fdv2/FDv1PayloadAdaptor'; import { PayloadProcessor } from '../../../src/internal/fdv2/payloadProcessor'; -import { DeleteObject, Event, PutObject } from '../../../src/internal/fdv2/proto'; +import { Event, PutObject } from '../../../src/internal/fdv2/proto'; // Mock PayloadProcessor that captures events class MockPayloadProcessor extends PayloadProcessor { @@ -16,41 +16,10 @@ class MockPayloadProcessor extends PayloadProcessor { } } -it('throws an error when using unsupported intent', () => { +it('includes server-intent as the first event and payload-transferred as the last eventwith correct structure', () => { const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - // @ts-ignore - testing invalid intent - expect(() => adaptor.start('invalid-intent')).toThrow('intent: only xfer-full is supported'); -}); - -it('starts a new changeset with the given intent', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - adaptor.start('xfer-full'); - adaptor.finish(); - - expect(processor.processedEvents.length).toBeGreaterThan(0); - expect(processor.processedEvents[0].event).toBe('server-intent'); -}); - -it('resets events when starting a new changeset', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - adaptor.start('xfer-full'); - adaptor.putObject({ kind: 'flag', key: 'test-flag', version: 1, object: {} }); - adaptor.start('xfer-full'); - adaptor.finish(); - - // Should only have server-intent and payload-transferred, no put-object events - const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); - expect(putObjectEvents.length).toBe(0); -}); - -it('includes server-intent as the first event with correct structure', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - adaptor.start('xfer-full'); - adaptor.finish(); + const adaptor = FDv1PayloadAdaptor(processor); + adaptor.processFullTransfer({ flags: {}, segments: {} }); const serverIntentEvent = processor.processedEvents[0] as Event; expect(serverIntentEvent.event).toBe('server-intent'); @@ -63,13 +32,6 @@ it('includes server-intent as the first event with correct structure', () => { expect(intentData.payloads[0].id).toBe('FDv1Fallback'); expect(intentData.payloads[0].target).toBe(1); expect(intentData.payloads[0].reason).toBe('payload-missing'); -}); - -it('includes payload-transferred as the last event with empty state', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - adaptor.start('xfer-full'); - adaptor.finish(); const payloadTransferredEvent = processor.processedEvents[ processor.processedEvents.length - 1 @@ -83,66 +45,9 @@ it('includes payload-transferred as the last event with empty state', () => { expect(transferredData.id).toBe('FDv1Fallback'); }); -it('includes all put and delete events between server-intent and payload-transferred', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - const putObj1: PutObject = { - kind: 'flag', - key: 'flag-1', - version: 1, - object: { key: 'flag-1', on: true }, - }; - const deleteObj: DeleteObject = { - kind: 'segment', - key: 'segment-1', - version: 2, - }; - const putObj2: PutObject = { - kind: 'flag', - key: 'flag-2', - version: 3, - object: { key: 'flag-2', on: false }, - }; - - adaptor.start('xfer-full'); - adaptor.putObject(putObj1); - adaptor.deleteObject(deleteObj); - adaptor.putObject(putObj2); - adaptor.finish(); - - expect(processor.processedEvents.length).toBe(5); // server-intent + 3 events + payload-transferred - expect(processor.processedEvents[0].event).toBe('server-intent'); - expect(processor.processedEvents[1].event).toBe('put-object'); - expect((processor.processedEvents[1].data as PutObject).key).toBe('flag-1'); - expect(processor.processedEvents[2].event).toBe('delete-object'); - expect((processor.processedEvents[2].data as DeleteObject).key).toBe('segment-1'); - expect(processor.processedEvents[3].event).toBe('put-object'); - expect((processor.processedEvents[3].data as PutObject).key).toBe('flag-2'); - expect(processor.processedEvents[4].event).toBe('payload-transferred'); -}); - -it('clears events after finish is called', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - adaptor.start('xfer-full'); - adaptor.putObject({ kind: 'flag', key: 'test-flag', version: 1, object: {} }); - adaptor.finish(); - - const firstFinishEventCount = processor.processedEvents.length; - expect(firstFinishEventCount).toBe(3); // server-intent + put-object + payload-transferred - - // Start a new changeset - adaptor.start('xfer-full'); - adaptor.finish(); - - // Should have processed 2 more events (server-intent + payload-transferred) - // but the adaptor's internal events should be cleared - expect(processor.processedEvents.length).toBe(firstFinishEventCount + 2); -}); - it('pushFdv1Payload adds put-object events for flags and segments', () => { const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); + const adaptor = FDv1PayloadAdaptor(processor); const fdv1Payload = { flags: { 'flag-1': { key: 'flag-1', version: 1, on: true }, @@ -153,9 +58,7 @@ it('pushFdv1Payload adds put-object events for flags and segments', () => { }, }; - adaptor.start('xfer-full'); - adaptor.pushFdv1Payload(fdv1Payload); - adaptor.finish(); + adaptor.processFullTransfer(fdv1Payload); const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); expect(putObjectEvents.length).toBe(3); @@ -175,44 +78,3 @@ it('pushFdv1Payload adds put-object events for flags and segments', () => { expect((segment1Event!.data as PutObject).kind).toBe('segment'); expect((segment1Event!.data as PutObject).version).toBe(1); }); - -it('pushFdv1Payload handles empty or missing flags and segments', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - - adaptor.start('xfer-full'); - adaptor.pushFdv1Payload({ flags: {}, segments: {} }); - adaptor.finish(); - - const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); - expect(putObjectEvents.length).toBe(0); - - // Test with missing properties - const processor2 = new MockPayloadProcessor(); - const adaptor2 = new FDv1PayloadAdaptor(processor2); - adaptor2.start('xfer-full'); - adaptor2.pushFdv1Payload({} as any); - adaptor2.finish(); - - const putObjectEvents2 = processor2.processedEvents.filter((e) => e.event === 'put-object'); - expect(putObjectEvents2.length).toBe(0); -}); - -it('pushFdv1Payload uses default version of 1 when version is missing', () => { - const processor = new MockPayloadProcessor(); - const adaptor = new FDv1PayloadAdaptor(processor); - const fdv1Payload = { - flags: { - 'flag-1': { key: 'flag-1', on: true }, // no version - }, - segments: {}, - }; - - adaptor.start('xfer-full'); - adaptor.pushFdv1Payload(fdv1Payload); - adaptor.finish(); - - const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object'); - expect(putObjectEvents.length).toBe(1); - expect((putObjectEvents[0].data as PutObject).version).toBe(1); -}); diff --git a/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts index 5dd27f8b7e..f797bfad5d 100644 --- a/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts +++ b/packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts @@ -1,8 +1,5 @@ import { PayloadProcessor } from './payloadProcessor'; -import { DeleteObject, Event, PutObject } from './proto'; - -// eventually this will be the same as the IntentCode type, but for now we'll use a simpler type -type supportedIntentCodes = 'xfer-full'; +import { Event } from './proto'; interface fdv1Payload { flags: { [name: string]: any }; @@ -12,143 +9,101 @@ interface fdv1Payload { const PAYLOAD_ID = 'FDv1Fallback'; /** - * FDv1PayloadAdaptor is a helper for constructing a change set for FDv2. - * The main use case for this adaptor is to help construct a change set from - * a FDv1 payload. - * - * @experimental - * This type is not stable, and not subject to any backwards - * compatibility guarantees or semantic versioning. It is not suitable for production usage. + * The FDv1PayloadAdaptor is a helper class that converts FDv1 payloads into events that the PayloadProcessor can understand. */ -export default class FDv1PayloadAdaptor { - private _events: Event[] = []; - private _processor: PayloadProcessor; - private _selector: string = ''; - private _intent: supportedIntentCodes = 'xfer-full'; - - constructor(processor: PayloadProcessor) { - this._processor = processor; - } - +export interface FDv1PayloadAdaptor { /** - * Begins a new change set with a given intent. + * The PayloadProcessor that will be used to process the events. */ - start(intent: supportedIntentCodes): this { - if (intent !== 'xfer-full') { - throw new Error('intent: only xfer-full is supported'); - } - - this._events = []; - this._intent = intent; - - return this; - } + readonly _processor: PayloadProcessor; /** - * Customizes the selector to use for the change set. - * - * NOTE: you probably only need this method for a synchronizer - * fallback scenario. - * - * @param selector - the selector to use for the change set - * @returns {this} - the adaptor instance + * The selector that will be used to identify the payload. */ - useSelector(selector: string): this { - this._selector = selector; - return this; - } + _selector: string; /** - * Returns the completed changeset. - * NOTE: currently, this adaptor is not designed to continuously build changesets, rather - * it is designed to construct a single changeset at a time. We can easily expand this by - * resetting some values in the future. + * The method that will be used to set a selector for the payload that is + * being processed. + * + * @remarks + * This method probably shouldn't be used in most instances as FDv1 payloads + * do not have the concept of a selector. + * + * @param selector - The selector to set for the payload. + * @returns this FDv1PayloadAdaptor instance */ - finish(): this { - // NOTE: currently the only use case for this adaptor is to - // construct a change set for a file data intializer which only supports - // FDv1 format. As such, we need to use dummy values to satisfy the FDv2 - // protocol. - const serverIntentEvent: Event = { - event: 'server-intent', - data: { - payloads: [ - { - id: PAYLOAD_ID, - target: 1, - intentCode: this._intent, - reason: 'payload-missing', - }, - ], - }, - }; - - const finishEvent: Event = { - event: 'payload-transferred', - data: { - // IMPORTANT: the selector MUST be empty or "live" data synchronizers - // will not work as it would try to resume from a bogus state. - state: this._selector, - version: 1, - id: PAYLOAD_ID, - }, - }; - - this._processor.processEvents([serverIntentEvent, ...this._events, finishEvent]); - this._events = []; - - return this; - } + useSelector: (selector: string) => FDv1PayloadAdaptor; /** + * The method that will be used to process a full transfer changeset. * - * @param data - FDv1 payload from a fdv1 poll + * @param data - The data to process. */ - pushFdv1Payload(data: fdv1Payload): this { - Object.entries(data?.flags || []).forEach(([key, flag]) => { - this.putObject({ - // strong assumption here that we only have segments and flags. - kind: 'flag', - key, - version: flag.version || 1, - object: flag, - }); - }); + processFullTransfer: (data: fdv1Payload) => void; +} - Object.entries(data?.segments || []).forEach(([key, segment]) => { - this.putObject({ - // strong assumption here that we only have segments and flags. - kind: 'segment', - key, - version: segment.version || 1, - object: segment, +export function fdv1PayloadAdaptor(processor: PayloadProcessor): FDv1PayloadAdaptor { + return { + _processor: processor, + _selector: '', + useSelector(selector: string): FDv1PayloadAdaptor { + this._selector = selector; + return this; + }, + processFullTransfer(data) { + const events: Array = [ + { + event: 'server-intent', + data: { + payloads: [ + { + id: PAYLOAD_ID, + target: 1, + intentCode: 'xfer-full', + reason: 'payload-missing', + }, + ], + }, + }, + ]; + + Object.entries(data?.flags || []).forEach(([key, flag]) => { + events.push({ + event: 'put-object', + data: { + kind: 'flag', + key, + version: flag.version || 1, + object: flag, + }, + }); }); - }); - return this; - } - - /** - * Adds a new object to the changeset. - */ - putObject(obj: PutObject): this { - this._events.push({ - event: 'put-object', - data: obj, - }); - - return this; - } + Object.entries(data?.segments || []).forEach(([key, segment]) => { + events.push({ + event: 'put-object', + data: { + kind: 'segment', + key, + version: segment.version || 1, + object: segment, + }, + }); + }); - /** - * Adds a deletion to the changeset. - */ - deleteObject(obj: DeleteObject): this { - this._events.push({ - event: 'delete-object', - data: obj, - }); + events.push({ + event: 'payload-transferred', + data: { + // IMPORTANT: the selector MUST be empty or "live" data synchronizers + // will not work as it would try to resume from a bogus state. + state: this._selector, + version: 1, + id: PAYLOAD_ID, + }, + }); - return this; - } + this._processor.processEvents(events); + }, + }; } diff --git a/packages/shared/common/src/internal/fdv2/index.ts b/packages/shared/common/src/internal/fdv2/index.ts index be62fbcd2f..d0178d6a47 100644 --- a/packages/shared/common/src/internal/fdv2/index.ts +++ b/packages/shared/common/src/internal/fdv2/index.ts @@ -1,4 +1,4 @@ -import FDv1PayloadAdaptor from './FDv1PayloadAdaptor'; +import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from './FDv1PayloadAdaptor'; import { FDv2EventsCollection, Payload, diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index 9a3a55098d..97205d287b 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -344,12 +344,10 @@ function constructFDv2( break; } case 'polling': { - const { pollInterval = DEFAULT_POLL_INTERVAL } = initializerConfig; initializers.push( () => - new PollingProcessorFDv2( + new OneShotInitializerFDv2( new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), - pollInterval, config.logger, ), ); diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index 9ae4802825..3aac11c15d 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -34,9 +34,9 @@ function processFDv1FlagsAndSegments( payloadProcessor: internal.PayloadProcessor, data: FlagsAndSegments, ) { - const adaptor = new internal.FDv1PayloadAdaptor(payloadProcessor); + const adaptor = internal.FDv1PayloadAdaptor(payloadProcessor); - adaptor.start('xfer-full').useSelector('FDv1Fallback').pushFdv1Payload(data).finish(); + adaptor.useSelector('FDv1Fallback').processFullTransfer(data); } /** diff --git a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts index 22dc19edfd..4e40cac96e 100644 --- a/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/fileDataInitilizerFDv2.ts @@ -70,7 +70,7 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour this._logger, ); - const adaptor = new internal.FDv1PayloadAdaptor(payloadProcessor); + const adaptor = internal.FDv1PayloadAdaptor(payloadProcessor); this._fileLoader = new FileLoader( this._filesystem, @@ -87,7 +87,7 @@ export default class FileDataInitializerFDv2 implements subsystemCommon.DataSour statusCallback(subsystemCommon.DataSourceState.Valid); - adaptor.start('xfer-full').pushFdv1Payload(parsedData).finish(); + adaptor.processFullTransfer(parsedData); statusCallback(subsystemCommon.DataSourceState.Closed); } catch (err) {