Skip to content

Commit 479da75

Browse files
authored
fix: prevent synced invoices from getting updated by outdated webhooks (#68)
This PR introduces a general safeguard for upserting - the `upsertManyWithTimestampProtection` function. This function uses a SQL statement with a `WHERE` clause that only updates records when the incoming `last_synced_at` timestamp is newer than the existing one. This prevents older webhook events from overwriting newer data, ensuring data consistency. This pattern can easily be applied to other entities by using the same function in their respective sync functions. It also: - Adds a new migration to add the last_synced_at column to all entities: `invoices`, `customers`, `subscriptions`, `credit_notes`, `plans`, and `billable_metrics` - Adds the `syncTimestamp` parameter to the `syncInvoices` method allows webhook handlers to pass the webhook's `created_at` timestamp, which is then used as the `last_synced_at` value for timestamp-based protection - Adds assertions and tests to cover this functionality, including a new test case where we check that an invoice is not updated with an out-of-date webhook.
1 parent 6542b79 commit 479da75

File tree

6 files changed

+236
-14
lines changed

6 files changed

+236
-14
lines changed

apps/node-fastify/src/test/test-utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export async function fetchInvoicesFromDatabase(postgresClient: PostgresClient,
1111

1212
const placeholders = invoiceIds.map((_, index) => `$${index + 1}`).join(',');
1313
const result = await postgresClient.query(
14-
`SELECT id, invoice_number, customer_id, total, currency, status, updated_at FROM orb.invoices WHERE id IN (${placeholders})`,
14+
`SELECT id, invoice_number, customer_id, total, currency, status, updated_at, last_synced_at FROM orb.invoices WHERE id IN (${placeholders})`,
1515
invoiceIds
1616
);
1717
return result.rows;

apps/node-fastify/src/test/webhooks.test.ts

Lines changed: 148 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ describe('POST /webhooks', () => {
182182

183183
const webhookData = JSON.parse(payload);
184184
webhookData.type = webhookType;
185+
186+
const webhookTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
187+
webhookData.created_at = webhookTimestamp;
188+
185189
payload = JSON.stringify(webhookData);
186190

187191
const invoiceId = webhookData.invoice.id;
@@ -200,6 +204,10 @@ describe('POST /webhooks', () => {
200204
expect(invoice.total).toBe(webhookData.invoice.amount_due);
201205
expect(invoice.currency).toBe(webhookData.invoice.currency);
202206
expect(invoice.status).toBe(webhookData.invoice.status);
207+
208+
// Verify that last_synced_at gets set to the webhook timestamp for new invoices
209+
expect(invoice.last_synced_at).toBeDefined();
210+
expect(new Date(invoice.last_synced_at).toISOString()).toBe(webhookTimestamp);
203211
});
204212

205213
it('should update an existing invoice when webhook arrives', async () => {
@@ -229,8 +237,14 @@ describe('POST /webhooks', () => {
229237
status: initialStatus,
230238
};
231239

232-
// Store the initial invoice in the database
233-
await syncInvoices(postgresClient, [initialInvoiceData]);
240+
// Store the initial invoice in the database with an old timestamp
241+
const oldTimestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
242+
await syncInvoices(postgresClient, [initialInvoiceData], oldTimestamp);
243+
244+
// Verify the initial invoice was created with the old timestamp
245+
const [initialInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
246+
expect(initialInvoice).toBeDefined();
247+
expect(new Date(initialInvoice.last_synced_at).toISOString()).toBe(oldTimestamp);
234248

235249
// Now update the webhook data with new values
236250
const updatedAmount = 1500;
@@ -240,6 +254,10 @@ describe('POST /webhooks', () => {
240254
webhookData.invoice.status = updatedStatus;
241255
webhookData.invoice.paid_at = new Date().toISOString();
242256

257+
// Set a newer webhook timestamp that should trigger an update
258+
const newWebhookTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
259+
webhookData.created_at = newWebhookTimestamp;
260+
243261
payload = JSON.stringify(webhookData);
244262

245263
// Send the webhook with updated data
@@ -252,13 +270,135 @@ describe('POST /webhooks', () => {
252270
});
253271

254272
// Verify that the invoice was updated in the database
255-
const [invoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
256-
expect(invoice).toBeDefined();
257-
expect(Number(invoice.total)).toBe(updatedAmount);
258-
expect(invoice.status).toBe(updatedStatus);
273+
const [updatedInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
274+
expect(updatedInvoice).toBeDefined();
275+
expect(Number(updatedInvoice.total)).toBe(updatedAmount);
276+
expect(updatedInvoice.status).toBe(updatedStatus);
259277

260278
// Verify that the updated_at timestamp was changed
261-
expect(invoice.updated_at).toBeDefined();
262-
expect(new Date(invoice.updated_at).getTime()).toBeGreaterThan(new Date(webhookData.invoice.created_at).getTime());
279+
expect(updatedInvoice.updated_at).toBeDefined();
280+
expect(new Date(updatedInvoice.updated_at).getTime()).toBeGreaterThan(new Date(webhookData.invoice.created_at).getTime());
281+
282+
// Verify that last_synced_at was updated to the new webhook timestamp
283+
expect(updatedInvoice.last_synced_at).toBeDefined();
284+
expect(new Date(updatedInvoice.last_synced_at).toISOString()).toBe(newWebhookTimestamp);
285+
286+
// Verify that the new timestamp is newer than the old timestamp
287+
expect(new Date(updatedInvoice.last_synced_at).getTime()).toBeGreaterThan(new Date(oldTimestamp).getTime());
288+
});
289+
290+
it('should NOT update invoice when webhook timestamp is older than last_synced_at', async () => {
291+
let payload = loadWebhookPayload('invoice');
292+
const postgresClient = orbSync.postgresClient;
293+
294+
const webhookData = JSON.parse(payload);
295+
const invoiceId = webhookData.invoice.id;
296+
await deleteTestData(orbSync.postgresClient, 'invoices', [invoiceId]);
297+
298+
webhookData.type = 'invoice.payment_succeeded';
299+
300+
// Insert an invoice with a "new" timestamp and known values
301+
const originalAmount = 2000;
302+
const originalStatus = 'paid';
303+
webhookData.invoice.amount_due = originalAmount.toString();
304+
webhookData.invoice.total = originalAmount.toString();
305+
webhookData.invoice.status = originalStatus;
306+
307+
const newTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
308+
const initialInvoiceData = {
309+
...webhookData.invoice,
310+
amount_due: originalAmount.toString(),
311+
total: originalAmount.toString(),
312+
status: originalStatus,
313+
};
314+
await syncInvoices(postgresClient, [initialInvoiceData], newTimestamp);
315+
316+
// Verify the invoice was created with the new timestamp
317+
const [initialInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
318+
expect(initialInvoice).toBeDefined();
319+
expect(Number(initialInvoice.total)).toBe(originalAmount);
320+
expect(initialInvoice.status).toBe(originalStatus);
321+
expect(new Date(initialInvoice.last_synced_at).toISOString()).toBe(newTimestamp);
322+
323+
// Now attempt to update with an older webhook timestamp and different values
324+
const outdatedAmount = 1000;
325+
const outdatedStatus = 'pending';
326+
webhookData.invoice.amount_due = outdatedAmount.toString();
327+
webhookData.invoice.total = outdatedAmount.toString();
328+
webhookData.invoice.status = outdatedStatus;
329+
webhookData.invoice.paid_at = undefined;
330+
const oldWebhookTimestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
331+
webhookData.created_at = oldWebhookTimestamp;
332+
payload = JSON.stringify(webhookData);
333+
334+
// Send the webhook with the outdated data
335+
const response = await sendWebhookRequest(payload);
336+
expect(response.statusCode).toBe(200);
337+
const data = response.json();
338+
expect(data).toMatchObject({ received: true });
339+
340+
// Fetch the invoice again and verify it was NOT updated
341+
const [afterWebhookInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
342+
expect(afterWebhookInvoice).toBeDefined();
343+
// Data should remain unchanged
344+
expect(Number(afterWebhookInvoice.total)).toBe(originalAmount);
345+
expect(afterWebhookInvoice.status).toBe(originalStatus);
346+
// last_synced_at should remain the new timestamp
347+
expect(new Date(afterWebhookInvoice.last_synced_at).toISOString()).toBe(newTimestamp);
348+
});
349+
350+
it('should ignore outdated webhook for invoice when last_synced_at is newer, even with multiple invoices', async () => {
351+
const postgresClient = orbSync.postgresClient;
352+
let payload = loadWebhookPayload('invoice');
353+
const webhookData = JSON.parse(payload);
354+
355+
webhookData.type = 'invoice.payment_succeeded';
356+
357+
// Prepare two invoices with different ids
358+
const invoiceId1 = webhookData.invoice.id;
359+
const invoiceId2 = invoiceId1 + '_other';
360+
361+
// Insert both invoices with different last_synced_at values
362+
const webhookInvoice1Timestamp = new Date('2025-01-10T09:30:00.000Z').toISOString(); // This is older than invoice1Timestamp, so it should not update invoice1
363+
const invoice1Timestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
364+
const invoice2Timestamp = new Date('2025-01-10T10:30:00.000Z').toISOString();
365+
366+
// Invoice 1: will be updated
367+
const invoiceData1 = {
368+
...webhookData.invoice,
369+
id: invoiceId1,
370+
total: '1500',
371+
status: 'paid',
372+
};
373+
374+
// Invoice 2 data is irrelevant for this test
375+
const invoiceData2 = {
376+
...webhookData.invoice,
377+
id: invoiceId2
378+
};
379+
380+
await deleteTestData(postgresClient, 'invoices', [invoiceId1, invoiceId2]);
381+
await syncInvoices(postgresClient, [invoiceData1], invoice1Timestamp);
382+
await syncInvoices(postgresClient, [invoiceData2], invoice2Timestamp);
383+
384+
// Now, send a webhook for invoiceId1 with an older timestamp and outdated values
385+
webhookData.invoice.id = invoiceId1;
386+
webhookData.invoice.total = '1000';
387+
webhookData.invoice.status = 'pending';
388+
webhookData.created_at = webhookInvoice1Timestamp;
389+
payload = JSON.stringify(webhookData);
390+
391+
const response = await sendWebhookRequest(payload);
392+
expect(response.statusCode).toBe(200);
393+
394+
// Fetch both invoices
395+
const [updatedInvoice1] = await fetchInvoicesFromDatabase(postgresClient, [invoiceId1]);
396+
397+
// Invoice 1 should NOT be updated because the webhook timestamp is older than the invoice's last_synced_at
398+
expect(updatedInvoice1).toBeDefined();
399+
expect(updatedInvoice1.total).toBe('1500');
400+
expect(updatedInvoice1.status).toBe('paid');
401+
expect(new Date(updatedInvoice1.last_synced_at).toISOString()).toBe(invoice1Timestamp);
402+
263403
});
264404
});

packages/orb-sync-lib/src/database/postgres.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,51 @@ export class PostgresClient {
4747
return results.flatMap((it) => it.rows);
4848
}
4949

50+
async upsertManyWithTimestampProtection<
51+
T extends {
52+
[Key: string]: any; // eslint-disable-line @typescript-eslint/no-explicit-any
53+
},
54+
>(
55+
entries: T[],
56+
table: string,
57+
tableSchema: JsonSchema,
58+
syncTimestamp: string
59+
): Promise<T[]> {
60+
if (!entries.length) return [];
61+
62+
// Max 5 in parallel to avoid exhausting connection pool
63+
const chunkSize = 5;
64+
const results: pg.QueryResult<T>[] = [];
65+
66+
for (let i = 0; i < entries.length; i += chunkSize) {
67+
const chunk = entries.slice(i, i + chunkSize);
68+
69+
const queries: Promise<pg.QueryResult<T>>[] = [];
70+
chunk.forEach((entry) => {
71+
// Inject the values
72+
const cleansed = this.cleanseArrayField(entry, tableSchema);
73+
// Add last_synced_at to the cleansed data for SQL parameter binding
74+
cleansed.last_synced_at = syncTimestamp;
75+
76+
const upsertSql = this.constructUpsertWithTimestampProtectionSql(
77+
this.config.schema,
78+
table,
79+
tableSchema
80+
);
81+
82+
const prepared = sql(upsertSql, {
83+
useNullForMissing: true,
84+
})(cleansed);
85+
86+
queries.push(this.pool.query(prepared.text, prepared.values));
87+
});
88+
89+
results.push(...(await Promise.all(queries)));
90+
}
91+
92+
return results.flatMap((it) => it.rows);
93+
}
94+
5095
private constructUpsertSql = (schema: string, table: string, tableSchema: JsonSchema): string => {
5196
const conflict = 'id';
5297
const properties = tableSchema.properties;
@@ -72,6 +117,38 @@ export class PostgresClient {
72117
;`;
73118
};
74119

120+
private constructUpsertWithTimestampProtectionSql = (
121+
schema: string,
122+
table: string,
123+
tableSchema: JsonSchema
124+
): string => {
125+
const conflict = 'id';
126+
const properties = tableSchema.properties;
127+
128+
// The WHERE clause in ON CONFLICT DO UPDATE only applies to the conflicting row
129+
// (the row being updated), not to all rows in the table. PostgreSQL ensures that
130+
// the condition is evaluated only for the specific row that conflicts with the INSERT.
131+
return `
132+
INSERT INTO "${schema}"."${table}" (
133+
${Object.keys(properties)
134+
.map((x) => `"${x}"`)
135+
.join(',')}
136+
)
137+
VALUES (
138+
${Object.keys(properties)
139+
.map((x) => `:${x}`)
140+
.join(',')}
141+
)
142+
ON CONFLICT (${conflict}) DO UPDATE SET
143+
${Object.keys(properties)
144+
.filter((x) => x !== 'last_synced_at')
145+
.map((x) => `"${x}" = EXCLUDED."${x}"`)
146+
.join(',')},
147+
last_synced_at = :last_synced_at
148+
WHERE "${table}"."last_synced_at" IS NULL
149+
OR "${table}"."last_synced_at" < :last_synced_at;`;
150+
};
151+
75152
/**
76153
* Updates a subscription's billing cycle dates, provided that the current end date is in the past (i.e. the subscription
77154
* data in the database being stale).

packages/orb-sync-lib/src/orb-sync.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ export class OrbSync {
173173
const invoice = webhook.invoice;
174174
this.config.logger?.info(`Received webhook ${webhook.id}: ${parsedData.type} for invoice ${invoice.id}`);
175175

176-
await syncInvoices(this.postgresClient, [invoice]);
176+
await syncInvoices(this.postgresClient, [invoice], webhook.created_at);
177177

178178
const billingCycle = getBillingCycleFromInvoice(invoice);
179179
if (billingCycle && invoice.subscription) {
@@ -201,7 +201,7 @@ export class OrbSync {
201201

202202
this.config.logger?.info(`Received webhook ${webhook.id}: ${webhook.type} for invoice ${webhook.invoice.id}`);
203203

204-
await syncInvoices(this.postgresClient, [webhook.invoice]);
204+
await syncInvoices(this.postgresClient, [webhook.invoice], webhook.created_at);
205205
break;
206206
}
207207

packages/orb-sync-lib/src/schemas/invoice.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export const invoiceSchema: JsonSchema = {
4343
sync_failed_at: { type: 'string' },
4444
voided_at: { type: 'string' },
4545
will_auto_issue: { type: 'boolean' },
46+
last_synced_at: { type: 'string' },
4647
},
4748
required: ['id'],
4849
} as const;

packages/orb-sync-lib/src/sync/invoices.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@ import { Invoice } from 'orb-billing/resources';
66

77
const TABLE = 'invoices';
88

9-
export async function syncInvoices(postgresClient: PostgresClient, invoices: Invoice[]) {
10-
return postgresClient.upsertMany(
9+
export async function syncInvoices(postgresClient: PostgresClient, invoices: Invoice[], syncTimestamp?: string) {
10+
const timestamp = syncTimestamp || new Date().toISOString();
11+
12+
return postgresClient.upsertManyWithTimestampProtection(
1113
invoices.map((invoice) => ({
1214
...invoice,
1315
customer_id: invoice.customer.id,
1416
subscription_id: invoice.subscription?.id,
17+
last_synced_at: timestamp,
1518
})),
1619
TABLE,
17-
invoiceSchema
20+
invoiceSchema,
21+
timestamp
1822
);
1923
}
2024

0 commit comments

Comments
 (0)