diff --git a/server.ts b/server.ts index bb6a892e..88bf5669 100644 --- a/server.ts +++ b/server.ts @@ -1,19 +1,13 @@ import './src/utils/instrumentation'; import {fileURLToPath} from 'url'; +import {initializeApp} from './src/initializeApp'; const isMainModule = process.argv[1] === fileURLToPath(import.meta.url); const port: number = parseInt(process.env.PORT || '3000', 10); const isWorkerMode = process.env.WORKER_MODE === 'true'; -// Load the appropriate app once -let app; -if (isWorkerMode) { - const workerModule = await import('./src/worker-app'); - app = workerModule.default; -} else { - const appModule = await import('./src/app'); - app = appModule.default; -} +// Create an instance of the appropriate app +const app = await initializeApp({isWorkerMode}); // Start the server if this file is run directly if (isMainModule) { @@ -35,5 +29,4 @@ if (isMainModule) { start(); } -// Export the app -export default app; +export default app; \ No newline at end of file diff --git a/src/app.ts b/src/app.ts deleted file mode 100644 index f1fd1fa2..00000000 --- a/src/app.ts +++ /dev/null @@ -1,42 +0,0 @@ -// Main module file -import fastify from 'fastify'; -import {TypeBoxTypeProvider} from '@fastify/type-provider-typebox'; -import loggingPlugin from './plugins/logging'; -import corsPlugin from './plugins/cors'; -import proxyPlugin from './plugins/proxy'; -import {getLoggerConfig} from './utils/logger'; -import {errorHandler} from './utils/error-handler'; -import v1Routes from './routes/v1'; -import replyFrom from '@fastify/reply-from'; - -const app = fastify({ - logger: getLoggerConfig(), - disableRequestLogging: true, - trustProxy: process.env.TRUST_PROXY !== 'false' -}).withTypeProvider(); - -// Register global validation error handler -app.setErrorHandler(errorHandler()); - -// Register reply-from plugin -app.register(replyFrom); - -// Register CORS plugin -app.register(corsPlugin); - -// Register logging plugin -app.register(loggingPlugin); - -// Register proxy plugin -app.register(proxyPlugin); - -// Register v1 routes -app.register(v1Routes, {prefix: '/api/v1'}); - -// Routes -app.get('/', async () => { - return 'Hello World - Github Actions Deployment Test'; -}); - -export default app; - diff --git a/src/initializeApp.ts b/src/initializeApp.ts new file mode 100644 index 00000000..b974861f --- /dev/null +++ b/src/initializeApp.ts @@ -0,0 +1,5 @@ +export async function initializeApp({isWorkerMode}: {isWorkerMode: boolean}) { + const appModulePath = isWorkerMode ? './src/worker-app' : './src/service-app'; + const appModule = await import(appModulePath); + return appModule.default(); +} diff --git a/src/service-app.ts b/src/service-app.ts new file mode 100644 index 00000000..c9966408 --- /dev/null +++ b/src/service-app.ts @@ -0,0 +1,46 @@ +// Main module file +import fastify from 'fastify'; +import {TypeBoxTypeProvider} from '@fastify/type-provider-typebox'; +import loggingPlugin from './plugins/logging'; +import corsPlugin from './plugins/cors'; +import proxyPlugin from './plugins/proxy'; +import {getLoggerConfig} from './utils/logger'; +import {errorHandler} from './utils/error-handler'; +import v1Routes from './routes/v1'; +import replyFrom from '@fastify/reply-from'; + +function createApp() { + const app = fastify({ + logger: getLoggerConfig(), + disableRequestLogging: true, + trustProxy: process.env.TRUST_PROXY !== 'false' + }).withTypeProvider(); + + // Register global validation error handler + app.setErrorHandler(errorHandler()); + + // Register reply-from plugin + app.register(replyFrom); + + // Register CORS plugin + app.register(corsPlugin); + + // Register logging plugin + app.register(loggingPlugin); + + // Register proxy plugin + app.register(proxyPlugin); + + // Register v1 routes + app.register(v1Routes, {prefix: '/api/v1'}); + + // Routes + app.get('/', async () => { + return 'Hello World - Github Actions Deployment Test'; + }); + + return app; +} + +export default createApp; + diff --git a/src/services/events/publisher.ts b/src/services/events/publisher.ts index 0266f7f7..580402e0 100644 --- a/src/services/events/publisher.ts +++ b/src/services/events/publisher.ts @@ -11,8 +11,8 @@ class EventPublisher { private static instance: EventPublisher; private pubsub: PubSub; - private constructor() { - this.pubsub = new PubSub({ + private constructor(pubsub?: PubSub) { + this.pubsub = pubsub || new PubSub({ projectId: process.env.GOOGLE_CLOUD_PROJECT }); } @@ -24,6 +24,10 @@ class EventPublisher { return EventPublisher.instance; } + static resetInstance(pubsub?: PubSub): void { + EventPublisher.instance = new EventPublisher(pubsub); + } + async publishEvent({topic, payload, logger}: PublishEventOptions): Promise { try { const message = { @@ -32,7 +36,7 @@ class EventPublisher { }; const messageId = await this.pubsub.topic(topic).publishMessage(message); - + logger.info({ messageId, topic, @@ -55,4 +59,7 @@ class EventPublisher { export const publishEvent = async ({topic, payload, logger}: PublishEventOptions): Promise => { const publisher = EventPublisher.getInstance(); return publisher.publishEvent({topic, payload, logger}); -}; \ No newline at end of file +}; + +// Export for testing purposes +export {EventPublisher}; \ No newline at end of file diff --git a/src/worker-app.ts b/src/worker-app.ts index b0803ec7..cb559a14 100644 --- a/src/worker-app.ts +++ b/src/worker-app.ts @@ -4,25 +4,29 @@ import loggingPlugin from './plugins/logging'; import workerPlugin from './plugins/worker-plugin'; import {getLoggerConfig} from './utils/logger'; -const app = fastify({ - logger: getLoggerConfig(), - disableRequestLogging: true, - trustProxy: process.env.TRUST_PROXY !== 'false' -}); +function createApp() { + const app = fastify({ + logger: getLoggerConfig(), + disableRequestLogging: true, + trustProxy: process.env.TRUST_PROXY !== 'false' + }); + + // Register logging plugin for consistent log formatting + app.register(loggingPlugin); + + // Register worker plugin for heartbeat logging + app.register(workerPlugin); + + // Health endpoints for Cloud Run deployment + app.get('/', async () => { + return {status: 'worker-healthy'}; + }); + + app.get('/health', async () => { + return {status: 'worker-healthy'}; + }); -// Register logging plugin for consistent log formatting -app.register(loggingPlugin); + return app; +} -// Register worker plugin for heartbeat logging -app.register(workerPlugin); - -// Health endpoints for Cloud Run deployment -app.get('/', async () => { - return {status: 'worker-healthy'}; -}); - -app.get('/health', async () => { - return {status: 'worker-healthy'}; -}); - -export default app; \ No newline at end of file +export default createApp; \ No newline at end of file diff --git a/test/integration/routes/v1/page_hit.test.ts b/test/integration/api/v1/page_hit.test.ts similarity index 65% rename from test/integration/routes/v1/page_hit.test.ts rename to test/integration/api/v1/page_hit.test.ts index 5bc85823..918566f6 100644 --- a/test/integration/routes/v1/page_hit.test.ts +++ b/test/integration/api/v1/page_hit.test.ts @@ -1,17 +1,16 @@ import {describe, it, expect, beforeAll} from 'vitest'; import {FastifyInstance} from 'fastify'; +import {initializeApp} from '../../../../src/initializeApp'; describe('/api/v1/page_hit', () => { - let fastify: FastifyInstance; + let app: FastifyInstance; beforeAll(async function () { - const appModule = await import('../../../../src/app'); - fastify = appModule.default; - await fastify.ready(); + app = await initializeApp({isWorkerMode: false}); }); it('should return 200', async function () { - const response = await fastify.inject({ + const response = await app.inject({ method: 'GET', url: '/api/v1/page_hit' }); diff --git a/test/integration/api/web_analytics/batch-mode.test.ts b/test/integration/api/web_analytics/batch-mode.test.ts new file mode 100644 index 00000000..96aa04ec --- /dev/null +++ b/test/integration/api/web_analytics/batch-mode.test.ts @@ -0,0 +1,88 @@ +import {describe, expect, it, beforeEach, beforeAll, vi} from 'vitest'; +import {FastifyInstance, FastifyRequest, FastifyReply} from 'fastify'; +import {expectResponse} from '../../../utils/assertions'; +import {createPubSubSpy} from '../../../utils/pubsub-spy'; +import defaultValidRequestQuery from '../../../utils/fixtures/defaultValidRequestQuery.json'; +import defaultValidRequestHeaders from '../../../utils/fixtures/defaultValidRequestHeaders.json'; +import defaultValidRequestBody from '../../../utils/fixtures/defaultValidRequestBody.json'; +import headersWithoutSiteUuid from '../../../utils/fixtures/headersWithoutSiteUuid.json'; +import {initializeApp} from '../../../../src/initializeApp'; +import {EventPublisher} from '../../../../src/services/events/publisher'; + +const preHandlerStub = async (_request: FastifyRequest, reply: FastifyReply) => { + reply.code(202); +}; + +describe('POST /tb/web_analytics', () => { + let app: FastifyInstance; + + describe('Batch Mode - Publishing to pub/sub', function () { + let pubSubSpy: ReturnType; + const pageHitsRawTopic: string = 'page-hits-raw'; + + beforeAll(async function () { + vi.stubEnv('PUBSUB_TOPIC_PAGE_HITS_RAW', pageHitsRawTopic); + }); + beforeEach(async function () { + app = await initializeApp({isWorkerMode: false}); + app.addHook('preHandler', preHandlerStub); + pubSubSpy = createPubSubSpy(); + EventPublisher.resetInstance(pubSubSpy.mockPubSub as any); + }); + + it('should transform the request body and publish to pub/sub', async function () { + await app.inject({ + method: 'POST', + url: '/tb/web_analytics', + query: defaultValidRequestQuery, + headers: defaultValidRequestHeaders, + body: defaultValidRequestBody + }); + + pubSubSpy.expectPublishedMessageToTopic(pageHitsRawTopic).withMessageData({ + timestamp: expect.any(String), + action: 'page_hit', + version: '1', + site_uuid: defaultValidRequestHeaders['x-site-uuid'], + payload: { + event_id: defaultValidRequestBody.payload.event_id, + href: 'https://www.example.com/', + pathname: '/', + member_uuid: 'undefined', + member_status: 'undefined', + post_uuid: 'undefined', + post_type: 'null', + parsedReferrer: { + medium: '', + source: '', + url: '' + }, + locale: 'en-US', + location: 'US', + referrer: null + }, + meta: { + ip: expect.any(String), + 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36' + } + }); + }); + + it('should not publish a message if the request fails validation', async function () { + const response = await app.inject({ + method: 'POST', + url: '/tb/web_analytics', + query: defaultValidRequestQuery, + headers: headersWithoutSiteUuid, + body: defaultValidRequestBody + }); + expectResponse({ + response, + statusCode: 400, + errorType: 'Bad Request', + message: 'headers must have required property \'x-site-uuid\'' + }); + pubSubSpy.expectNoMessagesPublished(); + }); + }); +}); \ No newline at end of file diff --git a/test/integration/api/web_analytics/request-validation.test.ts b/test/integration/api/web_analytics/request-validation.test.ts new file mode 100644 index 00000000..69509880 --- /dev/null +++ b/test/integration/api/web_analytics/request-validation.test.ts @@ -0,0 +1,91 @@ +import {describe, it, beforeEach} from 'vitest'; +import {FastifyInstance, FastifyRequest, FastifyReply} from 'fastify'; +import {expectResponse} from '../../../utils/assertions'; +import defaultValidRequestQuery from '../../../utils/fixtures/defaultValidRequestQuery.json'; +import defaultValidRequestHeaders from '../../../utils/fixtures/defaultValidRequestHeaders.json'; +import defaultValidRequestBody from '../../../utils/fixtures/defaultValidRequestBody.json'; +import headersWithInvalidContentType from '../../../utils/fixtures/headersWithInvalidContentType.json'; +import headersWithoutUserAgent from '../../../utils/fixtures/headersWithoutUserAgent.json'; +import headersWithoutSiteUuid from '../../../utils/fixtures/headersWithoutSiteUuid.json'; +import {initializeApp} from '../../../../src/initializeApp'; + +const preHandlerStub = async (_request: FastifyRequest, reply: FastifyReply) => { + reply.code(202); +}; + +describe('POST /tb/web_analytics', () => { + let app: FastifyInstance; + + describe('Request validation', function () { + beforeEach(async function () { + app = await initializeApp({isWorkerMode: false}); + app.addHook('preHandler', preHandlerStub); + }); + + it('should accept a default valid request from the tracking script', async function () { + const response = await app.inject({ + method: 'POST', + url: '/tb/web_analytics', + query: defaultValidRequestQuery, + headers: defaultValidRequestHeaders, + body: defaultValidRequestBody + }); + expectResponse({response, statusCode: 202}); + }); + + describe('requests with missing or invalid required headers', function () { + it('should reject a request with a missing site uuid header', async function () { + const response = await app.inject({ + method: 'POST', + url: '/tb/web_analytics', + query: defaultValidRequestQuery, + headers: headersWithoutSiteUuid, + body: defaultValidRequestBody + }); + expectResponse({ + response, + statusCode: 400, + errorType: 'Bad Request', + message: 'headers must have required property \'x-site-uuid\'' + }); + }); + + it('should reject a request with a missing user agent header', async function () { + // fastify.inject() adds a default user-agent header, so we need to remove it before validation + app.addHook('onRequest', async (request) => { + delete request.headers['user-agent']; + }); + + const response = await app.inject({ + method: 'POST', + url: '/tb/web_analytics', + query: defaultValidRequestQuery, + headers: headersWithoutUserAgent, + body: defaultValidRequestBody + }); + expectResponse({ + response, + statusCode: 400, + errorType: 'Bad Request', + message: 'headers must have required property \'user-agent\'' + }); + }); + + it('should reject a request with a content type other than application/json', async function () { + const response = await app.inject({ + method: 'POST', + url: '/tb/web_analytics', + query: defaultValidRequestQuery, + headers: headersWithInvalidContentType, + body: defaultValidRequestBody + }); + expectResponse({ + response, + statusCode: 415, + errorType: 'Unsupported Media Type', + message: 'Unsupported Media Type: application/xml' + }); + }); + }); + }); +}); \ No newline at end of file diff --git a/test/integration/app.test.ts b/test/integration/app.test.ts index 15e799f8..8ea51e22 100644 --- a/test/integration/app.test.ts +++ b/test/integration/app.test.ts @@ -14,6 +14,7 @@ vi.mock('../../src/services/user-signature', () => ({ // Import the mocked service import {userSignatureService} from '../../src/services/user-signature'; +import {initializeApp} from '../../src/initializeApp'; const eventPayload = { timestamp: '2025-04-14T22:16:06.095Z', @@ -77,9 +78,8 @@ describe('Fastify App', () => { // Set the PROXY_TARGET environment variable before requiring the app process.env.PROXY_TARGET = targetUrl; - // Import directly from the source - const appModule = await import('../../src/app'); - app = appModule.default; + // Create an instance of the app + app = await initializeApp({isWorkerMode: false}); await app.ready(); proxyServer = app.server; }); diff --git a/test/integration/validation-error-logging.test.ts b/test/integration/validation-error-logging.test.ts index 60f59c91..8c899525 100644 --- a/test/integration/validation-error-logging.test.ts +++ b/test/integration/validation-error-logging.test.ts @@ -3,6 +3,7 @@ import request from 'supertest'; import createMockUpstream from '../utils/mock-upstream'; import {FastifyInstance} from 'fastify'; import {Server} from 'http'; +import {initializeApp} from '../../src/initializeApp'; // Mock the user signature service before importing the app vi.mock('../../src/services/user-signature', () => ({ @@ -47,8 +48,7 @@ describe('Validation Error Logging', () => { process.env.PROXY_TARGET = targetUrl; - const appModule = await import('../../src/app'); - app = appModule.default; + app = await initializeApp({isWorkerMode: false}); await app.ready(); proxyServer = app.server; }); diff --git a/test/integration/worker-app.test.ts b/test/integration/worker-app.test.ts index 7652d89d..5fb7251c 100644 --- a/test/integration/worker-app.test.ts +++ b/test/integration/worker-app.test.ts @@ -1,21 +1,13 @@ import {describe, it, expect, beforeEach, afterEach, vi} from 'vitest'; import request from 'supertest'; import {FastifyInstance} from 'fastify'; +import {initializeApp} from '../../src/initializeApp'; describe('Worker App', () => { let app: FastifyInstance; beforeEach(async () => { - // Clear environment variables to ensure clean state - delete process.env.WORKER_MODE; - - // Clear module cache to ensure fresh import - vi.resetModules(); - - // Import worker app fresh - const workerModule = await import('../../src/worker-app'); - app = workerModule.default; - - // Wait for app to be ready + // Create an instance of the worker app + app = await initializeApp({isWorkerMode: true}); await app.ready(); }); @@ -23,7 +15,6 @@ describe('Worker App', () => { if (app) { await app.close(); } - // Note: Global setup handles resource cleanup }); describe('Health Endpoints', () => { diff --git a/test/utils/assertions.ts b/test/utils/assertions.ts new file mode 100644 index 00000000..047c62aa --- /dev/null +++ b/test/utils/assertions.ts @@ -0,0 +1,13 @@ +import {expect} from 'vitest'; +import type {Response} from 'light-my-request'; + +export function expectResponse({response, statusCode, errorType, message}: {response: Response, statusCode: number, errorType?: string, message?: string}) { + const bodyJson = JSON.parse(response.body); + expect(response.statusCode).toBe(statusCode); + if (errorType) { + expect(bodyJson.error).toBe(errorType); + } + if (message) { + expect(bodyJson.message).toBe(message); + } +} \ No newline at end of file diff --git a/test/utils/fixtures/defaultValidRequestBody.json b/test/utils/fixtures/defaultValidRequestBody.json new file mode 100644 index 00000000..25500308 --- /dev/null +++ b/test/utils/fixtures/defaultValidRequestBody.json @@ -0,0 +1,23 @@ +{ + "timestamp": "2025-07-02T20:09:42.591Z", + "action": "page_hit", + "version": "1", + "payload": { + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", + "locale": "en-US", + "location": "US", + "parsedReferrer": { + "source": null, + "medium": null, + "url": null + }, + "pathname": "/", + "href": "https://www.example.com/", + "event_id": "70ce307c-0b09-4bb5-b7c7-e41cf1085720", + "site_uuid": "940b73e9-4952-4752-b23d-9486f999c47e", + "post_uuid": "undefined", + "post_type": "null", + "member_uuid": "undefined", + "member_status": "undefined" + } +} \ No newline at end of file diff --git a/test/utils/fixtures/defaultValidRequestHeaders.json b/test/utils/fixtures/defaultValidRequestHeaders.json new file mode 100644 index 00000000..4d47a981 --- /dev/null +++ b/test/utils/fixtures/defaultValidRequestHeaders.json @@ -0,0 +1,5 @@ +{ + "Content-Type": "application/json", + "x-site-uuid": "3bfa03e9-b0e7-40dc-a9de-f4b8640b88cc", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" +} \ No newline at end of file diff --git a/test/utils/fixtures/defaultValidRequestQuery.json b/test/utils/fixtures/defaultValidRequestQuery.json new file mode 100644 index 00000000..361356fd --- /dev/null +++ b/test/utils/fixtures/defaultValidRequestQuery.json @@ -0,0 +1,3 @@ +{ + "name": "analytics_events" +} \ No newline at end of file diff --git a/test/utils/fixtures/headersWithInvalidContentType.json b/test/utils/fixtures/headersWithInvalidContentType.json new file mode 100644 index 00000000..8df94c5c --- /dev/null +++ b/test/utils/fixtures/headersWithInvalidContentType.json @@ -0,0 +1,5 @@ +{ + "Content-Type": "application/xml", + "x-site-uuid": "3bfa03e9-b0e7-40dc-a9de-f4b8640b88cc", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" +} \ No newline at end of file diff --git a/test/utils/fixtures/headersWithoutSiteUuid.json b/test/utils/fixtures/headersWithoutSiteUuid.json new file mode 100644 index 00000000..32e5a99c --- /dev/null +++ b/test/utils/fixtures/headersWithoutSiteUuid.json @@ -0,0 +1,4 @@ +{ + "Content-Type": "application/json", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" +} \ No newline at end of file diff --git a/test/utils/fixtures/headersWithoutUserAgent.json b/test/utils/fixtures/headersWithoutUserAgent.json new file mode 100644 index 00000000..551ec3c2 --- /dev/null +++ b/test/utils/fixtures/headersWithoutUserAgent.json @@ -0,0 +1,4 @@ +{ + "Content-Type": "application/json", + "x-site-uuid": "3bfa03e9-b0e7-40dc-a9de-f4b8640b88cc" +} \ No newline at end of file diff --git a/test/utils/fixtures/requestBodyWithInvalidTimestamp.json b/test/utils/fixtures/requestBodyWithInvalidTimestamp.json new file mode 100644 index 00000000..508f01e7 --- /dev/null +++ b/test/utils/fixtures/requestBodyWithInvalidTimestamp.json @@ -0,0 +1,23 @@ +{ + "timestamp": "2025-07-02T20:09:42.591Z", + "action": "page_hit", + "version": "1", + "payload": { + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", + "locale": "en-US", + "location": "US", + "parsedReferrer": { + "source": null, + "medium": null, + "url": null + }, + "pathname": "/", + "href": "https://www.chrisraible.com/", + "event_id": "70ce307c-0b09-4bb5-b7c7-e41cf1085720", + "site_uuid": "940b73e9-4952-4752-b23d-9486f999c47e", + "post_uuid": "undefined", + "post_type": "null", + "member_uuid": "undefined", + "member_status": "undefined" + } +} \ No newline at end of file diff --git a/test/utils/pubsub-spy.ts b/test/utils/pubsub-spy.ts new file mode 100644 index 00000000..069c0c44 --- /dev/null +++ b/test/utils/pubsub-spy.ts @@ -0,0 +1,100 @@ +import {vi, expect} from 'vitest'; + +/** + * Creates a PubSub spy that can be used to spy on the PubSub client. + * + * Usage: + * ```ts + * const pubSubSpy = createPubSubSpy(); + * EventPublisher.resetInstance(pubSubSpy.mockPubSub as any); + * + * const response = await app.inject({ + * method: 'POST', + * url: '/tb/web_analytics', + * query: fixtures.queryParams.defaultValidRequestQuery, + * headers: fixtures.headers.defaultValidRequestHeaders, + * body: fixtures.pageHits.defaultValidRequestBody + * }); + * + * // You can use withMessage for low-level assertions: + * pubSubSpy.expectPublishedMessageToTopic(pubsubTopic).withMessage({ + * data: expect.any(Buffer), + * timestamp: expect.any(String) + * }); + * + * // Or use withMessageData to assert on the parsed data: + * pubSubSpy.expectPublishedMessageToTopic(pubsubTopic).withMessageData({ + * site_uuid: 'test-site-uuid', + * page_hits: expect.any(Array) + * }); + * ``` + * @returns + */ +export const createPubSubSpy = () => { + // Realistic mock message ID (15-digit number as string) + const DEFAULT_MESSAGE_ID = '384950293840593'; + + const publishMessageSpy = vi.fn().mockResolvedValue(DEFAULT_MESSAGE_ID); + const topicSpy = vi.fn(); + + const mockPubSub = { + topic: topicSpy.mockImplementation(() => ({ + publishMessage: publishMessageSpy + })) + }; + + const expectPublishedMessageToTopic = (expectedTopic: string) => { + return { + withMessageData: (expectedData: any) => { + expect(topicSpy).toHaveBeenCalledWith(expectedTopic); + + // Get the actual call arguments + const calls = publishMessageSpy.mock.calls; + expect(calls.length).toBeGreaterThan(0); + + const actualMessage = calls[calls.length - 1][0]; + + // Parse the buffer data if it exists + if (actualMessage.data && Buffer.isBuffer(actualMessage.data)) { + const parsedData = JSON.parse(actualMessage.data.toString()); + expect(parsedData).toEqual(expectedData); + } else { + // Fallback to direct comparison if not a buffer + expect(actualMessage).toEqual(expectedData); + } + + // Also check timestamp exists + expect(actualMessage).toHaveProperty('timestamp'); + expect(typeof actualMessage.timestamp).toBe('string'); + }, + withMessage: (messageMatcher: any) => { + expect(topicSpy).toHaveBeenCalledWith(expectedTopic); + expect(publishMessageSpy).toHaveBeenCalledWith(messageMatcher); + } + }; + }; + + const expectNoMessagesPublished = () => { + expect(publishMessageSpy).not.toHaveBeenCalled(); + expect(topicSpy).not.toHaveBeenCalled(); + }; + + const clearSpies = () => { + publishMessageSpy.mockClear(); + topicSpy.mockClear(); + }; + + const setMockMessageId = (messageId: string) => { + publishMessageSpy.mockResolvedValue(messageId); + }; + + return { + mockPubSub, + publishMessageSpy, + topicSpy, + expectPublishedMessageToTopic, + expectNoMessagesPublished, + clearSpies, + setMockMessageId + }; +}; \ No newline at end of file