From 3eff5e893696192a4a2103af31fe440e97e5197c Mon Sep 17 00:00:00 2001 From: Michael Lehenbauer Date: Wed, 7 Feb 2018 08:49:47 -0800 Subject: [PATCH 1/3] Improve usage and testing of delayed operations. Core changes: * Moves ExponentialBackoff to the AsyncQueue (matches iOS / Android). * Adds a TimerId enum for identifying delayed operations on the queue and uses it to identify our existing backoff and idle timers. * Added AsyncQueue.hasDelayedOperation(id) and .runDelayedOperationsEarly(id) which can be used from tests to check for the presence of an operation and to schedule them to run early. * Idle tests now use these mechanisms. * Spec tests now use this rather than setting initalBackoffDelay to 1ms. * Reworked mechanism by which DelayedOperation objects get removed from AsyncQueue's delayedOperations list to make sure it happens synchronously. Cleanup: * Renamed schedule() to enqueue() and scheduleWithDelay() to enqueueAfterDelay(). * Reorders AsyncQueue.enqueueAfterDelay() arguments to put operation last. --- .../firestore/src/core/firestore_client.ts | 18 +-- packages/firestore/src/remote/backoff.ts | 32 ++-- packages/firestore/src/remote/datastore.ts | 9 +- .../firestore/src/remote/persistent_stream.ts | 67 ++++---- packages/firestore/src/util/async_queue.ts | 152 +++++++++++++----- .../api_internal/idle_timeout.test.ts | 11 +- .../test/integration/remote/stream.test.ts | 14 +- .../test/integration/util/internal_helpers.ts | 10 +- .../test/unit/specs/spec_test_runner.ts | 26 ++- .../test/unit/util/async_queue.test.ts | 96 ++++++++++- 10 files changed, 305 insertions(+), 130 deletions(-) diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index 32d0cf2ac34..a344a7546ed 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -143,14 +143,14 @@ export class FirestoreClient { .then(() => this.initializeRest(user)) .then(initializationDone.resolve, initializationDone.reject); } else { - this.asyncQueue.schedule(() => { + this.asyncQueue.enqueue(() => { return this.handleUserChange(user); }); } }); // Block the async queue until initialization is done - this.asyncQueue.schedule(() => { + this.asyncQueue.enqueue(() => { return initializationDone.promise; }); @@ -162,7 +162,7 @@ export class FirestoreClient { /** Enables the network connection and requeues all pending operations. */ enableNetwork(): Promise { - return this.asyncQueue.schedule(() => { + return this.asyncQueue.enqueue(() => { return this.remoteStore.enableNetwork(); }); } @@ -321,14 +321,14 @@ export class FirestoreClient { /** Disables the network connection. Pending operations will not complete. */ disableNetwork(): Promise { - return this.asyncQueue.schedule(() => { + return this.asyncQueue.enqueue(() => { return this.remoteStore.disableNetwork(); }); } shutdown(): Promise { return this.asyncQueue - .schedule(() => { + .enqueue(() => { this.credentials.removeUserChangeListener(); return this.remoteStore.shutdown(); }) @@ -344,21 +344,21 @@ export class FirestoreClient { options: ListenOptions ): QueryListener { const listener = new QueryListener(query, observer, options); - this.asyncQueue.schedule(() => { + this.asyncQueue.enqueue(() => { return this.eventMgr.listen(listener); }); return listener; } unlisten(listener: QueryListener): void { - this.asyncQueue.schedule(() => { + this.asyncQueue.enqueue(() => { return this.eventMgr.unlisten(listener); }); } write(mutations: Mutation[]): Promise { const deferred = new Deferred(); - this.asyncQueue.schedule(() => this.syncEngine.write(mutations, deferred)); + this.asyncQueue.enqueue(() => this.syncEngine.write(mutations, deferred)); return deferred.promise; } @@ -371,7 +371,7 @@ export class FirestoreClient { ): Promise { // We have to wait for the async queue to be sure syncEngine is initialized. return this.asyncQueue - .schedule(() => { + .enqueue(() => { return Promise.resolve(); }) .then(() => { diff --git a/packages/firestore/src/remote/backoff.ts b/packages/firestore/src/remote/backoff.ts index 0bc991b5982..38262358289 100644 --- a/packages/firestore/src/remote/backoff.ts +++ b/packages/firestore/src/remote/backoff.ts @@ -15,8 +15,8 @@ */ import * as log from '../util/log'; -import { Deferred } from '../util/promise'; - +import { CancelablePromise } from '../util/promise'; +import { AsyncQueue, TimerId } from '../util/async_queue'; const LOG_TAG = 'ExponentialBackoff'; /** @@ -30,8 +30,17 @@ const LOG_TAG = 'ExponentialBackoff'; */ export class ExponentialBackoff { private currentBaseMs: number; + private timerPromise: CancelablePromise | null = null; constructor( + /** + * The AsyncQueue to run backoff operations on. + */ + private readonly queue: AsyncQueue, + /** + * The ID to use when scheduling backoff operations on the AsyncQueue. + */ + private readonly timerId: TimerId, /** * The initial delay (used as the base delay on the first retry attempt). * Note that jitter will still be applied, so the actual delay could be as @@ -74,10 +83,13 @@ export class ExponentialBackoff { /** * Returns a promise that resolves after currentDelayMs, and increases the - * delay for any subsequent attempts. + * delay for any subsequent attempts. If there was a pending backoff operation + * already, it will be canceled. */ - backoffAndWait(): Promise { - const def = new Deferred(); + backoffAndRun(op: () => Promise): void { + if (this.timerPromise !== null) { + this.timerPromise.cancel(); + } // First schedule using the current base (which may be 0 and should be // honored as such). @@ -89,9 +101,11 @@ export class ExponentialBackoff { `(base delay: ${this.currentBaseMs} ms)` ); } - setTimeout(() => { - def.resolve(); - }, delayWithJitterMs); + this.timerPromise = this.queue.enqueueAfterDelay( + this.timerId, + delayWithJitterMs, + op + ); // Apply backoff factor to determine next delay and ensure it is within // bounds. @@ -102,8 +116,6 @@ export class ExponentialBackoff { if (this.currentBaseMs > this.maxDelayMs) { this.currentBaseMs = this.maxDelayMs; } - - return def.promise; } /** Returns a random value in the range [-currentBaseMs/2, currentBaseMs/2] */ diff --git a/packages/firestore/src/remote/datastore.ts b/packages/firestore/src/remote/datastore.ts index 12a559c9437..8e5c2b5ad80 100644 --- a/packages/firestore/src/remote/datastore.ts +++ b/packages/firestore/src/remote/datastore.ts @@ -50,8 +50,7 @@ export class Datastore { private queue: AsyncQueue, private connection: Connection, private credentials: CredentialsProvider, - private serializer: JsonProtoSerializer, - private initialBackoffDelay?: number + private serializer: JsonProtoSerializer ) {} newPersistentWriteStream(): PersistentWriteStream { @@ -59,8 +58,7 @@ export class Datastore { this.queue, this.connection, this.credentials, - this.serializer, - this.initialBackoffDelay + this.serializer ); } @@ -69,8 +67,7 @@ export class Datastore { this.queue, this.connection, this.credentials, - this.serializer, - this.initialBackoffDelay + this.serializer ); } diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 421647dd08a..72a4ddc5821 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -21,7 +21,7 @@ import { ProtoByteString, TargetId } from '../core/types'; import { QueryData } from '../local/query_data'; import { Mutation, MutationResult } from '../model/mutation'; import { assert } from '../util/assert'; -import { AsyncQueue } from '../util/async_queue'; +import { AsyncQueue, TimerId } from '../util/async_queue'; import { Code, FirestoreError } from '../util/error'; import * as log from '../util/log'; @@ -163,13 +163,15 @@ export abstract class PersistentStream< constructor( private queue: AsyncQueue, + backoffTimerId: TimerId, + private idleTimerId: TimerId, protected connection: Connection, - private credentialsProvider: CredentialsProvider, - // Used for faster retries in testing - initialBackoffDelay?: number + private credentialsProvider: CredentialsProvider ) { this.backoff = new ExponentialBackoff( - initialBackoffDelay ? initialBackoffDelay : BACKOFF_INITIAL_DELAY_MS, + queue, + backoffTimerId, + BACKOFF_INITIAL_DELAY_MS, BACKOFF_FACTOR, BACKOFF_MAX_DELAY_MS ); @@ -258,9 +260,10 @@ export abstract class PersistentStream< // Starts the idle time if we are in state 'Open' and are not yet already // running a timer (in which case the previous idle timeout still applies). if (this.isOpen() && this.inactivityTimerPromise === null) { - this.inactivityTimerPromise = this.queue.scheduleWithDelay( - () => this.handleIdleCloseTimer(), - IDLE_TIMEOUT_MS + this.inactivityTimerPromise = this.queue.enqueueAfterDelay( + this.idleTimerId, + IDLE_TIMEOUT_MS, + () => this.handleIdleCloseTimer() ); this.inactivityTimerPromise.catch((err: FirestoreError) => { @@ -400,7 +403,7 @@ export abstract class PersistentStream< this.startStream(token); }, (error: Error) => { - this.queue.schedule(() => { + this.queue.enqueue(() => { if (this.state !== PersistentStreamState.Stopped) { // Stream can be stopped while waiting for authorization. const rpcError = new FirestoreError( @@ -433,7 +436,7 @@ export abstract class PersistentStream< stream: Stream, fn: () => Promise ) => { - this.queue.schedule(() => { + this.queue.enqueue(() => { // Only raise events if the stream instance has not changed if (this.stream === stream) { return fn(); @@ -477,20 +480,16 @@ export abstract class PersistentStream< ); this.state = PersistentStreamState.Backoff; - this.backoff.backoffAndWait().then(() => { - // Backoff does not run on the AsyncQueue, so we need to reschedule to - // make sure the queue blocks - this.queue.schedule(() => { - if (this.state === PersistentStreamState.Stopped) { - // Stream can be stopped while waiting for backoff to complete. - return Promise.resolve(); - } - - this.state = PersistentStreamState.Initial; - this.start(listener); - assert(this.isStarted(), 'PersistentStream should have started'); + this.backoff.backoffAndRun(() => { + if (this.state === PersistentStreamState.Stopped) { + // Stream can be stopped while waiting for backoff to complete. return Promise.resolve(); - }); + } + + this.state = PersistentStreamState.Initial; + this.start(listener); + assert(this.isStarted(), 'PersistentStream should have started'); + return Promise.resolve(); }); } @@ -536,10 +535,15 @@ export class PersistentListenStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer, - initialBackoffDelay?: number + private serializer: JsonProtoSerializer ) { - super(queue, connection, credentials, initialBackoffDelay); + super( + queue, + TimerId.ListenStreamBackoff, + TimerId.ListenStreamIdle, + connection, + credentials + ); } protected startRpc( @@ -639,10 +643,15 @@ export class PersistentWriteStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer, - initialBackoffDelay?: number + private serializer: JsonProtoSerializer ) { - super(queue, connection, credentials, initialBackoffDelay); + super( + queue, + TimerId.WriteStreamBackoff, + TimerId.WriteStreamIdle, + connection, + credentials + ); } /** diff --git a/packages/firestore/src/util/async_queue.ts b/packages/firestore/src/util/async_queue.ts index 43c6d60660b..7c3b3c3c4f4 100644 --- a/packages/firestore/src/util/async_queue.ts +++ b/packages/firestore/src/util/async_queue.ts @@ -23,6 +23,18 @@ import { Code, FirestoreError } from './error'; // tslint:disable-next-line:no-any Accept any return type from setTimeout(). type TimerHandle = any; +/** + * Wellknown "timer" IDs used when scheduling delayed operations on the + * AsyncQueue. These IDs can then be used from tests to check for the presence + * of operations or to run them early. + */ +export enum TimerId { + ListenStreamIdle, + ListenStreamBackoff, + WriteStreamIdle, + WriteStreamBackoff +} + /** * Represents an operation scheduled to be run in the future on an AsyncQueue. * @@ -38,20 +50,42 @@ class DelayedOperation implements CancelablePromise { private readonly deferred = new Deferred(); private constructor( - private asyncQueue: AsyncQueue, - private op: () => Promise + private readonly asyncQueue: AsyncQueue, + readonly timerId: TimerId, + readonly targetTime: number, + private readonly op: () => Promise, + private readonly removalCallback: (op: DelayedOperation) => void ) {} /** * Creates and returns a DelayedOperation that has been scheduled to be * executed on the provided asyncQueue after the provided delayMs. + * + * @param asyncQueue The queue to schedule the operation on. + * @param id A Timer ID identifying the type of operation this is. + * @param delayMs The delay (ms) before the operation should be scheduled. + * @param op The operation to run. + * @param removalCallback A callback to be called synchronously once the + * operation is executed or canceled, notifying the AsyncQueue to remove it + * from its delayedOperations list. + * PORTING NOTE: This exists to prevent making removeDelayedOperation() and + * the DelayedOperation class public. */ static createAndSchedule( asyncQueue: AsyncQueue, + timerId: TimerId, + delayMs: number, op: () => Promise, - delayMs: number + removalCallback: (op: DelayedOperation) => void ): DelayedOperation { - const delayedOp = new DelayedOperation(asyncQueue, op); + const targetTime = Date.now() + delayMs; + const delayedOp = new DelayedOperation( + asyncQueue, + timerId, + targetTime, + op, + removalCallback + ); delayedOp.start(delayMs); return delayedOp; } @@ -97,7 +131,7 @@ class DelayedOperation implements CancelablePromise { catch = this.deferred.promise.catch.bind(this.deferred.promise); private handleDelayElapsed(): void { - this.asyncQueue.schedule(() => { + this.asyncQueue.enqueue(() => { if (this.timerHandle !== null) { this.clearTimeout(); return this.op().then(result => { @@ -110,7 +144,8 @@ class DelayedOperation implements CancelablePromise { } private clearTimeout() { - if (this.timerHandle) { + if (this.timerHandle !== null) { + this.removalCallback(this); clearTimeout(this.timerHandle); this.timerHandle = null; } @@ -121,17 +156,10 @@ export class AsyncQueue { // The last promise in the queue. private tail: Promise = Promise.resolve(); - // A list with timeout handles and their respective deferred promises. - // Contains an entry for each operation that is queued to run in the future - // (i.e. it has a delay that has not yet elapsed). + // Operations scheduled to be queued in the future. Operations are + // automatically removed after they are run or canceled. private delayedOperations: Array> = []; - // The number of operations that are queued to be run in the future (i.e. they - // have a delay that has not yet elapsed). Used for testing. - get delayedOperationsCount() { - return this.delayedOperations.length; - } - // visible for testing failure: Error; @@ -143,7 +171,7 @@ export class AsyncQueue { * Adds a new operation to the queue. Returns a promise that will be resolved * when the promise returned by the new operation is (with its value). */ - schedule(op: () => Promise): Promise { + enqueue(op: () => Promise): Promise { this.verifyNotFailed(); const newTail = this.tail.then(() => { this.operationInProgress = true; @@ -178,25 +206,35 @@ export class AsyncQueue { } /** - * Schedules an operation to be run on the AsyncQueue once the specified - * `delayMs` has elapsed. The returned DelayedOperationResult can be - * used to cancel the operation prior to its running. + * Schedules an operation to be queued on the AsyncQueue once the specified + * `delayMs` has elapsed. The returned CancelablePromise can be used to cancel + * the operation prior to its running. */ - scheduleWithDelay( - op: () => Promise, - delayMs: number + enqueueAfterDelay( + timerId: TimerId, + delayMs: number, + op: () => Promise ): CancelablePromise { this.verifyNotFailed(); - const delayedOp = DelayedOperation.createAndSchedule(this, op, delayMs); + // While not necessarily harmful, we currently don't expect to have multiple + // ops with the same timer id in the queue, so defensively reject them. + assert( + !this.hasDelayedOperation(timerId), + `Attempted to schedule multiple operations with timer id ${ + TimerId[timerId] + }.` + ); + + const delayedOp = DelayedOperation.createAndSchedule( + this, + timerId, + delayMs, + op, + op => this.removeDelayedOperation(op) + ); this.delayedOperations.push(delayedOp); - delayedOp.catch(err => {}).then(() => { - // NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small. - const index = this.delayedOperations.indexOf(delayedOp); - assert(index >= 0, 'Delayed operation not found.'); - this.delayedOperations.slice(index, 1); - }); return delayedOp; } @@ -223,17 +261,55 @@ export class AsyncQueue { } /** - * Waits until all currently scheduled tasks are finished executing. Tasks - * scheduled with a delay can be rejected or queued for immediate execution. + * Waits until all currently queued tasks are finished executing. Delayed + * operations are not run. */ - drain(executeDelayedTasks: boolean): Promise { - this.delayedOperations.forEach(delayedOp => { - if (executeDelayedTasks) { - delayedOp.skipDelay(); - } else { - delayedOp.cancel('shutdown'); + drain(): Promise { + return this.enqueue(() => Promise.resolve()); + } + + /** + * For Tests: Determine if a particular delayed operation exists. + */ + hasDelayedOperation(timerId: TimerId): boolean { + return this.delayedOperations.findIndex(op => op.timerId === timerId) >= 0; + } + + /** + * For Tests: Runs some or all delayed operations early. + * + * @param lastTimerId If specified, only delayed operations up to and + * including this TimerId will be drained. Throws if no such operation + * exists. + * @returns a Promise that resolves once all operations have been run. + */ + runDelayedOperationsEarly(lastTimerId?: TimerId): Promise { + // Note that draining may generate more delayed ops, so we do that first. + return this.drain().then(() => { + assert( + lastTimerId === undefined || this.hasDelayedOperation(lastTimerId), + `Attempted to drain to missing operation ${lastTimerId}` + ); + + // Run ops in the same order they'd run if they ran naturally. + this.delayedOperations.sort((a, b) => a.targetTime - b.targetTime); + + for (const op of this.delayedOperations) { + op.skipDelay(); + if (lastTimerId !== undefined && op.timerId === lastTimerId) { + break; + } } + + return this.drain(); }); - return this.schedule(() => Promise.resolve()); + } + + /** Called once a DelayedOperation is run or canceled. */ + private removeDelayedOperation(op: DelayedOperation) { + // NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small. + const index = this.delayedOperations.indexOf(op); + assert(index >= 0, 'Delayed operation not found.'); + this.delayedOperations.splice(index, 1); } } diff --git a/packages/firestore/test/integration/api_internal/idle_timeout.test.ts b/packages/firestore/test/integration/api_internal/idle_timeout.test.ts index 06267f2e0e1..892dee79a99 100644 --- a/packages/firestore/test/integration/api_internal/idle_timeout.test.ts +++ b/packages/firestore/test/integration/api_internal/idle_timeout.test.ts @@ -15,8 +15,9 @@ */ import { apiDescribe, withTestDb } from '../util/helpers'; -import { drainAsyncQueue } from '../util/internal_helpers'; +import { asyncQueue } from '../util/internal_helpers'; import { Deferred } from '../../util/promise'; +import { TimerId } from '../../../src/util/async_queue'; apiDescribe('Idle Timeout', persistence => { it('can write document after idle timeout', () => { @@ -25,7 +26,9 @@ apiDescribe('Idle Timeout', persistence => { return docRef .set({ foo: 'bar' }) .then(() => { - return drainAsyncQueue(db); + return asyncQueue(db).runDelayedOperationsEarly( + TimerId.WriteStreamIdle + ); }) .then(() => docRef.set({ foo: 'bar' })); }); @@ -49,7 +52,9 @@ apiDescribe('Idle Timeout', persistence => { return awaitOnlineSnapshot() .then(() => { - return drainAsyncQueue(db); + return asyncQueue(db).runDelayedOperationsEarly( + TimerId.ListenStreamIdle + ); }) .then(() => awaitOnlineSnapshot()); }); diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 6e70086ef2e..2b71e5ea32c 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -28,11 +28,11 @@ import { ExistenceFilterChange, WatchTargetChange } from '../../../src/remote/watch_change'; -import { AsyncQueue } from '../../../src/util/async_queue'; +import { AsyncQueue, TimerId } from '../../../src/util/async_queue'; import { Deferred } from '../../../src/util/promise'; import { Datastore } from '../../../src/remote/datastore'; import { setMutation } from '../../util/helpers'; -import { drainAsyncQueue, withTestDatastore } from '../util/internal_helpers'; +import { withTestDatastore } from '../util/internal_helpers'; import { FirestoreError } from '@firebase/firestore-types'; /** @@ -227,10 +227,10 @@ describe('Write Stream', () => { }) .then(() => { writeStream.markIdle(); - expect(queue.delayedOperationsCount).to.be.equal(1); + expect(queue.hasDelayedOperation(TimerId.WriteStreamIdle)).to.be.true; return Promise.all([ - streamListener.awaitCallback('close'), - queue.drain(/*executeDelayedTasks=*/ true) + queue.runDelayedOperationsEarly(TimerId.WriteStreamIdle), + streamListener.awaitCallback('close') ]); }) .then(() => { @@ -254,11 +254,11 @@ describe('Write Stream', () => { .then(() => { // Mark the stream idle, but immediately cancel the idle timer by issuing another write. writeStream.markIdle(); - expect(queue.delayedOperationsCount).to.be.equal(1); + expect(queue.hasDelayedOperation(TimerId.WriteStreamIdle)).to.be.true; writeStream.writeMutations(SINGLE_MUTATION); return streamListener.awaitCallback('mutationResult'); }) - .then(() => queue.drain(/*executeDelayedTasks=*/ true)) + .then(() => queue.runDelayedOperationsEarly()) .then(() => { expect(writeStream.isOpen()).to.be.true; }); diff --git a/packages/firestore/test/integration/util/internal_helpers.ts b/packages/firestore/test/integration/util/internal_helpers.ts index 3d2aac9b2c0..65bc75cc15c 100644 --- a/packages/firestore/test/integration/util/internal_helpers.ts +++ b/packages/firestore/test/integration/util/internal_helpers.ts @@ -21,15 +21,13 @@ import { Datastore } from '../../../src/remote/datastore'; import { EmptyCredentialsProvider } from '../../../src/api/credentials'; import { PlatformSupport } from '../../../src/platform/platform'; -import { AsyncQueue } from '../../../src/util/async_queue'; +import { AsyncQueue, TimerId } from '../../../src/util/async_queue'; import { DEFAULT_SETTINGS, DEFAULT_PROJECT_ID } from './helpers'; import { Firestore } from '../../../src/api/database'; -/** Drains the AsyncQueue. Delayed tasks are executed immediately. */ -export function drainAsyncQueue( - db: firestore.FirebaseFirestore -): Promise { - return (db as Firestore)._queue.drain(/* executeDelayedTasks= */ true); +/** Helper to retrieve the AsyncQueue for a give FirebaseFirestore instance. */ +export function asyncQueue(db: firestore.FirebaseFirestore): AsyncQueue { + return (db as Firestore)._queue; } export function getDefaultDatabaseInfo(): DatabaseInfo { diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index 67767313259..0c6387a2421 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -70,7 +70,7 @@ import { WatchTargetChangeState } from '../../../src/remote/watch_change'; import { assert, fail } from '../../../src/util/assert'; -import { AsyncQueue } from '../../../src/util/async_queue'; +import { AsyncQueue, TimerId } from '../../../src/util/async_queue'; import { FirestoreError } from '../../../src/util/error'; import { AnyDuringMigration, AnyJs } from '../../../src/util/misc'; import * as obj from '../../../src/util/obj'; @@ -232,7 +232,7 @@ class MockConnection implements Connection { this.resetAndCloseWriteStream(); } }); - this.queue.schedule(() => { + this.queue.enqueue(() => { if (this.writeStream === writeStream) { writeStream.callOnOpen(); } @@ -265,7 +265,7 @@ class MockConnection implements Connection { } }); // Call on open immediately after returning - this.queue.schedule(() => { + this.queue.enqueue(() => { if (this.watchStream === watchStream) { watchStream.callOnOpen(); this.watchOpen.resolve(); @@ -373,14 +373,11 @@ abstract class TestRunner { this.queue = new AsyncQueue(); this.connection = new MockConnection(this.queue); - // Set backoff delay to 1ms so simulated disconnects don't delay the tests. - const initialBackoffDelay = 1; this.datastore = new Datastore( this.queue, this.connection, new EmptyCredentialsProvider(), - this.serializer, - initialBackoffDelay + this.serializer ); const onlineStateChangedHandler = (onlineState: OnlineState) => { this.syncEngine.applyOnlineStateChange(onlineState); @@ -432,7 +429,7 @@ abstract class TestRunner { console.log('Running spec: ' + this.name); return sequence(steps, async step => { await this.doStep(step); - await this.queue.drain(/* executeDelayedTasks */ false); + await this.queue.drain(); this.validateStepExpectations(step.expect!); this.validateStateExpectations(step.stateExpect!); this.eventList = []; @@ -496,7 +493,7 @@ abstract class TestRunner { const queryListener = new QueryListener(query, aggregator, options); this.queryListeners.set(query, queryListener); - await this.queue.schedule(async () => { + await this.queue.enqueue(async () => { const targetId = await this.eventManager.listen(queryListener); expect(targetId).to.equal( expectedTargetId, @@ -515,7 +512,7 @@ abstract class TestRunner { const eventEmitter = this.queryListeners.get(query); assert(!!eventEmitter, 'There must be a query to unlisten too!'); this.queryListeners.delete(query); - await this.queue.schedule(() => this.eventManager.unlisten(eventEmitter!)); + await this.queue.enqueue(() => this.eventManager.unlisten(eventEmitter!)); } private doSet(setSpec: SpecUserSet): Promise { @@ -534,7 +531,7 @@ abstract class TestRunner { private doMutations(mutations: Mutation[]): Promise { const userCallback = new Deferred(); this.outstandingWrites.push({ mutations, userCallback }); - return this.queue.schedule(() => { + return this.queue.enqueue(() => { return this.syncEngine.write(mutations, userCallback); }); } @@ -691,7 +688,7 @@ abstract class TestRunner { } // Put a no-op in the queue so that we know when any outstanding RemoteStore // writes on the network are complete. - return this.queue.schedule(() => { + return this.queue.enqueue(() => { return Promise.resolve(); }); } @@ -705,6 +702,7 @@ abstract class TestRunner { ); // The watch stream should re-open if we have active listeners. if (!this.queryListeners.isEmpty()) { + await this.queue.runDelayedOperationsEarly(TimerId.ListenStreamBackoff); await this.connection.waitForWatchOpen(); } } @@ -787,7 +785,7 @@ abstract class TestRunner { // We have to schedule the starts, otherwise we could end up with // interleaved events. - await this.queue.schedule(async () => { + await this.queue.enqueue(async () => { await this.localStore.start(); await this.remoteStore.start(); }); @@ -795,7 +793,7 @@ abstract class TestRunner { private doChangeUser(user: string | null): Promise { this.user = new User(user); - return this.queue.schedule(() => + return this.queue.enqueue(() => this.syncEngine.handleUserChange(this.user) ); } diff --git a/packages/firestore/test/unit/util/async_queue.test.ts b/packages/firestore/test/unit/util/async_queue.test.ts index 08b628120b1..fdf4e62fcc6 100644 --- a/packages/firestore/test/unit/util/async_queue.test.ts +++ b/packages/firestore/test/unit/util/async_queue.test.ts @@ -15,12 +15,18 @@ */ import { expect } from 'chai'; -import { AsyncQueue } from '../../../src/util/async_queue'; +import { AsyncQueue, TimerId } from '../../../src/util/async_queue'; import { getLogLevel, LogLevel, setLogLevel } from '../../../src/util/log'; import { AnyJs } from '../../../src/util/misc'; import { Deferred, Rejecter, Resolver } from '../../../src/util/promise'; +import { Code } from '../../../src/util/error'; describe('AsyncQueue', () => { + // We reuse these TimerIds for generic testing. + const timerId1 = TimerId.ListenStreamBackoff; + const timerId2 = TimerId.ListenStreamIdle; + const timerId3 = TimerId.WriteStreamBackoff; + it('schedules ops in right order', () => { const queue = new AsyncQueue(); const results: string[] = []; @@ -29,7 +35,7 @@ describe('AsyncQueue', () => { results.push(result); } - const op1 = queue.schedule(() => { + const op1 = queue.enqueue(() => { return defer(() => 'Hello') .then((result: string) => { return defer(() => result + ' world!'); @@ -37,13 +43,13 @@ describe('AsyncQueue', () => { .then(pushResult); }); - const op2 = queue.schedule(() => { + const op2 = queue.enqueue(() => { return defer(() => 'Bye bye.').then(pushResult); }); const op4 = new Deferred(); - const op3 = queue.schedule(() => { - queue.schedule(() => { + const op3 = queue.enqueue(() => { + queue.enqueue(() => { return Promise.resolve('Bye for good.') .then(pushResult) .then(op4.resolve); @@ -74,7 +80,7 @@ describe('AsyncQueue', () => { // Schedule a failing operation and make sure it's handled correctly. const op1Promise = queue - .schedule(() => { + .enqueue(() => { // This promise represents something that is rejected return defer(() => { throw expected; @@ -95,7 +101,7 @@ describe('AsyncQueue', () => { // Schedule a second failing operation (before the first one has actually // executed and failed). It should not be run. const op2Promise = queue - .schedule(() => { + .enqueue(() => { return defer(() => { expect.fail('op2 should not be executed.'); }); @@ -116,13 +122,87 @@ describe('AsyncQueue', () => { // synchronously throw with "already failed" error. const dummyOp = () => Promise.reject('dummyOp should not be run'); expect(() => { - queue.schedule(dummyOp); + queue.enqueue(dummyOp); }).to.throw(/already failed:.*Simulated Error/); // Finally, restore log level. setLogLevel(oldLogLevel); }); }); + + it('can schedule ops in the future', async () => { + const queue = new AsyncQueue(); + const completedSteps = []; + const doStep = (n: number) => defer(() => completedSteps.push(n)); + queue.enqueue(() => doStep(1)); + const last = queue.enqueueAfterDelay(timerId1, 5, () => doStep(4)); + queue.enqueueAfterDelay(timerId2, 1, () => doStep(3)); + queue.enqueue(() => doStep(2)); + + await last; + expect(completedSteps).to.deep.equal([1, 2, 3, 4]); + }); + + it('Can cancel delayed operations', async () => { + const queue = new AsyncQueue(); + const completedSteps = []; + const doStep = (n: number) => defer(() => completedSteps.push(n)); + queue.enqueue(() => doStep(1)); + const delayedPromise = queue.enqueueAfterDelay(timerId1, 1, () => + doStep(2) + ); + + expect(queue.hasDelayedOperation(timerId1)).to.be.true; + delayedPromise.cancel(); + expect(queue.hasDelayedOperation(timerId1)).to.be.false; + + await delayedPromise.then( + () => expect.fail('resolved promise', 'rejected promise'), + err => expect(err.code === Code.CANCELLED) + ); + + await queue.runDelayedOperationsEarly(); + expect(completedSteps).to.deep.equal([1]); + }); + + it('Can run all delayed operations early', async () => { + const queue = new AsyncQueue(); + const completedSteps = []; + const doStep = (n: number) => defer(() => completedSteps.push(n)); + queue.enqueue(() => doStep(1)); + queue.enqueueAfterDelay(timerId1, 20000, () => doStep(4)); + queue.enqueueAfterDelay(timerId2, 10000, () => doStep(3)); + queue.enqueue(() => doStep(2)); + + await queue.runDelayedOperationsEarly(); + expect(completedSteps).to.deep.equal([1, 2, 3, 4]); + }); + + it('Can run some delayed operations early', async () => { + const queue = new AsyncQueue(); + const completedSteps = []; + const doStep = (n: number) => defer(() => completedSteps.push(n)); + queue.enqueue(() => doStep(1)); + queue.enqueueAfterDelay(timerId1, 20000, () => doStep(5)); + queue.enqueueAfterDelay(timerId2, 10000, () => doStep(3)); + queue.enqueueAfterDelay(timerId3, 15000, () => doStep(4)); + queue.enqueue(() => doStep(2)); + + await queue.runDelayedOperationsEarly(timerId3); + expect(completedSteps).to.deep.equal([1, 2, 3, 4]); + }); + + it('Can drain (non-delayed) operations', async () => { + const queue = new AsyncQueue(); + const completedSteps = []; + const doStep = (n: number) => defer(() => completedSteps.push(n)); + queue.enqueue(() => doStep(1)); + queue.enqueueAfterDelay(timerId1, 10000, () => doStep(5)); + queue.enqueue(() => doStep(2)); + + await queue.drain(); + expect(completedSteps).to.deep.equal([1, 2]); + }); }); function defer(op: () => T): Promise { From 42f16e49d421184f9682d86a1dfc6872fc5ef8cf Mon Sep 17 00:00:00 2001 From: Michael Lehenbauer Date: Thu, 8 Feb 2018 15:08:28 -0800 Subject: [PATCH 2/3] CR Feedback. --- packages/firestore/src/remote/persistent_stream.ts | 8 ++++---- packages/firestore/src/util/async_queue.ts | 8 ++++---- packages/firestore/test/unit/specs/spec_test_runner.ts | 4 +++- packages/firestore/test/unit/util/async_queue.test.ts | 4 ++-- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 72a4ddc5821..2155e23ba1a 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -163,14 +163,14 @@ export abstract class PersistentStream< constructor( private queue: AsyncQueue, - backoffTimerId: TimerId, + connectionTimerId: TimerId, private idleTimerId: TimerId, protected connection: Connection, private credentialsProvider: CredentialsProvider ) { this.backoff = new ExponentialBackoff( queue, - backoffTimerId, + connectionTimerId, BACKOFF_INITIAL_DELAY_MS, BACKOFF_FACTOR, BACKOFF_MAX_DELAY_MS @@ -539,7 +539,7 @@ export class PersistentListenStream extends PersistentStream< ) { super( queue, - TimerId.ListenStreamBackoff, + TimerId.ListenStreamConnection, TimerId.ListenStreamIdle, connection, credentials @@ -647,7 +647,7 @@ export class PersistentWriteStream extends PersistentStream< ) { super( queue, - TimerId.WriteStreamBackoff, + TimerId.WriteStreamConnection, TimerId.WriteStreamIdle, connection, credentials diff --git a/packages/firestore/src/util/async_queue.ts b/packages/firestore/src/util/async_queue.ts index 7c3b3c3c4f4..4cec01e6e05 100644 --- a/packages/firestore/src/util/async_queue.ts +++ b/packages/firestore/src/util/async_queue.ts @@ -30,9 +30,9 @@ type TimerHandle = any; */ export enum TimerId { ListenStreamIdle, - ListenStreamBackoff, + ListenStreamConnection, WriteStreamIdle, - WriteStreamBackoff + WriteStreamConnection } /** @@ -52,7 +52,7 @@ class DelayedOperation implements CancelablePromise { private constructor( private readonly asyncQueue: AsyncQueue, readonly timerId: TimerId, - readonly targetTime: number, + readonly targetTimeMs: number, private readonly op: () => Promise, private readonly removalCallback: (op: DelayedOperation) => void ) {} @@ -292,7 +292,7 @@ export class AsyncQueue { ); // Run ops in the same order they'd run if they ran naturally. - this.delayedOperations.sort((a, b) => a.targetTime - b.targetTime); + this.delayedOperations.sort((a, b) => a.targetTimeMs - b.targetTimeMs); for (const op of this.delayedOperations) { op.skipDelay(); diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index 0c6387a2421..261870162c6 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -702,7 +702,9 @@ abstract class TestRunner { ); // The watch stream should re-open if we have active listeners. if (!this.queryListeners.isEmpty()) { - await this.queue.runDelayedOperationsEarly(TimerId.ListenStreamBackoff); + await this.queue.runDelayedOperationsEarly( + TimerId.ListenStreamConnection + ); await this.connection.waitForWatchOpen(); } } diff --git a/packages/firestore/test/unit/util/async_queue.test.ts b/packages/firestore/test/unit/util/async_queue.test.ts index fdf4e62fcc6..2d4010994a1 100644 --- a/packages/firestore/test/unit/util/async_queue.test.ts +++ b/packages/firestore/test/unit/util/async_queue.test.ts @@ -23,9 +23,9 @@ import { Code } from '../../../src/util/error'; describe('AsyncQueue', () => { // We reuse these TimerIds for generic testing. - const timerId1 = TimerId.ListenStreamBackoff; + const timerId1 = TimerId.ListenStreamConnection; const timerId2 = TimerId.ListenStreamIdle; - const timerId3 = TimerId.WriteStreamBackoff; + const timerId3 = TimerId.WriteStreamConnection; it('schedules ops in right order', () => { const queue = new AsyncQueue(); From e889e0c76dce6ed0d642e14f449a5fc8beffd0af Mon Sep 17 00:00:00 2001 From: Michael Lehenbauer Date: Fri, 9 Feb 2018 12:18:20 -0800 Subject: [PATCH 3/3] hasDelayedOperation() => containsDelayedOperation(). --- packages/firestore/src/util/async_queue.ts | 9 +++++---- .../firestore/test/integration/remote/stream.test.ts | 6 ++++-- packages/firestore/test/unit/util/async_queue.test.ts | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/packages/firestore/src/util/async_queue.ts b/packages/firestore/src/util/async_queue.ts index 4cec01e6e05..453e720c373 100644 --- a/packages/firestore/src/util/async_queue.ts +++ b/packages/firestore/src/util/async_queue.ts @@ -220,7 +220,7 @@ export class AsyncQueue { // While not necessarily harmful, we currently don't expect to have multiple // ops with the same timer id in the queue, so defensively reject them. assert( - !this.hasDelayedOperation(timerId), + !this.containsDelayedOperation(timerId), `Attempted to schedule multiple operations with timer id ${ TimerId[timerId] }.` @@ -269,9 +269,10 @@ export class AsyncQueue { } /** - * For Tests: Determine if a particular delayed operation exists. + * For Tests: Determine if a delayed operation with a particular TimerId + * exists. */ - hasDelayedOperation(timerId: TimerId): boolean { + containsDelayedOperation(timerId: TimerId): boolean { return this.delayedOperations.findIndex(op => op.timerId === timerId) >= 0; } @@ -287,7 +288,7 @@ export class AsyncQueue { // Note that draining may generate more delayed ops, so we do that first. return this.drain().then(() => { assert( - lastTimerId === undefined || this.hasDelayedOperation(lastTimerId), + lastTimerId === undefined || this.containsDelayedOperation(lastTimerId), `Attempted to drain to missing operation ${lastTimerId}` ); diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 2b71e5ea32c..ef23ba3d004 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -227,7 +227,8 @@ describe('Write Stream', () => { }) .then(() => { writeStream.markIdle(); - expect(queue.hasDelayedOperation(TimerId.WriteStreamIdle)).to.be.true; + expect(queue.containsDelayedOperation(TimerId.WriteStreamIdle)).to.be + .true; return Promise.all([ queue.runDelayedOperationsEarly(TimerId.WriteStreamIdle), streamListener.awaitCallback('close') @@ -254,7 +255,8 @@ describe('Write Stream', () => { .then(() => { // Mark the stream idle, but immediately cancel the idle timer by issuing another write. writeStream.markIdle(); - expect(queue.hasDelayedOperation(TimerId.WriteStreamIdle)).to.be.true; + expect(queue.containsDelayedOperation(TimerId.WriteStreamIdle)).to.be + .true; writeStream.writeMutations(SINGLE_MUTATION); return streamListener.awaitCallback('mutationResult'); }) diff --git a/packages/firestore/test/unit/util/async_queue.test.ts b/packages/firestore/test/unit/util/async_queue.test.ts index 2d4010994a1..f1c49f66251 100644 --- a/packages/firestore/test/unit/util/async_queue.test.ts +++ b/packages/firestore/test/unit/util/async_queue.test.ts @@ -152,9 +152,9 @@ describe('AsyncQueue', () => { doStep(2) ); - expect(queue.hasDelayedOperation(timerId1)).to.be.true; + expect(queue.containsDelayedOperation(timerId1)).to.be.true; delayedPromise.cancel(); - expect(queue.hasDelayedOperation(timerId1)).to.be.false; + expect(queue.containsDelayedOperation(timerId1)).to.be.false; await delayedPromise.then( () => expect.fail('resolved promise', 'rejected promise'),