diff --git a/packages/db-mongodb/src/updateOne.ts b/packages/db-mongodb/src/updateOne.ts index 20816512ade..86d8fc8f02d 100644 --- a/packages/db-mongodb/src/updateOne.ts +++ b/packages/db-mongodb/src/updateOne.ts @@ -50,12 +50,18 @@ export const updateOne: UpdateOne = async function updateOne( let result - const $inc: Record = {} let updateData: UpdateQuery = data - transform({ $inc, adapter: this, data, fields, operation: 'write' }) + + const $inc: Record = {} + const $push: Record = {} + + transform({ $inc, $push, adapter: this, data, fields, operation: 'write' }) if (Object.keys($inc).length) { updateData = { $inc, $set: updateData } } + if (Object.keys($push).length) { + updateData = Object.keys($inc).length ? { ...updateData, $push } : { $push, $set: updateData } + } try { if (returning === false) { diff --git a/packages/db-mongodb/src/utilities/transform.ts b/packages/db-mongodb/src/utilities/transform.ts index 35a271877c7..49d2f4c4bf6 100644 --- a/packages/db-mongodb/src/utilities/transform.ts +++ b/packages/db-mongodb/src/utilities/transform.ts @@ -209,6 +209,7 @@ const sanitizeDate = ({ type Args = { $inc?: Record + $push?: Record /** instance of the adapter */ adapter: MongooseAdapter /** data to transform, can be an array of documents or a single document */ @@ -398,6 +399,7 @@ const stripFields = ({ export const transform = ({ $inc, + $push, adapter, data, fields, @@ -412,7 +414,16 @@ export const transform = ({ if (Array.isArray(data)) { for (const item of data) { - transform({ $inc, adapter, data: item, fields, globalSlug, operation, validateRelationships }) + transform({ + $inc, + $push, + adapter, + data: item, + fields, + globalSlug, + operation, + validateRelationships, + }) } return } @@ -470,6 +481,26 @@ export const transform = ({ } } + if ( + $push && + field.type === 'array' && + operation === 'write' && + field.name in ref && + ref[field.name] + ) { + const value = ref[field.name] + if (value && typeof value === 'object' && '$push' in value) { + const push = value.$push + + if 
(Array.isArray(push)) { + $push[`${parentPath}${field.name}`] = { $each: push } + } else if (typeof push === 'object') { + $push[`${parentPath}${field.name}`] = push + } + delete ref[field.name] + } + } + if (field.type === 'date' && operation === 'read' && field.name in ref && ref[field.name]) { if (config.localization && fieldShouldBeLocalized({ field, parentIsLocalized })) { const fieldRef = ref[field.name] as Record diff --git a/packages/drizzle/src/transform/write/array.ts b/packages/drizzle/src/transform/write/array.ts index 522400cc7cc..9572592e543 100644 --- a/packages/drizzle/src/transform/write/array.ts +++ b/packages/drizzle/src/transform/write/array.ts @@ -71,6 +71,7 @@ export const transformArray = ({ data.forEach((arrayRow, i) => { const newRow: ArrayRowToInsert = { arrays: {}, + arraysToPush: {}, locales: {}, row: { _order: i + 1, @@ -104,6 +105,7 @@ export const transformArray = ({ traverseFields({ adapter, arrays: newRow.arrays, + arraysToPush: newRow.arraysToPush, baseTableName, blocks, blocksToDelete, diff --git a/packages/drizzle/src/transform/write/blocks.ts b/packages/drizzle/src/transform/write/blocks.ts index dd786433815..1eeebe0a0f4 100644 --- a/packages/drizzle/src/transform/write/blocks.ts +++ b/packages/drizzle/src/transform/write/blocks.ts @@ -78,6 +78,7 @@ export const transformBlocks = ({ const newRow: BlockRowToInsert = { arrays: {}, + arraysToPush: {}, locales: {}, row: { _order: i + 1, @@ -116,6 +117,7 @@ export const transformBlocks = ({ traverseFields({ adapter, arrays: newRow.arrays, + arraysToPush: newRow.arraysToPush, baseTableName, blocks, blocksToDelete, diff --git a/packages/drizzle/src/transform/write/index.ts b/packages/drizzle/src/transform/write/index.ts index 5d875162dae..e0d67793cef 100644 --- a/packages/drizzle/src/transform/write/index.ts +++ b/packages/drizzle/src/transform/write/index.ts @@ -27,6 +27,7 @@ export const transformForWrite = ({ // Split out the incoming data into rows to insert / delete const 
rowToInsert: RowToInsert = { arrays: {}, + arraysToPush: {}, blocks: {}, blocksToDelete: new Set(), locales: {}, @@ -45,6 +46,7 @@ export const transformForWrite = ({ traverseFields({ adapter, arrays: rowToInsert.arrays, + arraysToPush: rowToInsert.arraysToPush, baseTableName: tableName, blocks: rowToInsert.blocks, blocksToDelete: rowToInsert.blocksToDelete, diff --git a/packages/drizzle/src/transform/write/traverseFields.ts b/packages/drizzle/src/transform/write/traverseFields.ts index feb3b176629..0c853a4c448 100644 --- a/packages/drizzle/src/transform/write/traverseFields.ts +++ b/packages/drizzle/src/transform/write/traverseFields.ts @@ -4,13 +4,7 @@ import { fieldIsVirtual, fieldShouldBeLocalized } from 'payload/shared' import toSnakeCase from 'to-snake-case' import type { DrizzleAdapter } from '../../types.js' -import type { - ArrayRowToInsert, - BlockRowToInsert, - NumberToDelete, - RelationshipToDelete, - TextToDelete, -} from './types.js' +import type { NumberToDelete, RelationshipToDelete, RowToInsert, TextToDelete } from './types.js' import { isArrayOfRows } from '../../utilities/isArrayOfRows.js' import { resolveBlockTableName } from '../../utilities/validateExistingBlockIsIdentical.js' @@ -23,16 +17,20 @@ import { transformTexts } from './texts.js' type Args = { adapter: DrizzleAdapter - arrays: { - [tableName: string]: ArrayRowToInsert[] - } + /** + * This will delete the array table and then re-insert all the new array rows. + */ + arrays: RowToInsert['arrays'] + /** + * Array rows to push to the existing array. This will simply create + * a new row in the array table. 
+ */ + arraysToPush: RowToInsert['arraysToPush'] /** * This is the name of the base table */ baseTableName: string - blocks: { - [blockType: string]: BlockRowToInsert[] - } + blocks: RowToInsert['blocks'] blocksToDelete: Set /** * A snake-case field prefix, representing prior fields @@ -82,6 +80,7 @@ type Args = { export const traverseFields = ({ adapter, arrays, + arraysToPush, baseTableName, blocks, blocksToDelete, @@ -129,10 +128,6 @@ export const traverseFields = ({ if (field.type === 'array') { const arrayTableName = adapter.tableNameMap.get(`${parentTableName}_${columnName}`) - if (!arrays[arrayTableName]) { - arrays[arrayTableName] = [] - } - if (isLocalized) { if (typeof data[field.name] === 'object' && data[field.name] !== null) { Object.entries(data[field.name]).forEach(([localeKey, localeData]) => { @@ -157,19 +152,33 @@ export const traverseFields = ({ textsToDelete, withinArrayOrBlockLocale: localeKey, }) - + if (!arrays[arrayTableName]) { + arrays[arrayTableName] = [] + } arrays[arrayTableName] = arrays[arrayTableName].concat(newRows) } }) } } else { + let value = data[field.name] + let push = false + if ( + // TODO do this for localized as well in DRY way + + value && typeof value === 'object' && + '$push' in value + ) { + value = Array.isArray(value.$push) ? 
value.$push : [value.$push] + push = true + } + const newRows = transformArray({ adapter, arrayTableName, baseTableName, blocks, blocksToDelete, - data: data[field.name], + data: value, field, numbers, numbersToDelete, @@ -183,7 +192,17 @@ export const traverseFields = ({ withinArrayOrBlockLocale, }) - arrays[arrayTableName] = arrays[arrayTableName].concat(newRows) + if (push) { + if (!arraysToPush[arrayTableName]) { + arraysToPush[arrayTableName] = [] + } + arraysToPush[arrayTableName] = arraysToPush[arrayTableName].concat(newRows) + } else { + if (!arrays[arrayTableName]) { + arrays[arrayTableName] = [] + } + arrays[arrayTableName] = arrays[arrayTableName].concat(newRows) + } } return @@ -264,6 +283,7 @@ export const traverseFields = ({ traverseFields({ adapter, arrays, + arraysToPush, baseTableName, blocks, blocksToDelete, @@ -298,6 +318,7 @@ export const traverseFields = ({ traverseFields({ adapter, arrays, + arraysToPush, baseTableName, blocks, blocksToDelete, diff --git a/packages/drizzle/src/transform/write/types.ts b/packages/drizzle/src/transform/write/types.ts index 27281a695f2..b78052c3791 100644 --- a/packages/drizzle/src/transform/write/types.ts +++ b/packages/drizzle/src/transform/write/types.ts @@ -2,6 +2,9 @@ export type ArrayRowToInsert = { arrays: { [tableName: string]: ArrayRowToInsert[] } + arraysToPush: { + [tableName: string]: ArrayRowToInsert[] + } locales: { [locale: string]: Record } @@ -12,6 +15,9 @@ export type BlockRowToInsert = { arrays: { [tableName: string]: ArrayRowToInsert[] } + arraysToPush: { + [tableName: string]: ArrayRowToInsert[] + } locales: { [locale: string]: Record } @@ -37,6 +43,9 @@ export type RowToInsert = { arrays: { [tableName: string]: ArrayRowToInsert[] } + arraysToPush: { + [tableName: string]: ArrayRowToInsert[] + } blocks: { [tableName: string]: BlockRowToInsert[] } diff --git a/packages/drizzle/src/updateJobs.ts b/packages/drizzle/src/updateJobs.ts index 6bfec911a21..7463dab2480 100644 --- 
a/packages/drizzle/src/updateJobs.ts +++ b/packages/drizzle/src/updateJobs.ts @@ -13,9 +13,13 @@ export const updateJobs: UpdateJobs = async function updateMany( this: DrizzleAdapter, { id, data, limit: limitArg, req, returning, sort: sortArg, where: whereArg }, ) { - if (!(data?.log as object[])?.length) { + if ( + !(data?.log as object[])?.length && + !(data.log && typeof data.log === 'object' && '$push' in data.log) + ) { delete data.log } + const whereToUse: Where = id ? { id: { equals: id } } : whereArg const limit = id ? 1 : limitArg diff --git a/packages/drizzle/src/upsertRow/index.ts b/packages/drizzle/src/upsertRow/index.ts index 52d686a55e0..756d136719e 100644 --- a/packages/drizzle/src/upsertRow/index.ts +++ b/packages/drizzle/src/upsertRow/index.ts @@ -44,7 +44,7 @@ export const upsertRow = async | TypeWithID>( }: Args): Promise => { let insertedRow: Record = { id } if (id && shouldUseOptimizedUpsertRow({ data, fields })) { - const { row } = transformForWrite({ + const { arraysToPush, row } = transformForWrite({ adapter, data, enableAtomicWrites: true, @@ -54,11 +54,27 @@ export const upsertRow = async | TypeWithID>( const drizzle = db as LibSQLDatabase + // First, handle $push arrays + + if (arraysToPush && Object.keys(arraysToPush)?.length) { + await insertArrays({ + adapter, + arrays: [arraysToPush], + db, + parentRows: [insertedRow], + uuidMap: {}, + }) + } + + // Then, handle regular row update + if (ignoreResult) { - await drizzle - .update(adapter.tables[tableName]) - .set(row) - .where(eq(adapter.tables[tableName].id, id)) + if (row && Object.keys(row).length) { + await drizzle + .update(adapter.tables[tableName]) + .set(row) + .where(eq(adapter.tables[tableName].id, id)) + } return ignoreResult === 'idOnly' ? 
({ id } as T) : null } @@ -74,6 +90,22 @@ export const upsertRow = async | TypeWithID>( const findManyKeysLength = Object.keys(findManyArgs).length const hasOnlyColumns = Object.keys(findManyArgs.columns || {}).length > 0 + if (!row || !Object.keys(row).length) { + // Nothing to update => just fetch current row and return + findManyArgs.where = eq(adapter.tables[tableName].id, insertedRow.id) + + const doc = await db.query[tableName].findFirst(findManyArgs) + + return transform({ + adapter, + config: adapter.payload.config, + data: doc, + fields, + joinQuery: false, + tableName, + }) + } + if (findManyKeysLength === 0 || hasOnlyColumns) { // Optimization - No need for joins => can simply use returning(). This is optimal for very simple collections // without complex fields that live in separate tables like blocks, arrays, relationships, etc. @@ -429,9 +461,9 @@ export const upsertRow = async | TypeWithID>( await insertArrays({ adapter, - arrays: [rowToInsert.arrays], + arrays: [rowToInsert.arrays, rowToInsert.arraysToPush], db, - parentRows: [insertedRow], + parentRows: [insertedRow, insertedRow], uuidMap: arraysBlocksUUIDMap, }) diff --git a/packages/drizzle/src/upsertRow/insertArrays.ts b/packages/drizzle/src/upsertRow/insertArrays.ts index c88d115e820..19d1998a5e3 100644 --- a/packages/drizzle/src/upsertRow/insertArrays.ts +++ b/packages/drizzle/src/upsertRow/insertArrays.ts @@ -32,6 +32,9 @@ export const insertArrays = async ({ const rowsByTable: RowsByTable = {} arrays.forEach((arraysByTable, parentRowIndex) => { + if (!arraysByTable || Object.keys(arraysByTable).length === 0) { + return + } Object.entries(arraysByTable).forEach(([tableName, arrayRows]) => { // If the table doesn't exist in map, initialize it if (!rowsByTable[tableName]) { diff --git a/packages/drizzle/src/upsertRow/shouldUseOptimizedUpsertRow.ts b/packages/drizzle/src/upsertRow/shouldUseOptimizedUpsertRow.ts index 096d22a5cf1..42761e8f99a 100644 --- 
a/packages/drizzle/src/upsertRow/shouldUseOptimizedUpsertRow.ts +++ b/packages/drizzle/src/upsertRow/shouldUseOptimizedUpsertRow.ts @@ -20,7 +20,6 @@ export const shouldUseOptimizedUpsertRow = ({ } if ( - field.type === 'array' || field.type === 'blocks' || ((field.type === 'text' || field.type === 'relationship' || @@ -35,6 +34,17 @@ return false } + if (field.type === 'array') { + if (value && typeof value === 'object' && '$push' in value && value.$push) { + return shouldUseOptimizedUpsertRow({ + // Only check first row - this function cares about field definitions. Each array row will have the same field definitions. + data: Array.isArray(value.$push) ? value.$push?.[0] : value.$push, + fields: field.flattenedFields, + }) + } + return false + } + if ( (field.type === 'group' || field.type === 'tab') && value && diff --git a/packages/payload/src/queues/errors/handleTaskError.ts b/packages/payload/src/queues/errors/handleTaskError.ts index 3366b0a15b5..8b0c2e19f9f 100644 --- a/packages/payload/src/queues/errors/handleTaskError.ts +++ b/packages/payload/src/queues/errors/handleTaskError.ts @@ -1,6 +1,6 @@ import ObjectIdImport from 'bson-objectid' -import type { PayloadRequest } from '../../index.js' +import type { JobLog, PayloadRequest } from '../../index.js' import type { RunJobsSilent } from '../localAPI.js' import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js' import type { TaskError } from './index.js' @@ -60,19 +60,6 @@ export async function handleTaskError({ const currentDate = getCurrentDate() - ;(job.log ??= []).push({ - id: new ObjectId().toHexString(), - completedAt: currentDate.toISOString(), - error: errorJSON, - executedAt: executedAt.toISOString(), - input, - output: output ?? {}, - parent: req.payload.config.jobs.addParentToTaskLog ? 
parent : undefined, - state: 'failed', - taskID, - taskSlug, - }) - if (job.waitUntil) { // Check if waitUntil is in the past const waitUntil = new Date(job.waitUntil) @@ -100,6 +87,19 @@ export async function handleTaskError({ maxRetries = retriesConfig.attempts } + const taskLogToPush: JobLog = { + id: new ObjectId().toHexString(), + completedAt: currentDate.toISOString(), + error: errorJSON, + executedAt: executedAt.toISOString(), + input, + output: output ?? {}, + parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined, + state: 'failed', + taskID, + taskSlug, + } + if (!taskStatus?.complete && (taskStatus?.totalTried ?? 0) >= maxRetries) { /** * Task reached max retries => workflow will not retry @@ -108,7 +108,9 @@ export async function handleTaskError({ await updateJob({ error: errorJSON, hasError: true, - log: job.log, + log: { + $push: taskLogToPush, + } as any, processing: false, totalTried: (job.totalTried ?? 0) + 1, waitUntil: job.waitUntil, @@ -168,7 +170,9 @@ export async function handleTaskError({ await updateJob({ error: hasFinalError ? errorJSON : undefined, hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried - log: job.log, + log: { + $push: taskLogToPush, + } as any, processing: false, totalTried: (job.totalTried ?? 0) + 1, waitUntil: job.waitUntil, diff --git a/packages/payload/src/queues/errors/handleWorkflowError.ts b/packages/payload/src/queues/errors/handleWorkflowError.ts index 2716aebdec8..aa3672e26f4 100644 --- a/packages/payload/src/queues/errors/handleWorkflowError.ts +++ b/packages/payload/src/queues/errors/handleWorkflowError.ts @@ -79,7 +79,6 @@ export async function handleWorkflowError({ await updateJob({ error: errorJSON, hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried - log: job.log, processing: false, totalTried: (job.totalTried ?? 
0) + 1, waitUntil: job.waitUntil, diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index aa9f1715675..55ccf3746db 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -13,6 +13,7 @@ import type { TaskType, } from '../../../config/types/taskTypes.js' import type { + JobLog, SingleTaskStatus, WorkflowConfig, WorkflowTypes, @@ -185,7 +186,7 @@ export const getRunTaskFunction = ( await taskConfig.onSuccess() } - ;(job.log ??= []).push({ + const newLogItem: JobLog = { id: new ObjectId().toHexString(), completedAt: getCurrentDate().toISOString(), executedAt: executedAt.toISOString(), @@ -195,10 +196,12 @@ export const getRunTaskFunction = ( state: 'succeeded', taskID, taskSlug, - }) + } await updateJob({ - log: job.log, + log: { + $push: newLogItem, + } as any, }) return output diff --git a/test/database/int.spec.ts b/test/database/int.spec.ts index c4cbff43937..349d900a279 100644 --- a/test/database/int.spec.ts +++ b/test/database/int.spec.ts @@ -29,7 +29,7 @@ import { import { assert } from 'ts-essentials' import { fileURLToPath } from 'url' -import type { Global2 } from './payload-types.js' +import type { Global2, Post } from './payload-types.js' import { devUser } from '../credentials.js' import { initPayloadInt } from '../helpers/initPayloadInt.js' @@ -3019,7 +3019,7 @@ describe('database', () => { it('should allow incremental number update', async () => { const post = await payload.create({ collection: 'posts', data: { number: 1, title: 'post' } }) - const res = await payload.db.updateOne({ + const res = (await payload.db.updateOne({ data: { number: { $inc: 10, @@ -3027,11 +3027,11 @@ describe('database', () => { }, collection: 'posts', where: { id: { equals: post.id } }, - }) + })) as unknown as Post expect(res.number).toBe(11) - 
const res2 = await payload.db.updateOne({ + const res2 = (await payload.db.updateOne({ data: { number: { $inc: -3, @@ -3039,11 +3039,80 @@ describe('database', () => { }, collection: 'posts', where: { id: { equals: post.id } }, - }) + })) as unknown as Post expect(res2.number).toBe(8) }) + it('should allow atomic array updates using $push with single value', async () => { + const post = await payload.create({ + collection: 'posts', + data: { + arrayWithIDs: [ + { + text: 'some text', + }, + ], + title: 'post', + }, + }) + + const res = (await payload.db.updateOne({ + data: { + arrayWithIDs: { + $push: { + text: 'some text 2', + id: '689c2f4d970fc3809aecbc71', + }, + }, + }, + collection: 'posts', + id: post.id, + })) as unknown as Post + + expect(res.arrayWithIDs).toHaveLength(2) + expect(res.arrayWithIDs?.[0]?.text).toBe('some text') + expect(res.arrayWithIDs?.[1]?.text).toBe('some text 2') + }) + + it('should allow atomic array updates using $push with multiple values', async () => { + const post = await payload.create({ + collection: 'posts', + data: { + arrayWithIDs: [ + { + text: 'some text', + }, + ], + title: 'post', + }, + }) + + const res = (await payload.db.updateOne({ + data: { + arrayWithIDs: { + $push: [ + { + id: '689c2f4d970fc3809aecbc71', + text: 'some text 2', + }, + { + id: '689c2f4d970fc3809aecbc72', + text: 'some text 3', + }, + ], + }, + }, + collection: 'posts', + id: post.id, + })) as unknown as Post + + expect(res.arrayWithIDs).toHaveLength(3) + expect(res.arrayWithIDs?.[0]?.text).toBe('some text') + expect(res.arrayWithIDs?.[1]?.text).toBe('some text 2') + expect(res.arrayWithIDs?.[2]?.text).toBe('some text 3') + }) + it('should support x3 nesting blocks', async () => { const res = await payload.create({ collection: 'posts', diff --git a/test/database/postgres-logs.int.spec.ts b/test/database/postgres-logs.int.spec.ts index ba76de845f2..40aa2b00c84 100644 --- a/test/database/postgres-logs.int.spec.ts +++ 
b/test/database/postgres-logs.int.spec.ts @@ -5,6 +5,8 @@ import assert from 'assert' import path from 'path' import { fileURLToPath } from 'url' +import type { Post } from './payload-types.js' + import { initPayloadInt } from '../helpers/initPayloadInt.js' const filename = fileURLToPath(import.meta.url) @@ -171,4 +173,49 @@ describePostgres('database - postgres logs', () => { expect(allPosts.docs).toHaveLength(1) expect(allPosts.docs[0].id).toEqual(doc1.id) }) + + it('ensure array update using $push is done in single db call', async () => { + const post = await payload.create({ + collection: 'posts', + data: { + arrayWithIDs: [ + { + text: 'some text', + }, + ], + title: 'post', + }, + }) + const consoleCount = jest.spyOn(console, 'log').mockImplementation(() => {}) + + await payload.db.updateOne({ + data: { + arrayWithIDs: { + $push: { + text: 'some text 2', + id: '689c2f4d970fc3809aecbc71', + }, + }, + }, + collection: 'posts', + id: post.id, + returning: false, + }) + + // 2 Updates: + // 1. updatedAt for posts row. //TODO: Skip this once updatedAt PR is merged + // 2. 
arrayWithIDs.$push for posts row + expect(consoleCount).toHaveBeenCalledTimes(2) + consoleCount.mockRestore() + + const updatedPost = (await payload.db.findOne({ + collection: 'posts', + where: { id: { equals: post.id } }, + })) as unknown as Post + + expect(updatedPost.title).toBe('post') + expect(updatedPost.arrayWithIDs).toHaveLength(2) + expect(updatedPost.arrayWithIDs?.[0]?.text).toBe('some text') + expect(updatedPost.arrayWithIDs?.[1]?.text).toBe('some text 2') + }) }) diff --git a/test/queues/getConfig.ts b/test/queues/getConfig.ts index a5edc59120d..02e0bc1a8a5 100644 --- a/test/queues/getConfig.ts +++ b/test/queues/getConfig.ts @@ -19,6 +19,7 @@ import { UpdatePostStep2Task } from './tasks/UpdatePostStep2Task.js' import { UpdatePostTask } from './tasks/UpdatePostTask.js' import { externalWorkflow } from './workflows/externalWorkflow.js' import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js' +import { fastParallelTaskWorkflow } from './workflows/fastParallelTaskWorkflow.js' import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js' import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js' import { longRunningWorkflow } from './workflows/longRunning.js' @@ -36,7 +37,6 @@ import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js' import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js' import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js' import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js' - const dirname = path.dirname(fileURLToPath(import.meta.url)) // Needs to be a function to prevent object reference issues due to duplicative configs @@ -163,6 +163,7 @@ export const getConfig: () => Partial = () => ({ subTaskWorkflow, subTaskFailsWorkflow, longRunningWorkflow, + fastParallelTaskWorkflow, parallelTaskWorkflow, ], }, diff --git 
a/test/queues/int.spec.ts b/test/queues/int.spec.ts index e5b55963b3b..5ec0b34790d 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -1424,6 +1424,8 @@ describe('Queues', () => { id: job.id, }) + // error can be defined while hasError is false, as hasError: true is only set if the job cannot retry anymore. + expect(jobAfterRun.error).toBeNull() expect(jobAfterRun.hasError).toBe(false) expect(jobAfterRun.log?.length).toBe(amount) @@ -1444,6 +1446,30 @@ describe('Queues', () => { } }) + it('can reliably run workflows with parallel tasks that complete immediately', async () => { + const amount = 2 + payload.config.jobs.deleteJobOnComplete = false + + const job = await payload.jobs.queue({ + workflow: 'fastParallelTask', + input: { + amount, + }, + }) + + await payload.jobs.run({ silent: false }) + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // error can be defined while hasError is false, as hasError: true is only set if the job cannot retry anymore. 
+ expect(jobAfterRun.error).toBeNull() + expect(jobAfterRun.hasError).toBe(false) + expect(jobAfterRun.log?.length).toBe(amount) + }) + it('can create and autorun jobs', async () => { await payload.jobs.queue({ workflow: 'inlineTaskTest', diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 580bf0b649b..a949afdee00 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -131,6 +131,7 @@ export interface Config { subTask: WorkflowSubTask; subTaskFails: WorkflowSubTaskFails; longRunning: WorkflowLongRunning; + fastParallelTask: WorkflowFastParallelTask; parallelTask: WorkflowParallelTask; }; }; @@ -331,6 +332,7 @@ export interface PayloadJob { | 'subTask' | 'subTaskFails' | 'longRunning' + | 'fastParallelTask' | 'parallelTask' ) | null; @@ -812,6 +814,15 @@ export interface WorkflowSubTaskFails { export interface WorkflowLongRunning { input?: unknown; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowFastParallelTask". + */ +export interface WorkflowFastParallelTask { + input: { + amount: number; + }; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "WorkflowParallelTask". diff --git a/test/queues/postgres-logs.int.spec.ts b/test/queues/postgres-logs.int.spec.ts index 7a10cb84aa3..2a588f770a0 100644 --- a/test/queues/postgres-logs.int.spec.ts +++ b/test/queues/postgres-logs.int.spec.ts @@ -52,7 +52,7 @@ describePostgres('queues - postgres logs', () => { jobStatus: { '1': { status: 'success' } }, remainingJobsFromQueried: 0, }) - expect(consoleCount).toHaveBeenCalledTimes(17) // Should be 17 sql calls if the optimizations are used. If not, this would be 22 calls + expect(consoleCount).toHaveBeenCalledTimes(15) // Should be 15 sql calls if the optimizations are used. 
If not, this would be 22 calls consoleCount.mockRestore() }) }) diff --git a/test/queues/workflows/fastParallelTaskWorkflow.ts b/test/queues/workflows/fastParallelTaskWorkflow.ts new file mode 100644 index 00000000000..c7388c2ced6 --- /dev/null +++ b/test/queues/workflows/fastParallelTaskWorkflow.ts @@ -0,0 +1,34 @@ +import type { WorkflowConfig } from 'payload' + +export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = { + slug: 'fastParallelTask', + inputSchema: [ + { + name: 'amount', + type: 'number', + required: true, + }, + ], + handler: async ({ job, inlineTask }) => { + const taskFunctions = [] + for (let i = 0; i < job.input.amount; i++) { + const idx = i + 1 + taskFunctions.push(async () => { + return await inlineTask(`fast parallel task ${idx}`, { + input: { + test: idx, + }, + task: () => { + return { + output: { + taskID: idx.toString(), + }, + } + }, + }) + }) + } + + await Promise.all(taskFunctions.map((f) => f())) + }, +}