Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from '../../../src/internal/fdv2/FDv1PayloadAdaptor';
import { PayloadProcessor } from '../../../src/internal/fdv2/payloadProcessor';
import { Event, PutObject } from '../../../src/internal/fdv2/proto';

// Mock PayloadProcessor that captures events
class MockPayloadProcessor extends PayloadProcessor {
public processedEvents: Event[] = [];

constructor() {
super({}, undefined, undefined);
}

override processEvents(events: Event[]) {
this.processedEvents = [...this.processedEvents, ...events];
// Don't call super.processEvents to avoid side effects in tests
}
}

it('includes server-intent as the first event and payload-transferred as the last eventwith correct structure', () => {
const processor = new MockPayloadProcessor();
const adaptor = FDv1PayloadAdaptor(processor);
adaptor.processFullTransfer({ flags: {}, segments: {} });

const serverIntentEvent = processor.processedEvents[0] as Event;
expect(serverIntentEvent.event).toBe('server-intent');
expect(serverIntentEvent.data).toBeDefined();

const intentData = serverIntentEvent.data as any;
expect(intentData.payloads).toBeDefined();
expect(intentData.payloads.length).toBe(1);
expect(intentData.payloads[0].intentCode).toBe('xfer-full');
expect(intentData.payloads[0].id).toBe('FDv1Fallback');
expect(intentData.payloads[0].target).toBe(1);
expect(intentData.payloads[0].reason).toBe('payload-missing');

const payloadTransferredEvent = processor.processedEvents[
processor.processedEvents.length - 1
] as Event;
expect(payloadTransferredEvent.event).toBe('payload-transferred');
expect(payloadTransferredEvent.data).toBeDefined();

const transferredData = payloadTransferredEvent.data as any;
expect(transferredData.state).toBe('');
expect(transferredData.version).toBe(1);
expect(transferredData.id).toBe('FDv1Fallback');
});

it('pushFdv1Payload adds put-object events for flags and segments', () => {
const processor = new MockPayloadProcessor();
const adaptor = FDv1PayloadAdaptor(processor);
const fdv1Payload = {
flags: {
'flag-1': { key: 'flag-1', version: 1, on: true },
'flag-2': { key: 'flag-2', version: 2, on: false },
},
segments: {
'segment-1': { key: 'segment-1', version: 1 },
},
};

adaptor.processFullTransfer(fdv1Payload);

const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object');
expect(putObjectEvents.length).toBe(3);

const flag1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-1');
expect(flag1Event).toBeDefined();
expect((flag1Event!.data as PutObject).kind).toBe('flag');
expect((flag1Event!.data as PutObject).version).toBe(1);

const flag2Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-2');
expect(flag2Event).toBeDefined();
expect((flag2Event!.data as PutObject).kind).toBe('flag');
expect((flag2Event!.data as PutObject).version).toBe(2);

const segment1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'segment-1');
expect(segment1Event).toBeDefined();
expect((segment1Event!.data as PutObject).kind).toBe('segment');
expect((segment1Event!.data as PutObject).version).toBe(1);
});
Original file line number Diff line number Diff line change
Expand Up @@ -1004,3 +1004,161 @@ it('consumes cancellation tokens correctly', async () => {
// eslint-disable-next-line no-underscore-dangle
expect(underTest._cancelTokens.length).toEqual(0);
});

it('handles multiple initializers with fallback when first initializer fails and second succeeds', async () => {
const mockInitializer1Error = {
name: 'Error',
message: 'First initializer failed',
};
const mockInitializer1: DataSource = {
start: jest
.fn()
.mockImplementation(
(
_dataCallback: (basis: boolean, data: any) => void,
_statusCallback: (status: DataSourceState, err?: any) => void,
) => {
_statusCallback(DataSourceState.Initializing);
_statusCallback(DataSourceState.Closed, mockInitializer1Error);
},
),
stop: jest.fn(),
};

const mockInitializer2Data = { key: 'init2' };
const mockInitializer2: DataSource = {
start: jest
.fn()
.mockImplementation(
(
_dataCallback: (basis: boolean, data: any) => void,
_statusCallback: (status: DataSourceState, err?: any) => void,
) => {
_statusCallback(DataSourceState.Initializing);
_statusCallback(DataSourceState.Valid);
_dataCallback(true, mockInitializer2Data);
_statusCallback(DataSourceState.Closed);
},
),
stop: jest.fn(),
};

const mockSynchronizer1Data = { key: 'sync1' };
const mockSynchronizer1 = {
start: jest
.fn()
.mockImplementation(
(
_dataCallback: (basis: boolean, data: any) => void,
_statusCallback: (status: DataSourceState, err?: any) => void,
) => {
_statusCallback(DataSourceState.Initializing);
_statusCallback(DataSourceState.Valid);
_dataCallback(false, mockSynchronizer1Data);
},
),
stop: jest.fn(),
};

const underTest = new CompositeDataSource(
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
[makeDataSourceFactory(mockSynchronizer1)],
[],
undefined,
makeTestTransitionConditions(),
makeZeroBackoff(),
);

let dataCallback;
const statusCallback = jest.fn();
await new Promise<void>((resolve) => {
dataCallback = jest.fn((_: boolean, data: any) => {
if (data === mockSynchronizer1Data) {
resolve();
}
});

underTest.start(dataCallback, statusCallback);
});

expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
expect(mockInitializer2.start).toHaveBeenCalledTimes(1);
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
expect(statusCallback).toHaveBeenCalledTimes(5);
expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined);
expect(statusCallback).toHaveBeenNthCalledWith(
2,
DataSourceState.Interrupted,
mockInitializer1Error,
);
expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Valid, undefined);
expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, undefined);
expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined);
});

