Skip to content

Commit 2bae929

Browse files
fix: Cursor set race condition
When calling cursors.set, before the ::$cursors channel is attached, the set calls fail silently and positions are not emitted. This PR fixes that. Note: - After many hours of painfully trying to get the CLI tests to pass, I discovered the issue was not with the tests or the CLI, but the Spaces SDK! - It's a little disappointing that setting cursor positions is both a) not using async/await, b) has no means to fail given it just sticks the position onto a queue. Whilst I appreciate cursor positions are indeed lower QoS, obvious failures should still propagate back to the caller. - Seeing that none of the tests are e2e was a bit of a surprise! I suspect stuff like this would have been more obvious with e2e tests. Some high level e2e tests should be added at some point.
1 parent 23f85ac commit 2bae929

File tree

4 files changed

+204
-10
lines changed

4 files changed

+204
-10
lines changed

src/CursorBatching.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export type OutgoingBuffer = { cursor: Pick<CursorUpdate, 'position' | 'data'>;
88

99
export default class CursorBatching {
1010
outgoingBuffer: OutgoingBuffer[] = [];
11+
pendingBuffer: OutgoingBuffer[] = [];
1112

1213
batchTime: number;
1314

@@ -28,28 +29,39 @@ export default class CursorBatching {
2829
}
2930

3031
pushCursorPosition(channel: RealtimeChannel, cursor: Pick<CursorUpdate, 'position' | 'data'>) {
31-
// Ignore the cursor update if there is no one listening
32-
if (!this.shouldSend) return;
33-
3432
const timestamp = new Date().getTime();
3533

3634
let offset: number;
3735
// First update in the buffer is always 0
38-
if (this.outgoingBuffer.length === 0) {
36+
if (this.outgoingBuffer.length === 0 && this.pendingBuffer.length === 0) {
3937
offset = 0;
4038
this.bufferStartTimestamp = timestamp;
4139
} else {
4240
// Add the offset compared to the first update in the buffer
4341
offset = timestamp - this.bufferStartTimestamp;
4442
}
4543

44+
const bufferItem = { cursor, offset };
45+
46+
if (!this.shouldSend) {
47+
// Queue cursor positions when channel is not ready (no one listening yet)
48+
this.pushToPendingBuffer(bufferItem);
49+
return;
50+
}
51+
4652
this.hasMovement = true;
47-
this.pushToBuffer({ cursor, offset });
53+
this.pushToBuffer(bufferItem);
4854
this.publishFromBuffer(channel, CURSOR_UPDATE);
4955
}
5056

5157
setShouldSend(shouldSend: boolean) {
58+
const wasSending = this.shouldSend;
5259
this.shouldSend = shouldSend;
60+
61+
// If we just became ready to send and have pending cursor positions, process them
62+
if (!wasSending && this.shouldSend && this.pendingBuffer.length > 0) {
63+
this.processPendingBuffer();
64+
}
5365
}
5466

5567
setBatchTime(batchTime: number) {
@@ -60,6 +72,35 @@ export default class CursorBatching {
6072
this.outgoingBuffer.push(value);
6173
}
6274

75+
private pushToPendingBuffer(value: OutgoingBuffer) {
76+
this.pendingBuffer.push(value);
77+
}
78+
79+
private processPendingBuffer() {
80+
// Move all pending cursor positions to outgoing buffer
81+
for (const item of this.pendingBuffer) {
82+
this.pushToBuffer(item);
83+
}
84+
85+
// Clear pending buffer
86+
this.pendingBuffer = [];
87+
88+
// Start publishing if we have cursor movements
89+
if (this.outgoingBuffer.length > 0) {
90+
this.hasMovement = true;
91+
// Note: We need the channel to publish, but since setShouldSend doesn't have it,
92+
// we'll need to trigger this from the caller that has access to the channel
93+
}
94+
}
95+
96+
// Method to manually trigger publishing when pending items are processed
97+
triggerPublishFromPending(channel: RealtimeChannel) {
98+
if (this.outgoingBuffer.length > 0) {
99+
this.hasMovement = true;
100+
this.publishFromBuffer(channel, CURSOR_UPDATE);
101+
}
102+
}
103+
63104
private async publishFromBuffer(channel: RealtimeChannel, eventName: string) {
64105
if (!this.isRunning) {
65106
this.isRunning = true;

src/CursorQueueing.test.ts

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { it, describe, expect, vi, beforeEach } from 'vitest';
2+
import { Realtime, RealtimeClient, RealtimeChannel } from 'ably';
3+
import Space from './Space.js';
4+
import CursorBatching from './CursorBatching.js';
5+
import { CURSOR_UPDATE } from './CursorConstants.js';
6+
7+
interface CursorQueueingTestContext {
8+
client: RealtimeClient;
9+
space: Space;
10+
channel: RealtimeChannel;
11+
batching: CursorBatching;
12+
}
13+
14+
vi.mock('ably');
15+
16+
describe('Cursor Queuing Bug Fix', () => {
17+
beforeEach<CursorQueueingTestContext>((context) => {
18+
const client = new Realtime({});
19+
// Mock the connection object that Space expects
20+
(client as any).connection = { id: 'test-connection-id' };
21+
22+
context.client = client;
23+
context.space = new Space('test', client);
24+
25+
// Set up cursor channel by subscribing
26+
context.space.cursors.subscribe('update', () => {});
27+
context.channel = context.space.cursors.channel!;
28+
context.batching = context.space.cursors['cursorBatching'];
29+
30+
// Mock channel methods
31+
vi.spyOn(context.channel, 'publish');
32+
});
33+
34+
it<CursorQueueingTestContext>('BUG FIX: cursor positions set before channel ready should be queued and sent when ready', async ({
35+
space,
36+
batching,
37+
channel,
38+
}) => {
39+
// Mock the self member (required for cursor.set())
40+
vi.spyOn(space.members, 'getSelf').mockResolvedValue({
41+
connectionId: 'test-connection',
42+
clientId: 'test-client',
43+
isConnected: true,
44+
profileData: {},
45+
location: null,
46+
lastEvent: { name: 'enter', timestamp: 1 },
47+
});
48+
49+
// Get the publish spy
50+
const publishSpy = vi.spyOn(channel, 'publish');
51+
52+
// Start with shouldSend false (channel not ready)
53+
batching.setShouldSend(false);
54+
55+
// Client sets cursor position before channel is ready
56+
await space.cursors.set({ position: { x: 100, y: 200 }, data: { color: 'blue' } });
57+
58+
// Position should NOT be published immediately
59+
expect(publishSpy).not.toHaveBeenCalled();
60+
61+
// Verify position is in pending buffer
62+
expect(batching.pendingBuffer.length).toBe(1);
63+
expect(batching.pendingBuffer[0].cursor.position).toEqual({ x: 100, y: 200 });
64+
65+
// Simulate channel becoming ready
66+
batching.setShouldSend(true);
67+
68+
// Trigger publish of pending items
69+
batching.triggerPublishFromPending(channel);
70+
71+
// The queued cursor position should now be published
72+
expect(publishSpy).toHaveBeenCalledWith(CURSOR_UPDATE, [
73+
expect.objectContaining({
74+
cursor: { position: { x: 100, y: 200 }, data: { color: 'blue' } },
75+
}),
76+
]);
77+
78+
// Pending buffer should be cleared
79+
expect(batching.pendingBuffer.length).toBe(0);
80+
});
81+
82+
it<CursorQueueingTestContext>('multiple pending cursor positions are preserved and sent in order', async ({
83+
batching,
84+
channel,
85+
}) => {
86+
const publishSpy = vi.spyOn(channel, 'publish');
87+
88+
// Start with shouldSend false
89+
batching.setShouldSend(false);
90+
91+
// Add multiple cursor positions to pending buffer
92+
batching.pushCursorPosition(channel, { position: { x: 10, y: 20 }, data: { color: 'red' } });
93+
batching.pushCursorPosition(channel, { position: { x: 30, y: 40 }, data: { color: 'green' } });
94+
batching.pushCursorPosition(channel, { position: { x: 50, y: 60 }, data: { color: 'blue' } });
95+
96+
// Verify all positions are queued
97+
expect(batching.pendingBuffer.length).toBe(3);
98+
expect(publishSpy).not.toHaveBeenCalled();
99+
100+
// Set shouldSend to true (this should process pending items)
101+
batching.setShouldSend(true);
102+
103+
// Trigger publish of pending items
104+
batching.triggerPublishFromPending(channel);
105+
106+
// All pending items should be moved to outgoing buffer and published
107+
expect(batching.pendingBuffer.length).toBe(0);
108+
expect(publishSpy).toHaveBeenCalled();
109+
});
110+
111+
it<CursorQueueingTestContext>('cursor positions set after shouldSend is true are published immediately', async ({
112+
batching,
113+
channel,
114+
}) => {
115+
const publishSpy = vi.spyOn(channel, 'publish');
116+
117+
// Start with shouldSend true
118+
batching.setShouldSend(true);
119+
120+
// Add cursor position (should be published immediately)
121+
batching.pushCursorPosition(channel, { position: { x: 100, y: 200 }, data: { color: 'yellow' } });
122+
123+
// Should be published immediately, not queued
124+
expect(batching.pendingBuffer.length).toBe(0);
125+
expect(publishSpy).toHaveBeenCalled();
126+
});
127+
128+
it<CursorQueueingTestContext>('setShouldSend(true) processes existing pending items', ({ batching, channel }) => {
129+
// Add items to pending buffer while shouldSend is false
130+
batching.setShouldSend(false);
131+
batching.pushCursorPosition(channel, { position: { x: 1, y: 2 }, data: {} });
132+
batching.pushCursorPosition(channel, { position: { x: 3, y: 4 }, data: {} });
133+
134+
expect(batching.pendingBuffer.length).toBe(2);
135+
136+
// Setting shouldSend to true should process pending items
137+
batching.setShouldSend(true);
138+
139+
expect(batching.pendingBuffer.length).toBe(0);
140+
});
141+
});

src/Cursors.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ export default class Cursors extends EventEmitter<CursorsEventMap> {
110110
* E.g. multiply the configured outboundBatchInterval by groups of 100 members instead of the total number of members.
111111
*/
112112
this.cursorBatching.setBatchTime(Math.ceil(cursorsMembers.length / 100) * this.options.outboundBatchInterval);
113+
114+
// Trigger publishing of any pending cursor positions now that channel is ready
115+
this.cursorBatching.triggerPublishFromPending(channel);
113116
}
114117

115118
private isUnsubscribed() {

test/integration/integration.test.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,8 @@ describe(
178178
// 2. one of its `get*()` methods is called
179179
// 3. its `subscribe` or `unsubscribe` method is called
180180
//
181-
// This seems to mean that a client that sends cursor updates but does not listen for them will drop the first update passed to `cursors.set()`.
182-
//
183-
// So, to work around this, here I perform a "sacrificial" call to `performerSpace.cursors.set()`, the idea of which is to put `performerSpace.cursors.set()` into a state in which it will not drop the updates passed in subsequent calls.
181+
// UPDATE: This race condition bug has been fixed. Early cursor positions are now queued and sent when the channel becomes ready.
182+
// However, we'll keep this "sacrificial" call to ensure the test works correctly with the fix.
184183
await performerSpace.cursors.set({ position: { x: 0, y: 0 } });
185184
});
186185

@@ -203,7 +202,8 @@ describe(
203202
const observedCursorEventsData: CursorsEventMap['update'][] = [];
204203
const cursorsListener = (data: CursorsEventMap['update']) => {
205204
observedCursorEventsData.push(data);
206-
if (observedCursorEventsData.length === 4) {
205+
// Now expecting 5 updates: 1 sacrificial + 4 intended (since the race condition bug is fixed)
206+
if (observedCursorEventsData.length === 5) {
207207
observerSpace.cursors.unsubscribe(cursorsListener);
208208
resolve(observedCursorEventsData);
209209
}
@@ -232,8 +232,17 @@ describe(
232232

233233
// Note that we check that the order in which we recieve the cursor updates matches that in which they were passed to `set()`
234234
const observedCursorEventsData = await cursorUpdatesPromise;
235+
236+
// First cursor should be the sacrificial one from scenario 2.1 (now preserved due to bug fix)
237+
expect(observedCursorEventsData[0]).toMatchObject({
238+
clientId: performerClientId,
239+
position: { x: 0, y: 0 },
240+
// Note: no data field expected for the sacrificial cursor
241+
});
242+
243+
// Remaining 4 cursors should match the intended cursors
235244
for (const [index, setCursor] of cursorsToSet.entries()) {
236-
expect(observedCursorEventsData[index]).toMatchObject({ clientId: performerClientId, ...setCursor });
245+
expect(observedCursorEventsData[index + 1]).toMatchObject({ clientId: performerClientId, ...setCursor });
237246
}
238247
});
239248
});

0 commit comments

Comments
 (0)