Skip to content

Commit 718b11a

Browse files
committed
feat: implement the custom data source option
this enables users to specify file data source initializer
1 parent b1e0123 commit 718b11a

File tree

7 files changed

+321
-79
lines changed

7 files changed

+321
-79
lines changed

packages/shared/common/__tests__/internal/fdv2/FDv2ChangeSetBuilder.test.ts renamed to packages/shared/common/__tests__/internal/fdv2/FDv1PayloadAdaptor.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class MockPayloadProcessor extends PayloadProcessor {
1010
super({}, undefined, undefined);
1111
}
1212

13-
processEvents(events: Event[]) {
13+
override processEvents(events: Event[]) {
1414
this.processedEvents = [...this.processedEvents, ...events];
1515
// Don't call super.processEvents to avoid side effects in tests
1616
}
@@ -71,7 +71,9 @@ it('includes payload-transferred as the last event with empty state', () => {
7171
adaptor.start('xfer-full');
7272
adaptor.finish();
7373

74-
const payloadTransferredEvent = processor.processedEvents[processor.processedEvents.length - 1] as Event;
74+
const payloadTransferredEvent = processor.processedEvents[
75+
processor.processedEvents.length - 1
76+
] as Event;
7577
expect(payloadTransferredEvent.event).toBe('payload-transferred');
7678
expect(payloadTransferredEvent.data).toBeDefined();
7779

packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,3 +1004,161 @@ it('consumes cancellation tokens correctly', async () => {
10041004
// eslint-disable-next-line no-underscore-dangle
10051005
expect(underTest._cancelTokens.length).toEqual(0);
10061006
});
1007+
1008+
it('handles multiple initializers with fallback when first initializer fails and second succeeds', async () => {
1009+
const mockInitializer1Error = {
1010+
name: 'Error',
1011+
message: 'First initializer failed',
1012+
};
1013+
const mockInitializer1: DataSource = {
1014+
start: jest
1015+
.fn()
1016+
.mockImplementation(
1017+
(
1018+
_dataCallback: (basis: boolean, data: any) => void,
1019+
_statusCallback: (status: DataSourceState, err?: any) => void,
1020+
) => {
1021+
_statusCallback(DataSourceState.Initializing);
1022+
_statusCallback(DataSourceState.Closed, mockInitializer1Error);
1023+
},
1024+
),
1025+
stop: jest.fn(),
1026+
};
1027+
1028+
const mockInitializer2Data = { key: 'init2' };
1029+
const mockInitializer2: DataSource = {
1030+
start: jest
1031+
.fn()
1032+
.mockImplementation(
1033+
(
1034+
_dataCallback: (basis: boolean, data: any) => void,
1035+
_statusCallback: (status: DataSourceState, err?: any) => void,
1036+
) => {
1037+
_statusCallback(DataSourceState.Initializing);
1038+
_statusCallback(DataSourceState.Valid);
1039+
_dataCallback(true, mockInitializer2Data);
1040+
_statusCallback(DataSourceState.Closed);
1041+
},
1042+
),
1043+
stop: jest.fn(),
1044+
};
1045+
1046+
const mockSynchronizer1Data = { key: 'sync1' };
1047+
const mockSynchronizer1 = {
1048+
start: jest
1049+
.fn()
1050+
.mockImplementation(
1051+
(
1052+
_dataCallback: (basis: boolean, data: any) => void,
1053+
_statusCallback: (status: DataSourceState, err?: any) => void,
1054+
) => {
1055+
_statusCallback(DataSourceState.Initializing);
1056+
_statusCallback(DataSourceState.Valid);
1057+
_dataCallback(false, mockSynchronizer1Data);
1058+
},
1059+
),
1060+
stop: jest.fn(),
1061+
};
1062+
1063+
const underTest = new CompositeDataSource(
1064+
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
1065+
[makeDataSourceFactory(mockSynchronizer1)],
1066+
[],
1067+
undefined,
1068+
makeTestTransitionConditions(),
1069+
makeZeroBackoff(),
1070+
);
1071+
1072+
let dataCallback;
1073+
const statusCallback = jest.fn();
1074+
await new Promise<void>((resolve) => {
1075+
dataCallback = jest.fn((_: boolean, data: any) => {
1076+
if (data === mockSynchronizer1Data) {
1077+
resolve();
1078+
}
1079+
});
1080+
1081+
underTest.start(dataCallback, statusCallback);
1082+
});
1083+
1084+
expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
1085+
expect(mockInitializer2.start).toHaveBeenCalledTimes(1);
1086+
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
1087+
expect(statusCallback).toHaveBeenCalledTimes(5);
1088+
expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined);
1089+
expect(statusCallback).toHaveBeenNthCalledWith(
1090+
2,
1091+
DataSourceState.Interrupted,
1092+
mockInitializer1Error,
1093+
);
1094+
expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Valid, undefined);
1095+
expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, undefined);
1096+
expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined);
1097+
});
1098+
1099+
it('does not run second initializer when first initializer succeeds', async () => {
1100+
const mockInitializer1Data = { key: 'init1' };
1101+
const mockInitializer1: DataSource = {
1102+
start: jest
1103+
.fn()
1104+
.mockImplementation(
1105+
(
1106+
_dataCallback: (basis: boolean, data: any) => void,
1107+
_statusCallback: (status: DataSourceState, err?: any) => void,
1108+
) => {
1109+
_statusCallback(DataSourceState.Initializing);
1110+
_statusCallback(DataSourceState.Valid);
1111+
_dataCallback(true, mockInitializer1Data);
1112+
_statusCallback(DataSourceState.Closed);
1113+
},
1114+
),
1115+
stop: jest.fn(),
1116+
};
1117+
1118+
const mockInitializer2: DataSource = {
1119+
start: jest.fn(),
1120+
stop: jest.fn(),
1121+
};
1122+
1123+
const mockSynchronizer1Data = { key: 'sync1' };
1124+
const mockSynchronizer1 = {
1125+
start: jest
1126+
.fn()
1127+
.mockImplementation(
1128+
(
1129+
_dataCallback: (basis: boolean, data: any) => void,
1130+
_statusCallback: (status: DataSourceState, err?: any) => void,
1131+
) => {
1132+
_statusCallback(DataSourceState.Initializing);
1133+
_statusCallback(DataSourceState.Valid);
1134+
_dataCallback(false, mockSynchronizer1Data);
1135+
},
1136+
),
1137+
stop: jest.fn(),
1138+
};
1139+
1140+
const underTest = new CompositeDataSource(
1141+
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
1142+
[makeDataSourceFactory(mockSynchronizer1)],
1143+
[],
1144+
undefined,
1145+
makeTestTransitionConditions(),
1146+
makeZeroBackoff(),
1147+
);
1148+
1149+
let dataCallback;
1150+
const statusCallback = jest.fn();
1151+
await new Promise<void>((resolve) => {
1152+
dataCallback = jest.fn((_: boolean, data: any) => {
1153+
if (data === mockSynchronizer1Data) {
1154+
resolve();
1155+
}
1156+
});
1157+
1158+
underTest.start(dataCallback, statusCallback);
1159+
});
1160+
1161+
expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
1162+
expect(mockInitializer2.start).toHaveBeenCalledTimes(0);
1163+
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
1164+
});

packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { PayloadProcessor } from './payloadProcessor';
2-
import { DeleteObject, Event, PutObject, EventType } from './proto';
2+
import { DeleteObject, Event, PutObject } from './proto';
33

44
// eventually this will be the same as the IntentCode type, but for now we'll use a simpler type
55
type supportedIntentCodes = 'xfer-full';
@@ -27,7 +27,7 @@ export default class FDv1PayloadAdaptor {
2727
private _intent: supportedIntentCodes = 'xfer-full';
2828

2929
constructor(processor: PayloadProcessor) {
30-
this._processor = processor
30+
this._processor = processor;
3131
}
3232

3333
/**
@@ -38,7 +38,7 @@ export default class FDv1PayloadAdaptor {
3838
throw new Error('intent: only xfer-full is supported');
3939
}
4040

41-
this._events = []
41+
this._events = [];
4242
this._intent = intent;
4343

4444
return this;
@@ -72,12 +72,14 @@ export default class FDv1PayloadAdaptor {
7272
const serverIntentEvent: Event = {
7373
event: 'server-intent',
7474
data: {
75-
payloads: [{
76-
id: PAYLOAD_ID,
77-
target: 1,
78-
intentCode: this._intent,
79-
reason: 'payload-missing'
80-
}],
75+
payloads: [
76+
{
77+
id: PAYLOAD_ID,
78+
target: 1,
79+
intentCode: this._intent,
80+
reason: 'payload-missing',
81+
},
82+
],
8183
},
8284
};
8385

@@ -92,42 +94,38 @@ export default class FDv1PayloadAdaptor {
9294
},
9395
};
9496

95-
this._processor.processEvents([
96-
serverIntentEvent,
97-
...this._events,
98-
finishEvent,
99-
]);
100-
this._events = []
97+
this._processor.processEvents([serverIntentEvent, ...this._events, finishEvent]);
98+
this._events = [];
10199

102100
return this;
103101
}
104102

105103
/**
106-
*
104+
*
107105
* @param data - FDv1 payload from a fdv1 poll
108106
*/
109107
pushFdv1Payload(data: fdv1Payload): this {
110108
Object.entries(data?.flags || []).forEach(([key, flag]) => {
111109
this.putObject({
112-
// strong assumption here that we only have segments and flags.
113-
kind: 'flag',
114-
key: key,
115-
version: flag.version || 1,
116-
object: flag,
117-
});
110+
// strong assumption here that we only have segments and flags.
111+
kind: 'flag',
112+
key,
113+
version: flag.version || 1,
114+
object: flag,
115+
});
118116
});
119117

120118
Object.entries(data?.segments || []).forEach(([key, segment]) => {
121119
this.putObject({
122-
// strong assumption here that we only have segments and flags.
123-
kind: 'segment',
124-
key: key,
125-
version: segment.version || 1,
126-
object: segment,
127-
});
120+
// strong assumption here that we only have segments and flags.
121+
kind: 'segment',
122+
key,
123+
version: segment.version || 1,
124+
object: segment,
125+
});
128126
});
129127

130-
return this
128+
return this;
131129
}
132130

133131
/**

0 commit comments

Comments
 (0)