Skip to content

Commit 7ef13d3

Browse files
authored
fix SqlPersistedQueue batch size (#5813)
1 parent 678cb91 commit 7ef13d3

File tree

3 files changed

+57
-25
lines changed

3 files changed

+57
-25
lines changed

.changeset/poor-clouds-stand.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"effect": patch
3+
"@effect/sql": patch
4+
---
5+
6+
fix SqlPersistedQueue batch size

packages/effect/src/internal/mailbox.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import * as Effectable from "../Effectable.js"
88
import type { Exit } from "../Exit.js"
99
import { dual } from "../Function.js"
1010
import * as Inspectable from "../Inspectable.js"
11-
import * as Iterable from "../Iterable.js"
1211
import type * as Api from "../Mailbox.js"
1312
import * as Option from "../Option.js"
1413
import { pipeArguments } from "../Pipeable.js"
@@ -421,9 +420,13 @@ class MailboxImpl<A, E> extends Effectable.Class<readonly [messages: Chunk.Chunk
421420
} else if (this.state.takers.size === 0) {
422421
return
423422
}
424-
const taker = Iterable.unsafeHead(this.state.takers)
425-
this.state.takers.delete(taker)
426-
taker(core.exitVoid)
423+
for (const taker of this.state.takers) {
424+
this.state.takers.delete(taker)
425+
taker(core.exitVoid)
426+
if (this.messages.length + this.messagesChunk.length === 0) {
427+
break
428+
}
429+
}
427430
}
428431

429432
private unsafeTakeAll() {

packages/sql/src/SqlPersistedQueue.ts

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import * as Effect from "effect/Effect"
99
import * as Exit from "effect/Exit"
1010
import * as Layer from "effect/Layer"
1111
import * as Mailbox from "effect/Mailbox"
12+
import * as Option from "effect/Option"
1213
import * as RcMap from "effect/RcMap"
1314
import * as Schedule from "effect/Schedule"
1415
import type * as Scope from "effect/Scope"
@@ -238,10 +239,18 @@ export const make: (
238239
readonly queue_name: string
239240
element: string
240241
readonly attempts: number
242+
updated_at?: Date
241243
}
242244
const mailboxes = yield* RcMap.make({
243245
lookup: Effect.fnUntraced(function*({ maxAttempts, name }: QueueKey) {
244-
const mailbox = yield* Mailbox.make<Element>({ capacity: 0 })
246+
const mailbox = yield* Mailbox.make<Element>()
247+
248+
yield* Effect.addFinalizer(() =>
249+
Effect.flatMap(mailbox.clear, (elements) => {
250+
if (elements.length === 0) return Effect.void
251+
return interrupt(Array.from(elements, (e) => e.id))
252+
})
253+
)
245254

246255
const poll = sql.onDialectOrElse({
247256
pg: () =>
@@ -258,11 +267,15 @@ export const make: (
258267
FOR UPDATE SKIP LOCKED
259268
LIMIT ${pollBatchSizeSql}
260269
)
261-
RETURNING id, queue_name, element, attempts
262-
`,
270+
RETURNING id, queue_name, element, attempts, updated_at
271+
`.pipe(
272+
Effect.map((rows) =>
273+
(rows as Array<Element>).sort((a, b) => a.updated_at!.getTime() - b.updated_at!.getTime())
274+
)
275+
),
263276
mysql: () =>
264277
sql<Element>`
265-
SELECT id, queue_name, element, attempts FROM ${tableNameSql}
278+
SELECT id, queue_name, element, attempts, updated_at FROM ${tableNameSql}
266279
WHERE queue_name = ${name}
267280
AND completed = FALSE
268281
AND attempts < ${maxAttempts}
@@ -273,6 +286,7 @@ export const make: (
273286
`.pipe(
274287
Effect.tap((rows) => {
275288
if (rows.length === 0) return Effect.void
289+
;(rows as Array<Element>).sort((a, b) => a.updated_at!.getTime() - b.updated_at!.getTime())
276290
return sql`
277291
UPDATE ${tableNameSql}
278292
SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql}
@@ -293,7 +307,7 @@ export const make: (
293307
)
294308
UPDATE q
295309
SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql}
296-
OUTPUT inserted.id, inserted.queue_name, inserted.element, inserted.attempts
310+
OUTPUT inserted.id, inserted.queue_name, inserted.element, inserted.attempts, inserted.updated_at
297311
FROM ${tableNameSql} AS q
298312
INNER JOIN cte ON q.id = cte.id
299313
`,
@@ -311,26 +325,30 @@ export const make: (
311325
ORDER BY updated_at ASC
312326
LIMIT ${pollBatchSizeSql}
313327
)
314-
RETURNING id, queue_name, element, attempts
315-
`
328+
RETURNING id, queue_name, element, attempts, updated_at
329+
`.pipe(
330+
Effect.map((rows) => {
331+
if (rows.length === 0) return rows
332+
else if (typeof rows[0].updated_at === "string") {
333+
;(rows as Array<any>).sort((a, b) => a.updated_at.localeCompare(b.updated_at))
334+
} else {
335+
;(rows as Array<Element>).sort((a, b) => a.updated_at!.getTime() - b.updated_at!.getTime())
336+
}
337+
return rows
338+
})
339+
)
316340
})
317341

318342
yield* Effect.gen(function*() {
319343
while (true) {
320-
const results = yield* poll
344+
const size = Option.getOrElse(yield* mailbox.size, () => 0)
345+
const results = size === 0 ? yield* poll : []
321346
if (results.length > 0) {
322-
const toOffer = new Set(results)
323-
yield* Effect.forEach(toOffer, (element) => {
347+
for (let i = 0; i < results.length; i++) {
348+
const element = results[i]
324349
element.element = JSON.parse(element.element)
325-
return mailbox.offer(element).pipe(
326-
Effect.tap(() => {
327-
toOffer.delete(element)
328-
})
329-
)
330-
}).pipe(
331-
Effect.onInterrupt(() => interrupt(Array.from(toOffer, (e) => e.id)))
332-
)
333-
yield* Effect.yieldNow()
350+
}
351+
yield* mailbox.offerAll(results)
334352
} else {
335353
// TODO: use listen/notify or equivalent to avoid polling
336354
yield* Effect.sleep(pollInterval)
@@ -368,7 +386,6 @@ export const make: (
368386
Effect.uninterruptibleMask((restore) =>
369387
RcMap.get(mailboxes, new QueueKey({ name, maxAttempts })).pipe(
370388
Effect.flatMap((m) => Effect.orDie(m.take)),
371-
Effect.zipLeft(Effect.yieldNow()),
372389
Effect.scoped,
373390
restore,
374391
Effect.tap((element) =>
@@ -394,7 +411,13 @@ class QueueKey extends Data.Class<{
394411
* @since 1.0.0
395412
* @category layers
396413
*/
397-
export const layerStore = (options?: {}): Layer.Layer<
414+
export const layerStore = (options?: {
415+
readonly tableName?: string | undefined
416+
readonly pollInterval?: Duration.DurationInput | undefined
417+
readonly pollBatchSize?: number | undefined
418+
readonly lockRefreshInterval?: Duration.DurationInput | undefined
419+
readonly lockExpiration?: Duration.DurationInput | undefined
420+
}): Layer.Layer<
398421
PersistedQueue.PersistedQueueStore,
399422
SqlError,
400423
SqlClient.SqlClient

0 commit comments

Comments
 (0)