it('does not run second initializer when first initializer succeeds', async () => {
const mockInitializer1Data = { key: 'init1' };
const mockInitializer1: DataSource = {
start: jest
.fn()
.mockImplementation(
(
_dataCallback: (basis: boolean, data: any) => void,
_statusCallback: (status: DataSourceState, err?: any) => void,
) => {
_statusCallback(DataSourceState.Initializing);
_statusCallback(DataSourceState.Valid);
_dataCallback(true, mockInitializer1Data);
_statusCallback(DataSourceState.Closed);
},
),
stop: jest.fn(),
};

const mockInitializer2: DataSource = {
start: jest.fn(),
stop: jest.fn(),
};

const mockSynchronizer1Data = { key: 'sync1' };
const mockSynchronizer1 = {
start: jest
.fn()
.mockImplementation(
(
_dataCallback: (basis: boolean, data: any) => void,
_statusCallback: (status: DataSourceState, err?: any) => void,
) => {
_statusCallback(DataSourceState.Initializing);
_statusCallback(DataSourceState.Valid);
_dataCallback(false, mockSynchronizer1Data);
},
),
stop: jest.fn(),
};

const underTest = new CompositeDataSource(
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
[makeDataSourceFactory(mockSynchronizer1)],
[],
undefined,
makeTestTransitionConditions(),
makeZeroBackoff(),
);

let dataCallback;
const statusCallback = jest.fn();
await new Promise<void>((resolve) => {
dataCallback = jest.fn((_: boolean, data: any) => {
if (data === mockSynchronizer1Data) {
resolve();
}
});

underTest.start(dataCallback, statusCallback);
});

expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
expect(mockInitializer2.start).toHaveBeenCalledTimes(0);
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
});
109 changes: 109 additions & 0 deletions packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { PayloadProcessor } from './payloadProcessor';
import { Event } from './proto';

interface fdv1Payload {
flags: { [name: string]: any };
segments: { [name: string]: any };
}

const PAYLOAD_ID = 'FDv1Fallback';

/**
* The FDv1PayloadAdaptor is a helper class that converts FDv1 payloads into events that the PayloadProcessor can understand.
*/
export interface FDv1PayloadAdaptor {
/**
* The PayloadProcessor that will be used to process the events.
*/
readonly _processor: PayloadProcessor;

/**
* The selector that will be used to identify the payload.
*/
_selector: string;

/**
* The method that will be used to set a selector for the payload that is
* being processed.
*
* @remarks
* This method probably shouldn't be used in most instances as FDv1 payloads
* do not have the concept of a selector.
*
* @param selector - The selector to set for the payload.
* @returns this FDv1PayloadAdaptor instance
*/
useSelector: (selector: string) => FDv1PayloadAdaptor;

/**
* The method that will be used to process a full transfer changeset.
*
* @param data - The data to process.
*/
processFullTransfer: (data: fdv1Payload) => void;
}

export function fdv1PayloadAdaptor(processor: PayloadProcessor): FDv1PayloadAdaptor {
return {
_processor: processor,
_selector: '',
useSelector(selector: string): FDv1PayloadAdaptor {
this._selector = selector;
return this;
},
processFullTransfer(data) {
const events: Array<Event> = [
{
event: 'server-intent',
data: {
payloads: [
{
id: PAYLOAD_ID,
target: 1,
intentCode: 'xfer-full',
reason: 'payload-missing',
},
],
},
},
];

Object.entries(data?.flags || []).forEach(([key, flag]) => {
events.push({
event: 'put-object',
data: {
kind: 'flag',
key,
version: flag.version || 1,
object: flag,
},
});
});

Object.entries(data?.segments || []).forEach(([key, segment]) => {
events.push({
event: 'put-object',
data: {
kind: 'segment',
key,
version: segment.version || 1,
object: segment,
},
});
});

events.push({
event: 'payload-transferred',
data: {
// IMPORTANT: the selector MUST be empty or "live" data synchronizers
// will not work as it would try to resume from a bogus state.
state: this._selector,
version: 1,
id: PAYLOAD_ID,
},
});

this._processor.processEvents(events);
},
};
}
2 changes: 2 additions & 0 deletions packages/shared/common/src/internal/fdv2/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from './FDv1PayloadAdaptor';
import {
FDv2EventsCollection,
Payload,
Expand All @@ -14,4 +15,5 @@ export {
PayloadProcessor,
PayloadStreamReader,
Update,
FDv1PayloadAdaptor,
};
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,11 @@ export class PayloadProcessor {
private _processPayloadTransferred = (data: PayloadTransferred) => {
// if the following properties haven't been provided by now, we should reset
if (
!this._tempId || // server intent hasn't been received yet.
!data.state ||
// server intent hasn't been received yet.
!this._tempId ||
// selector can be an empty string if we are using a file data initilizer
data.state === null ||
data.state === undefined ||
!data.version
) {
this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload
Expand Down
Loading