Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tricky-phones-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Log stats on sync lock when reaching concurrency limit
24 changes: 22 additions & 2 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ async function* streamResponseInner(
raw_data,
binary_data,
signal,
tracker
tracker,
user_id: syncParams.user_id
});

await new Promise((resolve) => setTimeout(resolve, 10));
Expand All @@ -213,6 +214,7 @@ interface BucketDataRequest {
binary_data: boolean | undefined;
tracker: RequestTracker;
signal: AbortSignal;
user_id?: string;
}

async function* bucketDataInBatches(request: BucketDataRequest) {
Expand Down Expand Up @@ -261,8 +263,19 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
const checkpointOp = BigInt(checkpoint);
let checkpointInvalidated = false;

const [_, release] = await syncSemaphore.acquire();
if (syncSemaphore.isLocked()) {
logger.info('Sync concurrency limit reached, waiting for lock', { user_id: request.user_id });
}
const [value, release] = await syncSemaphore.acquire();
try {
if (value <= 3) {
// This can be noisy, so we only log when we get close to the
// concurrency limit.
logger.info(`Got sync lock. Slots available: ${value - 1}`, {
user_id: request.user_id,
sync_data_slots: value - 1
});
}
// Optimization: Only fetch buckets for which the checksums have changed since the last checkpoint
// For the first batch, this will be all buckets.
const filteredBuckets = new Map(bucketsToFetch.map((bucket) => [bucket, dataBuckets.get(bucket)!]));
Expand Down Expand Up @@ -330,6 +343,13 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
}
}
} finally {
if (value <= 3) {
// This can be noisy, so we only log when we get close to the
// concurrency limit.
logger.info(`Releasing sync lock`, {
user_id: request.user_id
});
}
release();
}
}
Expand Down
Loading