Skip to content

Commit c910c66

Browse files
authored
Add getCrudTransactions() returning an async generator (#693)
1 parent 6a75f08 commit c910c66

File tree

6 files changed

+134
-28
lines changed

6 files changed

+134
-28
lines changed

.changeset/nice-dragons-smile.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/common': minor
3+
'@powersync/node': minor
4+
'@powersync/react-native': minor
5+
'@powersync/web': minor
6+
---
7+
8+
Add `getCrudTransactions()`, returning an async iterator of transactions. This can be used to batch transactions when uploading CRUD data.

.github/workflows/test-simulators.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ jobs:
138138
with:
139139
persist-credentials: false
140140

141+
- name: Set up XCode
142+
uses: maxim-lobanov/setup-xcode@v1
143+
with:
144+
xcode-version: latest-stable
145+
141146
- name: CocoaPods Cache
142147
uses: actions/cache@v3
143148
id: cocoapods-cache

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 72 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
1515
import { Schema } from '../db/schema/Schema.js';
1616
import { BaseObserver } from '../utils/BaseObserver.js';
1717
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
18-
import { throttleTrailing } from '../utils/async.js';
18+
import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js';
1919
import { ConnectionManager } from './ConnectionManager.js';
2020
import { CustomQuery } from './CustomQuery.js';
2121
import { ArrayQueryDefinition, Query } from './Query.js';
@@ -632,35 +632,80 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
632632
* @returns A transaction of CRUD operations to upload, or null if there are none
633633
*/
634634
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
635-
return await this.readTransaction(async (tx) => {
636-
const first = await tx.getOptional<CrudEntryJSON>(
637-
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`
638-
);
635+
const iterator = this.getCrudTransactions()[symbolAsyncIterator]();
636+
return (await iterator.next()).value;
637+
}
639638

640-
if (!first) {
641-
return null;
642-
}
643-
const txId = first.tx_id;
639+
/**
640+
* Returns an async iterator of completed transactions with local writes against the database.
641+
*
642+
* This is typically used from the {@link PowerSyncBackendConnector.uploadData} callback. Each entry emitted by the
643+
* returned iterator is a full transaction containing all local writes made while that transaction was active.
644+
*
645+
* Unlike {@link getNextCrudTransaction}, which always returns the oldest transaction that hasn't been
646+
* {@link CrudTransaction.complete}d yet, this iterator can be used to receive multiple transactions. Calling
647+
* {@link CrudTransaction.complete} will mark that and all prior transactions emitted by the iterator as completed.
648+
*
649+
* This can be used to upload multiple transactions in a single batch, e.g with:
650+
*
651+
* ```JavaScript
652+
* let lastTransaction = null;
653+
* let batch = [];
654+
*
655+
* for await (const transaction of database.getCrudTransactions()) {
656+
* batch.push(...transaction.crud);
657+
* lastTransaction = transaction;
658+
*
659+
* if (batch.length > 10) {
660+
* break;
661+
* }
662+
* }
663+
* ```
664+
*
665+
* If there is no local data to upload, the async iterator complete without emitting any items.
666+
*
667+
* Note that iterating over async iterables requires a [polyfill](https://github.com/powersync-ja/powersync-js/tree/main/packages/react-native#babel-plugins-watched-queries)
668+
* for React Native.
669+
*/
670+
getCrudTransactions(): AsyncIterable<CrudTransaction, null> {
671+
return {
672+
[symbolAsyncIterator]: () => {
673+
let lastCrudItemId = -1;
674+
const sql = `
675+
WITH RECURSIVE crud_entries AS (
676+
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
677+
UNION ALL
678+
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
679+
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
680+
WHERE crud_entries.tx_id = ps_crud.tx_id
681+
)
682+
SELECT * FROM crud_entries;
683+
`;
684+
685+
return {
686+
next: async () => {
687+
const nextTransaction = await this.database.getAll<CrudEntryJSON>(sql, [lastCrudItemId]);
688+
if (nextTransaction.length == 0) {
689+
return { done: true, value: null };
690+
}
644691

645-
let all: CrudEntry[];
646-
if (!txId) {
647-
all = [CrudEntry.fromRow(first)];
648-
} else {
649-
const result = await tx.getAll<CrudEntryJSON>(
650-
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC`,
651-
[txId]
652-
);
653-
all = result.map((row) => CrudEntry.fromRow(row));
692+
const items = nextTransaction.map((row) => CrudEntry.fromRow(row));
693+
const last = items[items.length - 1];
694+
const txId = last.transactionId;
695+
lastCrudItemId = last.clientId;
696+
697+
return {
698+
done: false,
699+
value: new CrudTransaction(
700+
items,
701+
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
702+
txId
703+
)
704+
};
705+
}
706+
};
654707
}
655-
656-
const last = all[all.length - 1];
657-
658-
return new CrudTransaction(
659-
all,
660-
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
661-
txId
662-
);
663-
});
708+
};
664709
}
665710

