Skip to content

Conversation

AlessioGr
Copy link
Member

@AlessioGr AlessioGr commented Aug 13, 2025

This PR adds atomic $push support for array fields. It makes it possible to safely append new items to arrays, which is especially useful when running tasks in parallel (like job queues) where multiple processes might update the same record at the same time. By handling pushes atomically, we avoid race conditions and keep data consistent - especially on postgres, where the current implementation would nuke the entire array table before re-inserting every single array item.

The feature works for both localized and unlocalized arrays, and supports pushing either single or multiple items at once.

This PR is a requirement for reliably running parallel tasks in the job queue - see #13452.

Alongside documenting $push, this PR also adds documentation for $inc.

Changes to updatedAt behavior

#13335 allows us to override the updatedAt property instead of the db always setting it to the current date.

However, we are not able to skip updating the updatedAt property completely. This means, usage of $push results in 2 postgres db calls:

  1. set updatedAt in main row
  2. append array row in arrays table

This PR changes the behavior to only automatically set updatedAt if it's undefined. If you explicitly set it to null, this now allows you to skip the db adapter automatically setting updatedAt.

Note

I would like to note that this does differ from the behavior of other fields being set to null.

For other fields, setting them to null will cause the database to think we want to set it to null. In this PR, we created an exception for the updatedAt field, where setting it to null will simply skip it from the db update, neither setting it to the current date (this would be the behavior if it was undefined) nor setting it to null in the db.

The alternative to this would be adding a new property like disableTimestampUpdates, but that would be more code to maintain and less simple.

=> This allows us to use $push in just one single db call

Usage Examples

Pushing a single item to an array

const post = (await payload.db.updateOne({
  data: {
    array: {
      $push: {
        text: 'some text 2',
        id: new mongoose.Types.ObjectId().toHexString(),
      },
    },
  },
  collection: 'posts',
  id: post.id,
}))

Pushing a single item to a localized array

const post = (await payload.db.updateOne({
  data: {
    arrayLocalized: {
      $push: {
        en: {
          text: 'some text 2',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
        es: {
          text: 'some text 2 es',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
      },
    },
  },
  collection: 'posts',
  id: post.id,
}))

Pushing multiple items to an array

const post = (await payload.db.updateOne({
  data: {
    array: {
      $push: [
        {
          text: 'some text 2',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
        {
          text: 'some text 3',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
      ],
    },
  },
  collection: 'posts',
  id: post.id,
}))

Pushing multiple items to a localized array

const post = (await payload.db.updateOne({
  data: {
    arrayLocalized: {
      $push: {
        en: {
          text: 'some text 2',
          id: new mongoose.Types.ObjectId().toHexString(),
        },
        es: [
          {
            text: 'some text 2 es',
            id: new mongoose.Types.ObjectId().toHexString(),
          },
          {
            text: 'some text 3 es',
            id: new mongoose.Types.ObjectId().toHexString(),
          },
        ],
      },
    },
  },
  collection: 'posts',
  id: post.id,
}))

Copy link
Contributor

github-actions bot commented Aug 13, 2025

📦 esbuild Bundle Analysis for payload

This analysis was generated by esbuild-bundle-analyzer. 🤖
This PR introduced no changes to the esbuild bundle! 🙌

@AlessioGr AlessioGr marked this pull request as ready for review August 21, 2025 20:38
@AlessioGr AlessioGr requested a review from DanRibbens August 21, 2025 20:38
@AlessioGr AlessioGr enabled auto-merge (squash) August 21, 2025 22:33
@AlessioGr AlessioGr changed the title feat(db-*): support atomic array push db updates feat(db-*): support atomic array $push db updates Aug 22, 2025
@AlessioGr AlessioGr disabled auto-merge August 22, 2025 19:52
@AlessioGr AlessioGr enabled auto-merge (squash) August 22, 2025 19:52
@jmikrut
Copy link
Member

jmikrut commented Aug 22, 2025

we should chat about this one with @DanRibbens

@AlessioGr AlessioGr disabled auto-merge August 22, 2025 20:05
@AlessioGr AlessioGr enabled auto-merge (squash) August 22, 2025 20:06
@AlessioGr AlessioGr requested a review from DanRibbens August 26, 2025 18:28
@AlessioGr AlessioGr merged commit 5ded64e into main Aug 27, 2025
83 checks passed
@AlessioGr AlessioGr deleted the feat/$push branch August 27, 2025 18:11
AlessioGr added a commit that referenced this pull request Aug 27, 2025
Currently, attempting to run tasks in parallel will result in DB errors.

## Solution

The problem was caused due to inefficient db update calls. After each
task completes, we need to update the log array in the payload-jobs
collection. On postgres, that's a different table.

Currently, the update works the following way:
1. Nuke the table
2. Re-insert every single row, including the new one

This will throw db errors if multiple processes start doing that.
Additionally, due to conflicts, new log rows may be lost.

This PR makes use of the the [new db $push operation
](#13453) we recently added to
atomically push a new log row to the database in a single round-trip.
This not only reduces the amount of db round trips (=> faster job queue
system) but allows multiple tasks to perform this db operation in
parallel, without conflicts.

## Problem

**Example:**

```ts
export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = {
  slug: 'fastParallelTask',
  handler: async ({nlineTask }) => {
    const taskFunctions = []
    for (let i = 0; i < 20; i++) {
      const idx = i + 1
      taskFunctions.push(async () => {
        return await inlineTask(`parallel task ${idx}`, {
          input: {
            test: idx,
          },
          task: () => {
            return {
              output: {
                taskID: idx.toString(),
              },
            }
          },
        })
      })
    }

    await Promise.all(taskFunctions.map((f) => f()))
  },
}
```

On SQLite, this would throw the following error:

```bash
Caught error Error: UNIQUE constraint failed: payload_jobs_log.id
    at Object.next (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]/node_modules/libsql/index.js:335:20)
    at Statement.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]/node_modules/libsql/index.js:360:16)
    at executeStmt (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql[email protected][email protected][email protected]/node_modules/@libsql/client/lib-cjs/sqlite3.js:285:34)
    at Sqlite3Client.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql[email protected][email protected][email protected]/node_modules/@libsql/client/lib-cjs/sqlite3.js:101:16)
    at /Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]_@[email protected][email protected][email protected]__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:288:58
    at LibSQLPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]_@[email protected][email protected][email protected]__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/session.ts:79:18)
    at LibSQLPreparedQuery.values (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]_@[email protected][email protected][email protected]__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:286:21)
    at LibSQLPreparedQuery.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]_@[email protected][email protected][email protected]__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:214:27)
    at QueryPromise.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]_@[email protected][email protected][email protected]__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:402:26)
    at QueryPromise.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]_@[email protected][email protected][email protected]__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:414:40)
    at QueryPromise.then (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/[email protected]_@[email protected][email protected][email protected]__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/query-promise.ts:31:15) {
  rawCode: 1555,
  code: 'SQLITE_CONSTRAINT_PRIMARYKEY',
  libsqlError: true
}
```

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1211001438499053
Copy link
Contributor

🚀 This is included in version v3.54.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants