Skip to content

Commit 256ee4e

Browse files
committed
Add test client.
1 parent a308e45 commit 256ee4e

File tree

10 files changed

+292
-0
lines changed

10 files changed

+292
-0
lines changed

pnpm-lock.yaml

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pnpm-workspace.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ packages:
22
- 'packages/*'
33
- 'libs/*'
44
- 'service'
5+
- 'test-client'
56
# exclude packages that are inside test directories
67
- '!**/test/**'

test-client/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Test Client
2+
3+
This is a minimal client demonstrating direct usage of the sync API.
4+
5+
For a full implementation, see our client SDKs.
6+
7+
## Usage
8+
9+
```sh
10+
# In project root
11+
pnpm install
12+
pnpm build:packages
13+
# Here
14+
pnpm build
15+
node dist/bin.js fetch-operations --token <token> --endpoint http://localhost:8080
16+
```
17+
18+
The script will normalize the data in each bucket to a single CLEAR operation, followed by the latest PUT operation for each row.

test-client/package.json

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"name": "test-client",
3+
"repository": "https://github.com/powersync-ja/powersync-service",
4+
"private": true,
5+
"version": "0.1.0",
6+
"main": "dist/index.js",
7+
"bin": "dist/bin.js",
8+
"license": "Apache-2.0",
9+
"type": "module",
10+
"scripts": {
11+
"fetch-operations": "tsc -b && node dist/bin.js fetch-operations",
12+
"build": "tsc -b",
13+
"clean": "rm -rf ./dist && tsc -b --clean"
14+
},
15+
"dependencies": {
16+
"@powersync/service-core": "workspace:*",
17+
"commander": "^12.0.0"
18+
},
19+
"devDependencies": {
20+
"typescript": "^5.2.2",
21+
"@types/node": "18.11.11"
22+
}
23+
}

test-client/src/bin.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { program } from 'commander';
2+
import { getCheckpointData } from './client.js';
3+
4+
program
5+
.command('fetch-operations')
6+
.option('-t, --token [token]')
7+
.option('-e, --endpoint [endpoint]')
8+
.option('--raw')
9+
.action(async (options) => {
10+
const data = await getCheckpointData({ endpoint: options.endpoint, token: options.token, raw: options.raw });
11+
console.log(JSON.stringify(data, null, 2));
12+
});
13+
14+
await program.parseAsync();

test-client/src/client.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { ndjsonStream } from './ndjson.js';
2+
import type * as types from '@powersync/service-core';
3+
import { isCheckpoint, isCheckpointComplete, isStreamingSyncData, normalizeData } from './util.js';
4+
5+
export interface GetCheckpointOptions {
6+
endpoint: string;
7+
token: string;
8+
raw?: boolean;
9+
}
10+
11+
export async function getCheckpointData(options: GetCheckpointOptions) {
12+
const response = await fetch(`${options.endpoint}/sync/stream`, {
13+
method: 'POST',
14+
headers: {
15+
'Content-Type': 'application/json',
16+
Authorization: `Token ${options.token}`
17+
},
18+
body: JSON.stringify({
19+
raw_data: true,
20+
include_checksum: true,
21+
parameters: {}
22+
} satisfies types.StreamingSyncRequest)
23+
});
24+
if (!response.ok) {
25+
throw new Error(response.statusText + '\n' + (await response.text()));
26+
}
27+
28+
let data: types.StreamingSyncData[] = [];
29+
let checkpoint: types.StreamingSyncCheckpoint;
30+
31+
for await (let chunk of ndjsonStream<types.StreamingSyncLine>(response.body!)) {
32+
if (isStreamingSyncData(chunk)) {
33+
data.push(chunk);
34+
} else if (isCheckpoint(chunk)) {
35+
checkpoint = chunk;
36+
} else if (isCheckpointComplete(chunk)) {
37+
break;
38+
}
39+
}
40+
41+
return normalizeData(checkpoint!, data, { raw: options.raw ?? false });
42+
}

test-client/src/index.ts

Whitespace-only changes.

test-client/src/ndjson.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// import { ReadableStream } from 'node:stream/web';
2+
3+
export function ndjsonStream<T>(response: ReadableStream<Uint8Array>): ReadableStream<T> & AsyncIterable<T> {
4+
// For cancellation
5+
var is_reader: any,
6+
cancellationRequest = false;
7+
return new ReadableStream<T>({
8+
start: function (controller) {
9+
var reader = response.getReader();
10+
is_reader = reader;
11+
var decoder = new TextDecoder();
12+
var data_buf = '';
13+
14+
reader.read().then(function processResult(result): void | Promise<any> {
15+
if (result.done) {
16+
if (cancellationRequest) {
17+
// Immediately exit
18+
return;
19+
}
20+
21+
data_buf = data_buf.trim();
22+
if (data_buf.length !== 0) {
23+
try {
24+
var data_l = JSON.parse(data_buf);
25+
controller.enqueue(data_l);
26+
} catch (e) {
27+
controller.error(e);
28+
return;
29+
}
30+
}
31+
controller.close();
32+
return;
33+
}
34+
35+
var data = decoder.decode(result.value, { stream: true });
36+
data_buf += data;
37+
var lines = data_buf.split('\n');
38+
for (var i = 0; i < lines.length - 1; ++i) {
39+
var l = lines[i].trim();
40+
if (l.length > 0) {
41+
try {
42+
var data_line = JSON.parse(l);
43+
controller.enqueue(data_line);
44+
} catch (e) {
45+
controller.error(e);
46+
cancellationRequest = true;
47+
reader.cancel();
48+
return;
49+
}
50+
}
51+
}
52+
data_buf = lines[lines.length - 1];
53+
54+
return reader.read().then(processResult);
55+
});
56+
},
57+
cancel: function (reason) {
58+
cancellationRequest = true;
59+
is_reader.cancel();
60+
}
61+
}) as any;
62+
}

test-client/src/util.ts

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import type * as types from '@powersync/service-core';
2+
3+
export type BucketData = Record<string, types.OplogEntry[]>;
4+
5+
export function normalizeData(
6+
checkpoint: types.StreamingSyncCheckpoint,
7+
chunks: types.StreamingSyncData[],
8+
options: { raw: boolean }
9+
) {
10+
const lastOpId = BigInt(checkpoint.checkpoint.last_op_id);
11+
let buckets: BucketData = {};
12+
for (let {
13+
data: { bucket, data }
14+
} of chunks) {
15+
buckets[bucket] ??= [];
16+
for (let entry of data) {
17+
if (BigInt(entry.op_id) > lastOpId) {
18+
continue;
19+
}
20+
buckets[bucket].push(entry);
21+
}
22+
}
23+
24+
if (options.raw) {
25+
return buckets;
26+
}
27+
28+
return Object.fromEntries(
29+
Object.entries(buckets).map(([bucket, entries]) => {
30+
return [bucket, reduceBucket(entries)];
31+
})
32+
);
33+
}
34+
35+
export function isStreamingSyncData(line: types.StreamingSyncLine): line is types.StreamingSyncData {
36+
return (line as types.StreamingSyncData).data != null;
37+
}
38+
39+
export function isCheckpointComplete(line: types.StreamingSyncLine): line is types.StreamingSyncCheckpointComplete {
40+
return (line as types.StreamingSyncCheckpointComplete).checkpoint_complete != null;
41+
}
42+
export function isCheckpoint(line: types.StreamingSyncLine): line is types.StreamingSyncCheckpoint {
43+
return (line as types.StreamingSyncCheckpoint).checkpoint != null;
44+
}
45+
46+
/**
47+
* Reduce a bucket to the final state as stored on the client.
48+
*
49+
* This keeps the final state for each row as a PUT operation.
50+
*
51+
* All other operations are replaced with a single CLEAR operation,
52+
* summing their checksums, and using a 0 as an op_id.
53+
*
54+
* This is the function $r(B)$, as described in /docs/bucket-properties.md.
55+
*/
56+
export function reduceBucket(operations: types.OplogEntry[]) {
57+
let rowState = new Map<string, types.OplogEntry>();
58+
let otherChecksum = 0;
59+
60+
for (let op of operations) {
61+
const key = rowKey(op);
62+
if (op.op == 'PUT') {
63+
const existing = rowState.get(key);
64+
if (existing) {
65+
otherChecksum = addChecksums(otherChecksum, existing.checksum as number);
66+
}
67+
rowState.set(key, op);
68+
} else if (op.op == 'REMOVE') {
69+
const existing = rowState.get(key);
70+
if (existing) {
71+
otherChecksum = addChecksums(otherChecksum, existing.checksum as number);
72+
}
73+
rowState.delete(key);
74+
otherChecksum = addChecksums(otherChecksum, op.checksum as number);
75+
} else if (op.op == 'CLEAR') {
76+
rowState.clear();
77+
otherChecksum = op.checksum as number;
78+
} else if (op.op == 'MOVE') {
79+
otherChecksum = addChecksums(otherChecksum, op.checksum as number);
80+
} else {
81+
throw new Error(`Unknown operation ${op.op}`);
82+
}
83+
}
84+
85+
const puts = [...rowState.values()].sort((a, b) => {
86+
return Number(BigInt(a.op_id) - BigInt(b.op_id));
87+
});
88+
89+
let finalState: types.OplogEntry[] = [
90+
// Special operation to indiciate the checksum remainder
91+
{ op_id: '0', op: 'CLEAR', checksum: otherChecksum },
92+
...puts
93+
];
94+
95+
return finalState;
96+
}
97+
98+
function rowKey(entry: types.OplogEntry) {
99+
return `${entry.object_type}/${entry.object_id}/${entry.subkey}`;
100+
}
101+
102+
export function addChecksums(a: number, b: number) {
103+
return (a + b) & 0xffffffff;
104+
}

test-client/tsconfig.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"extends": "../tsconfig.base.json",
3+
"compilerOptions": {
4+
"lib": ["ES2023", "DOM"],
5+
"rootDir": "src",
6+
"outDir": "dist",
7+
"esModuleInterop": true,
8+
"sourceMap": true
9+
},
10+
"include": ["src"],
11+
"references": []
12+
}

0 commit comments

Comments
 (0)