Skip to content

Commit ea4cb3c

Browse files
committed
Add unit tests for the sync protocol
1 parent 801a8b3 commit ea4cb3c

File tree

3 files changed

+369
-22
lines changed

3 files changed

+369
-22
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
"ajv": "8.12.0",
7373
"async-lock": "1.4.1",
7474
"classnames": "^2.3.2",
75+
"comlink": "^4.4.2",
7576
"crc-32": "1.2.2",
7677
"diff3": "0.0.4",
7778
"express": "4.21.2",
@@ -90,6 +91,7 @@
9091
"yargs": "17.7.2"
9192
},
9293
"devDependencies": {
94+
"@biomejs/biome": "2.0.6",
9395
"@docusaurus/core": "3.7.0",
9496
"@docusaurus/plugin-client-redirects": "3.7.0",
9597
"@docusaurus/plugin-ideal-image": "^3.7.0",
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
import { describe, it, expect } from 'vitest';
2+
import { MessageChannel, Worker as NodeWorker } from 'worker_threads';
3+
import { NodeSABSyncReceiveMessageTransport } from './comlink-sync';
4+
5+
describe('Comlink Sync Communication Tests', () => {
6+
describe('Basic Infrastructure', () => {
7+
it('should create transport successfully', async () => {
8+
const transport = await NodeSABSyncReceiveMessageTransport.create();
9+
expect(transport).toBeDefined();
10+
expect(typeof transport.send).toBe('function');
11+
expect(typeof transport.afterResponseSent).toBe('function');
12+
});
13+
});
14+
15+
describe('Real Multi-Worker Synchronous Communication', () => {
16+
async function createInlineWorker(workerCode: string): Promise<Worker> {
17+
// Create a worker from inline code to avoid import path issues
18+
const worker = new NodeWorker(workerCode, {
19+
eval: true,
20+
stdout: true,
21+
stderr: true,
22+
});
23+
worker.stdout.on('data', (data) => {
24+
console.log('worker stdout', new TextDecoder().decode(data));
25+
});
26+
worker.stderr.on('data', (data) => {
27+
console.log('worker stderr', new TextDecoder().decode(data));
28+
});
29+
30+
// Wait for worker to load
31+
await new Promise<void>((resolve, reject) => {
32+
const timeout = setTimeout(
33+
() => reject(new Error('Worker load timeout')),
34+
5000
35+
);
36+
worker.once('message', (data) => {
37+
if (data.type === 'loaded') {
38+
clearTimeout(timeout);
39+
resolve();
40+
}
41+
});
42+
// Reject if worker exits prematurely or errors
43+
worker.on('error', (error) => {
44+
clearTimeout(timeout);
45+
reject(new Error(`Worker error: ${error.message}`));
46+
});
47+
48+
worker.on('exit', (code) => {
49+
if (code !== 0) {
50+
clearTimeout(timeout);
51+
reject(new Error(`Worker exited with code ${code}`));
52+
}
53+
});
54+
});
55+
56+
return worker;
57+
}
58+
59+
async function setupWorkerWithPort(
60+
worker: Worker,
61+
port: MessageChannel['port1']
62+
): Promise<void> {
63+
worker.postMessage({ type: 'setup', port }, [port]);
64+
65+
// Wait for worker to be ready
66+
await new Promise<void>((resolve, reject) => {
67+
const timeout = setTimeout(
68+
() => reject(new Error('Worker setup timeout')),
69+
5000
70+
);
71+
worker.once('message', (data) => {
72+
if (data.type === 'ready') {
73+
clearTimeout(timeout);
74+
resolve();
75+
} else if (data.type === 'error') {
76+
clearTimeout(timeout);
77+
reject(new Error(data.message));
78+
}
79+
});
80+
});
81+
}
82+
83+
const comlinkSyncPath = import.meta.dirname + '/comlink-sync.ts';
84+
// Simple inline worker code that doesn't depend on complex imports
85+
const serverWorkerCode = `
86+
import { parentPort } from 'worker_threads';
87+
import { exposeSync, NodeSABSyncReceiveMessageTransport } from "${comlinkSyncPath}";
88+
export {};
89+
90+
const testAPI = {
91+
ping: () => 'pong',
92+
add: (a, b) => a + b,
93+
multiply: (a, b) => a * b,
94+
getCurrentTime: () => Date.now(),
95+
processArray: (numbers) => numbers.reduce((sum, num) => sum + num, 0),
96+
throwError: () => { throw new Error('Test error from sync API server'); }
97+
};
98+
99+
parentPort?.on('message', async (data) => {
100+
if (data.type === 'setup' && data.port) {
101+
const transport = await NodeSABSyncReceiveMessageTransport.create();
102+
await exposeSync(testAPI, data.port, transport);
103+
parentPort?.postMessage({ type: 'ready' });
104+
}
105+
});
106+
parentPort?.postMessage({ type: 'loaded' });
107+
`;
108+
109+
const clientWorkerCode = `
110+
import { parentPort } from 'worker_threads';
111+
import { wrapSync, NodeSABSyncReceiveMessageTransport } from "${comlinkSyncPath}";
112+
113+
let syncAPI = null;
114+
parentPort?.on('message', async (data) => {
115+
if (data.type === 'setup' && data.port) {
116+
const transport = await NodeSABSyncReceiveMessageTransport.create();
117+
syncAPI = await wrapSync(data.port, transport);
118+
parentPort?.postMessage({ type: 'ready' });
119+
} else if (data.type === 'test' && syncAPI) {
120+
try {
121+
const testName = data.testName;
122+
let result;
123+
124+
switch (testName) {
125+
case 'ping':
126+
result = await syncAPI.ping();
127+
break;
128+
case 'add':
129+
result = await syncAPI.add(5, 10);
130+
break;
131+
case 'multiply':
132+
result = await syncAPI.multiply(3, 7);
133+
break;
134+
case 'getCurrentTime':
135+
result = await syncAPI.getCurrentTime();
136+
break;
137+
case 'processArray':
138+
result = await syncAPI.processArray([1, 2, 3, 4, 5]);
139+
break;
140+
case 'throwError':
141+
try {
142+
await syncAPI.throwError();
143+
result = 'ERROR: Should have thrown';
144+
} catch (error) {
145+
result = 'Caught error: ' + error.message;
146+
}
147+
break;
148+
default:
149+
result = 'Unknown test: ' + testName;
150+
}
151+
152+
parentPort?.postMessage({
153+
type: 'test-result',
154+
testName,
155+
result,
156+
success: true
157+
});
158+
} catch (error) {
159+
parentPort?.postMessage({
160+
type: 'test-result',
161+
testName: data.testName,
162+
result: error.message,
163+
success: false
164+
});
165+
}
166+
}
167+
});
168+
169+
parentPort?.postMessage({ type: 'loaded' });
170+
`;
171+
172+
it('should enable synchronous method calls between workers', async () => {
173+
// Create message channel for worker communication
174+
const { port1, port2 } = new MessageChannel();
175+
176+
// Create server worker (exposes sync API)
177+
const serverWorker = await createInlineWorker(serverWorkerCode);
178+
179+
// Create client worker (consumes sync API)
180+
const clientWorker = await createInlineWorker(clientWorkerCode);
181+
182+
try {
183+
// Set up workers with their respective ports
184+
await Promise.all([
185+
setupWorkerWithPort(serverWorker, port1),
186+
setupWorkerWithPort(clientWorker, port2),
187+
]);
188+
189+
// Test synchronous method calls
190+
const testCases = [
191+
{ testName: 'ping', expected: 'pong' },
192+
{ testName: 'add', expected: 15 },
193+
{ testName: 'multiply', expected: 21 },
194+
{ testName: 'processArray', expected: 15 },
195+
];
196+
197+
for (const testCase of testCases) {
198+
// Send test request to client worker
199+
clientWorker.postMessage({
200+
type: 'test',
201+
testName: testCase.testName,
202+
});
203+
204+
// Wait for result
205+
const result = await new Promise<any>((resolve, reject) => {
206+
const timeout = setTimeout(
207+
() =>
208+
reject(
209+
new Error(
210+
`Test ${testCase.testName} timeout`
211+
)
212+
),
213+
10000
214+
);
215+
clientWorker.once('message', (data: any) => {
216+
if (
217+
data.type === 'test-result' &&
218+
data.testName === testCase.testName
219+
) {
220+
clearTimeout(timeout);
221+
resolve(data);
222+
}
223+
});
224+
});
225+
226+
expect(result.success).toBe(true);
227+
expect(result.result).toBe(testCase.expected);
228+
}
229+
} finally {
230+
await serverWorker.terminate();
231+
await clientWorker.terminate();
232+
}
233+
});
234+
235+
it('should handle errors in synchronous calls', async () => {
236+
// Create message channel for worker communication
237+
const { port1, port2 } = new MessageChannel();
238+
239+
// Create server worker (exposes sync API)
240+
const serverWorker = await createInlineWorker(serverWorkerCode);
241+
242+
// Create client worker (consumes sync API)
243+
const clientWorker = await createInlineWorker(clientWorkerCode);
244+
245+
try {
246+
// Set up workers with their respective ports
247+
await Promise.all([
248+
setupWorkerWithPort(serverWorker, port1),
249+
setupWorkerWithPort(clientWorker, port2),
250+
]);
251+
252+
// Test error handling
253+
clientWorker.postMessage({
254+
type: 'test',
255+
testName: 'throwError',
256+
});
257+
258+
// Wait for result
259+
const result = await new Promise<any>((resolve, reject) => {
260+
const timeout = setTimeout(
261+
() => reject(new Error('Error test timeout')),
262+
10000
263+
);
264+
clientWorker.once('message', (data) => {
265+
if (
266+
data.type === 'test-result' &&
267+
data.testName === 'throwError'
268+
) {
269+
clearTimeout(timeout);
270+
resolve(data);
271+
}
272+
});
273+
});
274+
275+
expect(result.success).toBe(true);
276+
expect(result.result).toContain(
277+
'Test error from sync API server'
278+
);
279+
} finally {
280+
await serverWorker.terminate();
281+
await clientWorker.terminate();
282+
}
283+
});
284+
285+
it('should demonstrate time-based synchronous calls', async () => {
286+
// Create message channel for worker communication
287+
const { port1, port2 } = new MessageChannel();
288+
289+
// Create server worker (exposes sync API)
290+
const serverWorker = await createInlineWorker(serverWorkerCode);
291+
292+
// Create client worker (consumes sync API)
293+
const clientWorker = await createInlineWorker(clientWorkerCode);
294+
295+
try {
296+
// Set up workers with their respective ports
297+
await Promise.all([
298+
setupWorkerWithPort(serverWorker, port1),
299+
setupWorkerWithPort(clientWorker, port2),
300+
]);
301+
302+
// Record start time
303+
const startTime = Date.now();
304+
305+
// Test getCurrentTime call
306+
clientWorker.postMessage({
307+
type: 'test',
308+
testName: 'getCurrentTime',
309+
});
310+
311+
// Wait for result
312+
const result = await new Promise<any>((resolve, reject) => {
313+
const timeout = setTimeout(
314+
() => reject(new Error('Time test timeout')),
315+
10000
316+
);
317+
clientWorker.once('message', (data: any) => {
318+
if (
319+
data.type === 'test-result' &&
320+
data.testName === 'getCurrentTime'
321+
) {
322+
clearTimeout(timeout);
323+
resolve(data);
324+
}
325+
});
326+
});
327+
328+
const endTime = Date.now();
329+
330+
expect(result.success).toBe(true);
331+
expect(typeof result.result).toBe('number');
332+
333+
// Verify the time returned is reasonable (between start and end)
334+
expect(result.result).toBeGreaterThanOrEqual(startTime);
335+
expect(result.result).toBeLessThanOrEqual(endTime);
336+
} finally {
337+
await serverWorker.terminate();
338+
await clientWorker.terminate();
339+
}
340+
});
341+
});
342+
});

0 commit comments

Comments
 (0)