fix: job queue error when running tasks in parallel on sqlite #13452
Closed
AlessioGr added a commit that referenced this pull request on Aug 27, 2025
This PR adds **atomic** `$push` **support for array fields**. It makes it possible to safely append new items to arrays, which is especially useful when running tasks in parallel (like job queues) where multiple processes might update the same record at the same time.

By handling pushes atomically, we avoid race conditions and keep data consistent - especially on postgres, where the current implementation would nuke the entire array table before re-inserting every single array item.

The feature works for both localized and unlocalized arrays, and supports pushing either single or multiple items at once.

This PR is a requirement for reliably running parallel tasks in the job queue - see #13452.

Alongside documenting `$push`, this PR also adds documentation for `$inc`.

## Changes to updatedAt behavior

#13335 allows us to override the updatedAt property instead of the db always setting it to the current date. However, we are not able to skip updating the updatedAt property completely. This means that using `$push` results in 2 postgres db calls:

1. set updatedAt in main row
2. append array row in arrays table

This PR changes the behavior to only automatically set updatedAt if it's undefined. If you explicitly set it to `null`, the db adapter now skips automatically setting updatedAt.

=> This allows us to use `$push` in just one single db call

## Usage Examples

### Pushing a single item to an array

```ts
// `post` is a previously created document in the `posts` collection.
const updatedPost = await payload.db.updateOne({
  data: {
    array: {
      $push: {
        text: 'some text 2',
        id: new mongoose.Types.ObjectId().toHexString(),
      },
    },
  },
  collection: 'posts',
  id: post.id,
})
```

### Pushing a single item to a localized array

```ts
const updatedPost = await payload.db.updateOne({
  data: {
    arrayLocalized: {
      $push: {
        en: {
          text: 'some text 2',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
        es: {
          text: 'some text 2 es',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
      },
    },
  },
  collection: 'posts',
  id: post.id,
})
```

### Pushing multiple items to an array

```ts
const updatedPost = await payload.db.updateOne({
  data: {
    array: {
      $push: [
        {
          text: 'some text 2',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
        {
          text: 'some text 3',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
      ],
    },
  },
  collection: 'posts',
  id: post.id,
})
```

### Pushing multiple items to a localized array

```ts
const updatedPost = await payload.db.updateOne({
  data: {
    arrayLocalized: {
      $push: {
        en: {
          text: 'some text 2',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
        es: [
          {
            text: 'some text 2 es',
            id: new mongoose.Types.ObjectId().toHexString(),
          },
          {
            text: 'some text 3 es',
            id: new mongoose.Types.ObjectId().toHexString(),
          },
        ],
      },
    },
  },
  collection: 'posts',
  id: post.id,
})
```

---

- To see the specific tasks where the Asana app for GitHub is being used, see below:
  - https://app.asana.com/0/0/1211110462564647
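The updatedAt rule from the commit message above (undefined means "set it automatically", `null` means "skip it", any other value is an explicit override) can be sketched as a small pure function. This is only an illustration: `applyUpdatedAt` and `UpdateData` are hypothetical names, not part of Payload's db adapter API, and the real adapters may implement the rule differently.

```ts
// Hypothetical shape of the data passed to an update call (assumption).
type UpdateData = { updatedAt?: string | null; [key: string]: unknown }

// Hypothetical helper that decides what, if anything, to write for updatedAt.
function applyUpdatedAt(data: UpdateData, now: Date = new Date()): Record<string, unknown> {
  const { updatedAt, ...rest } = data

  if (updatedAt === undefined) {
    // Not provided: fall back to the current date, as before.
    return { ...rest, updatedAt: now.toISOString() }
  }

  if (updatedAt === null) {
    // Explicit null: leave updatedAt out entirely, so the main row needs no write.
    return rest
  }

  // Explicit value: use the caller's override (the behavior introduced by #13335).
  return { ...rest, updatedAt }
}
```

With `updatedAt: null`, the returned object contains no updatedAt key at all, which is what lets a `$push` touch only the arrays table.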
Closing in favor of #13614
Only happens on sqlite. Postgres works fine.
What happens on postgres (2 parallel tasks)
What happens on sqlite (2 parallel tasks)
TODO
Possible Solutions (notes):
Solution 1: Optimize db.updateJobs to atomically update the jobs log table (it's its own table, each log = 1 row which makes it easy) without nuking the entire table
Problem: won't be fool-proof for mongodb. What if the 2nd task finished before the 1st one? In that case, the final job log (same document in mongodb) will just contain 1 single item, as the following will have happened:
This is unlikely to happen, as I believe the second job log addition will mutate the job log object of the first task, so it could squeeze itself through. In theory, though, this race condition can be a problem.
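One possible interleaving behind this race can be reproduced with an in-memory stand-in for a single job document. This is a sketch under assumptions: `FakeJobStore`, `readModifyWrite` and `demo` are hypothetical names, and the store only mimics "replace the whole log" versus "append atomically"; it does not use MongoDB or Payload.

```ts
type JobDoc = { log: string[] }

// In-memory stand-in for one job document (hypothetical, for illustration only).
class FakeJobStore {
  private doc: JobDoc = { log: [] }

  // Returns a copy, like reading the document from the database.
  async read(): Promise<JobDoc> {
    return { log: [...this.doc.log] }
  }

  // Overwrites the whole log, like an update that sends the full array.
  async replace(next: JobDoc): Promise<void> {
    this.doc = { log: [...next.log] }
  }

  // Appends a single entry without reading first, like an atomic push.
  async push(entry: string): Promise<void> {
    this.doc.log.push(entry)
  }

  snapshot(): string[] {
    return [...this.doc.log]
  }
}

// Read-modify-write: each task writes back the log it read, plus its own entry.
async function readModifyWrite(store: FakeJobStore, entry: string): Promise<void> {
  const current = await store.read()
  current.log.push(entry)
  await store.replace(current)
}

async function demo(): Promise<{ lossy: string[]; atomic: string[] }> {
  const lossyStore = new FakeJobStore()
  // Both tasks read the empty log before either writes, so the last write wins.
  await Promise.all([
    readModifyWrite(lossyStore, 'task-1'),
    readModifyWrite(lossyStore, 'task-2'),
  ])

  const atomicStore = new FakeJobStore()
  // Atomic appends never overwrite each other's entries.
  await Promise.all([atomicStore.push('task-1'), atomicStore.push('task-2')])

  return { lossy: lossyStore.snapshot(), atomic: atomicStore.snapshot() }
}
```

In this interleaving the read-modify-write log ends up with a single entry, while the atomic variant keeps both.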
Possible solution: add support for atomic array updates across all db adapters (data: { array: { $push: { ... } } })
Solution 2: Disallow the use of Promise.all. Pass a concurrency utility that manages this in a safe way to the task and workflow handler args
Users will have to use this utility in place of any Promise.all whose callbacks may execute a task, directly or anywhere in nested code
When using this concurrency utility, we can skip the job log update for each task, and do it one single time once all tasks have been completed.
=> this will actually be a performance boost. {amount of parallel tasks} db updates => 1 db update
Keep in mind: we need to explicitly handle scenarios where some tasks may fail. => stop promises execution if one single task fails. What do we do with remaining unfinished (but possibly started) tasks?
Fail them? Ignore them and just not add them to job log?
Probably fail them, so the user has the chance to revert possible db mutations done in these tasks.
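A rough sketch of what such a concurrency utility from Solution 2 could look like follows. Everything here is an assumption made for illustration: `runTasksConcurrently`, `Task`, `LogEntry` and the `writeJobLog` callback are hypothetical and not part of Payload. The sketch takes the "fail them" option: once one task fails, every task that has not yet succeeded is recorded as failed, and the job log is written exactly once.

```ts
type LogEntry = { taskID: string; state: 'succeeded' | 'failed'; error?: string }
type Task = { id: string; run: () => Promise<void> }

// Hypothetical utility: runs tasks in parallel and writes the job log a single time.
async function runTasksConcurrently(
  tasks: Task[],
  writeJobLog: (entries: LogEntry[]) => Promise<void>,
): Promise<LogEntry[]> {
  const succeeded = new Set<string>()
  let failure: string | undefined

  try {
    await Promise.all(
      tasks.map(async (task) => {
        await task.run()
        succeeded.add(task.id)
      }),
    )
  } catch (err) {
    // Promise.all rejects on the first failure; the remaining promises cannot be
    // cancelled, so their tasks may keep running in the background.
    failure = err instanceof Error ? err.message : String(err)
  }

  // Tasks that had not succeeded when the first failure surfaced are marked failed.
  const entries = tasks.map((task): LogEntry =>
    succeeded.has(task.id)
      ? { taskID: task.id, state: 'succeeded' }
      : { taskID: task.id, state: 'failed', error: failure },
  )

  // One db update instead of one per task.
  await writeJobLog(entries)
  return entries
}
```

Because JavaScript promises cannot be cancelled, a real implementation would still need its own way to stop or roll back tasks that were already started when the failure occurred.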