Skip to content

Commit fce4168

Browse files
authored
Improve usage and testing of delayed operations. (#499)
Core changes: * Moves ExponentialBackoff to the AsyncQueue (matches iOS / Android). * Adds a TimerId enum for identifying delayed operations on the queue and uses it to identify our existing backoff and idle timers. * Added AsyncQueue.hasDelayedOperation(id) and .runDelayedOperationsEarly(id) which can be used from tests to check for the presence of an operation and to schedule them to run early. * Idle tests now use these mechanisms. * Spec tests now use this rather than setting initalBackoffDelay to 1ms. * Reworked mechanism by which DelayedOperation objects get removed from AsyncQueue's delayedOperations list to make sure it happens synchronously. Cleanup: * Renamed schedule() to enqueue() and scheduleWithDelay() to enqueueAfterDelay(). * Reorders AsyncQueue.enqueueAfterDelay() arguments to put operation last.
1 parent 7a611b6 commit fce4168

File tree

10 files changed

+310
-130
lines changed

10 files changed

+310
-130
lines changed

packages/firestore/src/core/firestore_client.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,14 @@ export class FirestoreClient {
143143
.then(() => this.initializeRest(user))
144144
.then(initializationDone.resolve, initializationDone.reject);
145145
} else {
146-
this.asyncQueue.schedule(() => {
146+
this.asyncQueue.enqueue(() => {
147147
return this.handleUserChange(user);
148148
});
149149
}
150150
});
151151

152152
// Block the async queue until initialization is done
153-
this.asyncQueue.schedule(() => {
153+
this.asyncQueue.enqueue(() => {
154154
return initializationDone.promise;
155155
});
156156

@@ -162,7 +162,7 @@ export class FirestoreClient {
162162

163163
/** Enables the network connection and requeues all pending operations. */
164164
enableNetwork(): Promise<void> {
165-
return this.asyncQueue.schedule(() => {
165+
return this.asyncQueue.enqueue(() => {
166166
return this.remoteStore.enableNetwork();
167167
});
168168
}
@@ -321,14 +321,14 @@ export class FirestoreClient {
321321

322322
/** Disables the network connection. Pending operations will not complete. */
323323
disableNetwork(): Promise<void> {
324-
return this.asyncQueue.schedule(() => {
324+
return this.asyncQueue.enqueue(() => {
325325
return this.remoteStore.disableNetwork();
326326
});
327327
}
328328

329329
shutdown(): Promise<void> {
330330
return this.asyncQueue
331-
.schedule(() => {
331+
.enqueue(() => {
332332
this.credentials.removeUserChangeListener();
333333
return this.remoteStore.shutdown();
334334
})
@@ -344,21 +344,21 @@ export class FirestoreClient {
344344
options: ListenOptions
345345
): QueryListener {
346346
const listener = new QueryListener(query, observer, options);
347-
this.asyncQueue.schedule(() => {
347+
this.asyncQueue.enqueue(() => {
348348
return this.eventMgr.listen(listener);
349349
});
350350
return listener;
351351
}
352352

353353
unlisten(listener: QueryListener): void {
354-
this.asyncQueue.schedule(() => {
354+
this.asyncQueue.enqueue(() => {
355355
return this.eventMgr.unlisten(listener);
356356
});
357357
}
358358

359359
write(mutations: Mutation[]): Promise<void> {
360360
const deferred = new Deferred<void>();
361-
this.asyncQueue.schedule(() => this.syncEngine.write(mutations, deferred));
361+
this.asyncQueue.enqueue(() => this.syncEngine.write(mutations, deferred));
362362
return deferred.promise;
363363
}
364364

@@ -371,7 +371,7 @@ export class FirestoreClient {
371371
): Promise<T> {
372372
// We have to wait for the async queue to be sure syncEngine is initialized.
373373
return this.asyncQueue
374-
.schedule(() => {
374+
.enqueue(() => {
375375
return Promise.resolve();
376376
})
377377
.then(() => {

packages/firestore/src/remote/backoff.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
*/
1616

1717
import * as log from '../util/log';
18-
import { Deferred } from '../util/promise';
19-
18+
import { CancelablePromise } from '../util/promise';
19+
import { AsyncQueue, TimerId } from '../util/async_queue';
2020
const LOG_TAG = 'ExponentialBackoff';
2121

2222
/**
@@ -30,8 +30,17 @@ const LOG_TAG = 'ExponentialBackoff';
3030
*/
3131
export class ExponentialBackoff {
3232
private currentBaseMs: number;
33+
private timerPromise: CancelablePromise<void> | null = null;
3334

3435
constructor(
36+
/**
37+
* The AsyncQueue to run backoff operations on.
38+
*/
39+
private readonly queue: AsyncQueue,
40+
/**
41+
* The ID to use when scheduling backoff operations on the AsyncQueue.
42+
*/
43+
private readonly timerId: TimerId,
3544
/**
3645
* The initial delay (used as the base delay on the first retry attempt).
3746
* Note that jitter will still be applied, so the actual delay could be as
@@ -74,10 +83,13 @@ export class ExponentialBackoff {
7483

7584
/**
7685
* Returns a promise that resolves after currentDelayMs, and increases the
77-
* delay for any subsequent attempts.
86+
* delay for any subsequent attempts. If there was a pending backoff operation
87+
* already, it will be canceled.
7888
*/
79-
backoffAndWait(): Promise<void> {
80-
const def = new Deferred<void>();
89+
backoffAndRun(op: () => Promise<void>): void {
90+
if (this.timerPromise !== null) {
91+
this.timerPromise.cancel();
92+
}
8193

8294
// First schedule using the current base (which may be 0 and should be
8395
// honored as such).
@@ -89,9 +101,11 @@ export class ExponentialBackoff {
89101
`(base delay: ${this.currentBaseMs} ms)`
90102
);
91103
}
92-
setTimeout(() => {
93-
def.resolve();
94-
}, delayWithJitterMs);
104+
this.timerPromise = this.queue.enqueueAfterDelay(
105+
this.timerId,
106+
delayWithJitterMs,
107+
op
108+
);
95109

96110
// Apply backoff factor to determine next delay and ensure it is within
97111
// bounds.
@@ -102,8 +116,6 @@ export class ExponentialBackoff {
102116
if (this.currentBaseMs > this.maxDelayMs) {
103117
this.currentBaseMs = this.maxDelayMs;
104118
}
105-
106-
return def.promise;
107119
}
108120

109121
/** Returns a random value in the range [-currentBaseMs/2, currentBaseMs/2] */

packages/firestore/src/remote/datastore.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,15 @@ export class Datastore {
5050
private queue: AsyncQueue,
5151
private connection: Connection,
5252
private credentials: CredentialsProvider,
53-
private serializer: JsonProtoSerializer,
54-
private initialBackoffDelay?: number
53+
private serializer: JsonProtoSerializer
5554
) {}
5655

5756
newPersistentWriteStream(): PersistentWriteStream {
5857
return new PersistentWriteStream(
5958
this.queue,
6059
this.connection,
6160
this.credentials,
62-
this.serializer,
63-
this.initialBackoffDelay
61+
this.serializer
6462
);
6563
}
6664

@@ -69,8 +67,7 @@ export class Datastore {
6967
this.queue,
7068
this.connection,
7169
this.credentials,
72-
this.serializer,
73-
this.initialBackoffDelay
70+
this.serializer
7471
);
7572
}
7673

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { ProtoByteString, TargetId } from '../core/types';
2121
import { QueryData } from '../local/query_data';
2222
import { Mutation, MutationResult } from '../model/mutation';
2323
import { assert } from '../util/assert';
24-
import { AsyncQueue } from '../util/async_queue';
24+
import { AsyncQueue, TimerId } from '../util/async_queue';
2525
import { Code, FirestoreError } from '../util/error';
2626
import * as log from '../util/log';
2727

@@ -163,13 +163,15 @@ export abstract class PersistentStream<
163163

164164
constructor(
165165
private queue: AsyncQueue,
166+
connectionTimerId: TimerId,
167+
private idleTimerId: TimerId,
166168
protected connection: Connection,
167-
private credentialsProvider: CredentialsProvider,
168-
// Used for faster retries in testing
169-
initialBackoffDelay?: number
169+
private credentialsProvider: CredentialsProvider
170170
) {
171171
this.backoff = new ExponentialBackoff(
172-
initialBackoffDelay ? initialBackoffDelay : BACKOFF_INITIAL_DELAY_MS,
172+
queue,
173+
connectionTimerId,
174+
BACKOFF_INITIAL_DELAY_MS,
173175
BACKOFF_FACTOR,
174176
BACKOFF_MAX_DELAY_MS
175177
);
@@ -258,9 +260,10 @@ export abstract class PersistentStream<
258260
// Starts the idle time if we are in state 'Open' and are not yet already
259261
// running a timer (in which case the previous idle timeout still applies).
260262
if (this.isOpen() && this.inactivityTimerPromise === null) {
261-
this.inactivityTimerPromise = this.queue.scheduleWithDelay(
262-
() => this.handleIdleCloseTimer(),
263-
IDLE_TIMEOUT_MS
263+
this.inactivityTimerPromise = this.queue.enqueueAfterDelay(
264+
this.idleTimerId,
265+
IDLE_TIMEOUT_MS,
266+
() => this.handleIdleCloseTimer()
264267
);
265268

266269
this.inactivityTimerPromise.catch((err: FirestoreError) => {
@@ -400,7 +403,7 @@ export abstract class PersistentStream<
400403
this.startStream(token);
401404
},
402405
(error: Error) => {
403-
this.queue.schedule(() => {
406+
this.queue.enqueue(() => {
404407
if (this.state !== PersistentStreamState.Stopped) {
405408
// Stream can be stopped while waiting for authorization.
406409
const rpcError = new FirestoreError(
@@ -433,7 +436,7 @@ export abstract class PersistentStream<
433436
stream: Stream<SendType, ReceiveType>,
434437
fn: () => Promise<void>
435438
) => {
436-
this.queue.schedule(() => {
439+
this.queue.enqueue(() => {
437440
// Only raise events if the stream instance has not changed
438441
if (this.stream === stream) {
439442
return fn();
@@ -477,20 +480,16 @@ export abstract class PersistentStream<
477480
);
478481
this.state = PersistentStreamState.Backoff;
479482

480-
this.backoff.backoffAndWait().then(() => {
481-
// Backoff does not run on the AsyncQueue, so we need to reschedule to
482-
// make sure the queue blocks
483-
this.queue.schedule(() => {
484-
if (this.state === PersistentStreamState.Stopped) {
485-
// Stream can be stopped while waiting for backoff to complete.
486-
return Promise.resolve();
487-
}
488-
489-
this.state = PersistentStreamState.Initial;
490-
this.start(listener);
491-
assert(this.isStarted(), 'PersistentStream should have started');
483+
this.backoff.backoffAndRun(() => {
484+
if (this.state === PersistentStreamState.Stopped) {
485+
// Stream can be stopped while waiting for backoff to complete.
492486
return Promise.resolve();
493-
});
487+
}
488+
489+
this.state = PersistentStreamState.Initial;
490+
this.start(listener);
491+
assert(this.isStarted(), 'PersistentStream should have started');
492+
return Promise.resolve();
494493
});
495494
}
496495

@@ -536,10 +535,15 @@ export class PersistentListenStream extends PersistentStream<
536535
queue: AsyncQueue,
537536
connection: Connection,
538537
credentials: CredentialsProvider,
539-
private serializer: JsonProtoSerializer,
540-
initialBackoffDelay?: number
538+
private serializer: JsonProtoSerializer
541539
) {
542-
super(queue, connection, credentials, initialBackoffDelay);
540+
super(
541+
queue,
542+
TimerId.ListenStreamConnection,
543+
TimerId.ListenStreamIdle,
544+
connection,
545+
credentials
546+
);
543547
}
544548

545549
protected startRpc(
@@ -639,10 +643,15 @@ export class PersistentWriteStream extends PersistentStream<
639643
queue: AsyncQueue,
640644
connection: Connection,
641645
credentials: CredentialsProvider,
642-
private serializer: JsonProtoSerializer,
643-
initialBackoffDelay?: number
646+
private serializer: JsonProtoSerializer
644647
) {
645-
super(queue, connection, credentials, initialBackoffDelay);
648+
super(
649+
queue,
650+
TimerId.WriteStreamConnection,
651+
TimerId.WriteStreamIdle,
652+
connection,
653+
credentials
654+
);
646655
}
647656

648657
/**

0 commit comments

Comments
 (0)