Skip to content

Commit 1a1a026

Browse files
ignatzKyleAMathews
andauthored
Some minor drive-by fixes and cleanups for the electric collection. (#205)
Co-authored-by: Kyle Mathews <[email protected]>
1 parent 5d0737f commit 1a1a026

File tree

2 files changed

+26
-41
lines changed

2 files changed

+26
-41
lines changed

packages/db-collections/src/electric.ts

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,8 @@ function isUpToDateMessage<T extends Row<unknown>>(
115115
// Check if a message contains txids in its headers
116116
function hasTxids<T extends Row<unknown>>(
117117
message: Message<T>
118-
): message is Message<T> & { headers: { txids?: Array<number> } } {
119-
return (
120-
`headers` in message &&
121-
`txids` in message.headers &&
122-
Array.isArray(message.headers.txids)
123-
)
118+
): message is Message<T> & { headers: { txids?: Array<string> } } {
119+
return `txids` in message.headers && Array.isArray(message.headers.txids)
124120
}
125121

126122
/**
@@ -149,7 +145,7 @@ export function electricCollectionOptions<
149145
TSchema extends StandardSchemaV1 = never,
150146
TFallback extends Row<unknown> = Row<unknown>,
151147
>(config: ElectricCollectionConfig<TExplicit, TSchema, TFallback>) {
152-
const seenTxids = new Store<Set<string>>(new Set([`${Math.random()}`]))
148+
const seenTxids = new Store<Set<string>>(new Set([]))
153149
const sync = createElectricSync<ResolveType<TExplicit, TSchema, TFallback>>(
154150
config.shapeOptions,
155151
{
@@ -165,7 +161,7 @@ export function electricCollectionOptions<
165161
*/
166162
const awaitTxId: AwaitTxIdFn = async (
167163
txId: string,
168-
timeout = 30000
164+
timeout: number = 30000
169165
): Promise<boolean> => {
170166
if (typeof txId !== `string`) {
171167
throw new TypeError(
@@ -246,18 +242,14 @@ export function electricCollectionOptions<
246242
ResolveType<TExplicit, TSchema, TFallback>
247243
>
248244
) => {
249-
// Runtime check (that doesn't follow type)
250-
// eslint-disable-next-line
251-
const handlerResult = (await config.onDelete!(params)) ?? {}
252-
const txid = (handlerResult as { txid?: string }).txid
253-
254-
if (!txid) {
245+
const handlerResult = await config.onDelete!(params)
246+
if (!handlerResult.txid) {
255247
throw new Error(
256248
`Electric collection onDelete handler must return a txid`
257249
)
258250
}
259251

260-
await awaitTxId(txid)
252+
await awaitTxId(handlerResult.txid)
261253
return handlerResult
262254
}
263255
: undefined
@@ -333,43 +325,37 @@ function createElectricSync<T extends Row<unknown>>(
333325
signal: abortController.signal,
334326
})
335327
let transactionStarted = false
336-
let newTxids = new Set<string>()
328+
const newTxids = new Set<string>()
337329

338330
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
339331
let hasUpToDate = false
340332

341333
for (const message of messages) {
342334
// Check for txids in the message and add them to our store
343-
if (hasTxids(message) && message.headers.txids) {
344-
message.headers.txids.forEach((txid) => newTxids.add(String(txid)))
335+
if (hasTxids(message)) {
336+
message.headers.txids?.forEach((txid) => newTxids.add(txid))
345337
}
346338

347-
// Check if the message contains schema information
348-
if (isChangeMessage(message) && message.headers.schema) {
349-
// Store the schema for future use if it's a valid string
350-
if (typeof message.headers.schema === `string`) {
351-
const schema: string = message.headers.schema
339+
if (isChangeMessage(message)) {
340+
// Check if the message contains schema information
341+
const schema = message.headers.schema
342+
if (schema && typeof schema === `string`) {
343+
// Store the schema for future use if it's a valid string
352344
relationSchema.setState(() => schema)
353345
}
354-
}
355346

356-
if (isChangeMessage(message)) {
357347
if (!transactionStarted) {
358348
begin()
359349
transactionStarted = true
360350
}
361351

362-
const value = message.value as unknown as T
363-
364-
// Include the primary key and relation info in the metadata
365-
const enhancedMetadata = {
366-
...message.headers,
367-
}
368-
369352
write({
370353
type: message.headers.operation,
371-
value,
372-
metadata: enhancedMetadata,
354+
value: message.value,
355+
// Include the primary key and relation info in the metadata
356+
metadata: {
357+
...message.headers,
358+
},
373359
})
374360
} else if (isUpToDateMessage(message)) {
375361
hasUpToDate = true
@@ -390,10 +376,9 @@ function createElectricSync<T extends Row<unknown>>(
390376

391377
// Always commit txids when we receive up-to-date, regardless of transaction state
392378
seenTxids.setState((currentTxids) => {
393-
const clonedSeen = new Set(currentTxids)
394-
newTxids.forEach((txid) => clonedSeen.add(String(txid)))
395-
396-
newTxids = new Set()
379+
const clonedSeen = new Set<string>(currentTxids)
380+
newTxids.forEach((txid) => clonedSeen.add(txid))
381+
newTxids.clear()
397382
return clonedSeen
398383
})
399384
}

packages/db-collections/tests/electric.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ describe(`Electric Integration`, () => {
510510
value: value.value as Row,
511511
headers: {
512512
operation: `insert`,
513-
txids: [Number(txid)], // Convert to number as the API expects numbers but our code converts to strings
513+
txids: [txid],
514514
},
515515
})
516516
}
@@ -662,7 +662,7 @@ describe(`Electric Integration`, () => {
662662
value: { id: 1, name: `Test` },
663663
headers: {
664664
operation: `insert`,
665-
txids: [100, 200],
665+
txids: [`100`, `200`],
666666
},
667667
},
668668
{
@@ -959,7 +959,7 @@ describe(`Electric Integration`, () => {
959959
{
960960
headers: {
961961
control: `up-to-date`,
962-
txids: [300, 400],
962+
txids: [`300`, `400`],
963963
},
964964
},
965965
])

0 commit comments

Comments
 (0)