666711
/**

packages/common/src/utils/async.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
/**
2+
* A ponyfill for `Symbol.asyncIterator` that is compatible with the
3+
* [recommended polyfill](https://github.com/Azure/azure-sdk-for-js/blob/%40azure/core-asynciterator-polyfill_1.0.2/sdk/core/core-asynciterator-polyfill/src/index.ts#L4-L6)
4+
* we recommend for React Native.
5+
*
6+
* As long as we use this symbol (instead of `for await` and `async *`) in this package, we can be compatible with async
7+
* iterators without requiring them.
8+
*/
9+
export const symbolAsyncIterator: typeof Symbol.asyncIterator =
10+
Symbol.asyncIterator ?? Symbol.for('Symbol.asyncIterator');
11+
112
/**
213
* Throttle a function to be called at most once every "wait" milliseconds,
314
* on the trailing edge.

packages/node/tests/PowerSyncDatabase.test.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Worker } from 'node:worker_threads';
33

44
import { vi, expect, test } from 'vitest';
55
import { AppSchema, databaseTest, tempDirectoryTest } from './utils';
6-
import { PowerSyncDatabase } from '../lib';
6+
import { CrudEntry, CrudTransaction, PowerSyncDatabase } from '../lib';
77
import { WorkerOpener } from '../lib/db/options';
88

99
test('validates options', async () => {
@@ -131,3 +131,39 @@ databaseTest.skip('can watch queries', async ({ database }) => {
131131
await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['fourth']);
132132
expect((await query.next()).value.rows).toHaveLength(4);
133133
});
134+
135+
databaseTest('getCrudTransactions', async ({ database }) => {
136+
async function createTransaction(amount: number) {
137+
await database.writeTransaction(async (tx) => {
138+
for (let i = 0; i < amount; i++) {
139+
await tx.execute('insert into todos (id) values (uuid())');
140+
}
141+
});
142+
}
143+
144+
let iterator = database.getCrudTransactions()[Symbol.asyncIterator]();
145+
expect(await iterator.next()).toMatchObject({ done: true });
146+
147+
await createTransaction(5);
148+
await createTransaction(10);
149+
await createTransaction(15);
150+
151+
let lastTransaction: CrudTransaction | null = null;
152+
let batch: CrudEntry[] = [];
153+
154+
// Take the first two transactions via the async generator.
155+
for await (const transaction of database.getCrudTransactions()) {
156+
batch.push(...transaction.crud);
157+
lastTransaction = transaction;
158+
159+
if (batch.length > 10) {
160+
break;
161+
}
162+
}
163+
164+
expect(batch).toHaveLength(15);
165+
await lastTransaction!.complete();
166+
167+
const remainingTransaction = await database.getNextCrudTransaction();
168+
expect(remainingTransaction?.crud).toHaveLength(15);
169+
});

packages/react-native/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ npx expo install @journeyapps/react-native-quick-sqlite
3737
Watched queries can be used with either a callback response or Async Iterator response.
3838

3939
Watched queries using the Async Iterator response format require support for Async Iterators.
40+
`PowerSyncDatabase.getCrudTransactions()` also returns an Async Iterator and requires this workaround.
4041

4142
Expo apps currently require polyfill and Babel plugins in order to use this functionality.
4243

0 commit comments

Comments
 (0)