diff --git a/packages/firestore/src/remote/datastore.ts b/packages/firestore/src/remote/datastore.ts index 6930764f38a..a958e8076a5 100644 --- a/packages/firestore/src/remote/datastore.ts +++ b/packages/firestore/src/remote/datastore.ts @@ -1,3 +1,4 @@ +import { WatchStreamListener, WriteStreamListener } from './persistent_stream'; /** * Copyright 2017 Google Inc. * @@ -54,21 +55,27 @@ export class Datastore { private serializer: JsonProtoSerializer ) {} - newPersistentWriteStream(): PersistentWriteStream { + newPersistentWriteStream( + listener: WriteStreamListener + ): PersistentWriteStream { return new PersistentWriteStream( this.queue, this.connection, this.credentials, - this.serializer + this.serializer, + listener ); } - newPersistentWatchStream(): PersistentListenStream { + newPersistentWatchStream( + listener: WatchStreamListener + ): PersistentListenStream { return new PersistentListenStream( this.queue, this.connection, this.credentials, - this.serializer + this.serializer, + listener ); } diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index bd2774e0cc0..8d20cd9195c 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -43,21 +43,38 @@ interface ListenRequest extends api.ListenRequest { export interface WriteRequest extends api.WriteRequest { database?: string; } - +/** + * PersistentStream can be in one of 5 states (each described in detail below) + * based on the following state transition diagram: + * + * start() called auth & connection succeeded + * INITIAL ----------------> STARTING -----------------------------> OPEN + * ^ | | + * | | error occurred | + * | \-----------------------------v-----/ + * | | + * backoff | | + * elapsed | start() called | + * \--- BACKOFF <---------------- ERROR + * + * [any state] --------------------------> INITIAL + * stop() called or + * idle timer expired + */ enum PersistentStreamState { /** - * The streaming RPC is not running and there's no error condition. + * The streaming RPC is not yet running and there's no error condition. * Calling `start` will start the stream immediately without backoff. * While in this state isStarted will return false. */ Initial, /** - * The stream is starting, and is waiting for an auth token to attach to - * the initial request. While in this state, isStarted will return - * true but isOpen will return false. + * The stream is starting, either waiting for an auth token or for the stream + * to successfully open. While in this state, isStarted will return true but + * isOpen will return false. */ - Auth, + Starting, /** * The streaming RPC is up and running. Requests and responses can flow @@ -68,22 +85,16 @@ enum PersistentStreamState { /** * The stream encountered an error. The next start attempt will back off. * While in this state isStarted() will return false. - * */ Error, /** * An in-between state after an error where the stream is waiting before - * re-starting. After - * waiting is complete, the stream will try to open. While in this - * state isStarted() will return YES but isOpen will return false. + * re-starting. After waiting is complete, the stream will try to open. + * While in this state isStarted() will return true but isOpen will return + * false. */ - Backoff, - - /** - * The stream has been explicitly stopped; no further events will be emitted. - */ - Stopped + Backoff } /** @@ -125,6 +136,7 @@ const IDLE_TIMEOUT_MS = 60 * 1000; * - Exponential backoff on failure * - Authentication via CredentialsProvider * - Dispatching all callbacks into the shared worker queue + * - Closing idle streams after 60 seconds of inactivity * * Subclasses of PersistentStream implement serialization of models to and * from the JSON representation of the protocol buffers for a specific @@ -153,20 +165,26 @@ export abstract class PersistentStream< ReceiveType, ListenerType extends PersistentStreamListener > { - private state: PersistentStreamState; + private state = PersistentStreamState.Initial; + /** + * A close count that's incremented every time the stream is closed; used by + * getCloseGuardedDispatcher() to invalidate callbacks that happen after + * close. + */ + private closeCount = 0; + private inactivityTimerPromise: CancelablePromise | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; - protected listener: ListenerType | null = null; - constructor( private queue: AsyncQueue, connectionTimerId: TimerId, private idleTimerId: TimerId, protected connection: Connection, - private credentialsProvider: CredentialsProvider + private credentialsProvider: CredentialsProvider, + protected listener: ListenerType ) { this.backoff = new ExponentialBackoff( queue, @@ -175,7 +193,6 @@ export abstract class PersistentStream< BACKOFF_FACTOR, BACKOFF_MAX_DELAY_MS ); - this.state = PersistentStreamState.Initial; } /** @@ -187,14 +204,14 @@ export abstract class PersistentStream< */ isStarted(): boolean { return ( - this.state === PersistentStreamState.Backoff || - this.state === PersistentStreamState.Auth || - this.state === PersistentStreamState.Open + this.state === PersistentStreamState.Starting || + this.state === PersistentStreamState.Open || + this.state === PersistentStreamState.Backoff ); } /** - * Returns true if the underlying RPC is open (the openHandler has been + * Returns true if the underlying RPC is open (the onOpen callback has been * called) and the stream is ready for outbound requests. */ isOpen(): boolean { @@ -206,16 +223,15 @@ export abstract class PersistentStream< * not immediately ready for use: onOpen will be invoked when the RPC is ready * for outbound requests, at which point isOpen will return true. * - * When start returns, isStarted will return true. + * When start returns, isStarted will return true. */ - start(listener: ListenerType): void { + start(): void { if (this.state === PersistentStreamState.Error) { - this.performBackoff(listener); + this.performBackoff(); return; } assert(this.state === PersistentStreamState.Initial, 'Already started'); - this.listener = listener; this.auth(); } @@ -227,7 +243,7 @@ export abstract class PersistentStream< */ stop(): void { if (this.isStarted()) { - this.close(PersistentStreamState.Stopped); + this.close(PersistentStreamState.Initial); } } @@ -299,8 +315,7 @@ export abstract class PersistentStream< * * sets internal stream state to 'finalState'; * * adjusts the backoff timer based on the error * - * A new stream can be opened by calling `start` unless `finalState` is set to - * `PersistentStreamState.Stopped`. + * A new stream can be opened by calling `start`. * * @param finalState the intended state of the stream after closing. * @param error the error the connection was closed with. @@ -309,18 +324,20 @@ export abstract class PersistentStream< finalState: PersistentStreamState, error?: FirestoreError ): Promise { + assert(this.isStarted(), 'Only started streams should be closed.'); assert( finalState === PersistentStreamState.Error || isNullOrUndefined(error), "Can't provide an error when not in an error state." ); - // The stream will be closed so we don't need our idle close timer anymore. + // Cancel any outstanding timers (they're guaranteed not to execute). this.cancelIdleCheck(); - - // Ensure we don't leave a pending backoff operation queued (in case close() - // was called while we were waiting to reconnect). this.backoff.cancel(); + // Invalidates any stream-related callbacks (e.g. from auth or the + // underlying stream), guaranteeing they won't execute. + this.closeCount++; + if (finalState !== PersistentStreamState.Error) { // If this is an intentional close ensure we don't delay our next connection attempt. this.backoff.reset(); @@ -347,16 +364,9 @@ export abstract class PersistentStream< // This state must be assigned before calling onClose() to allow the callback to // inhibit backoff or otherwise manipulate the state in its non-started state. this.state = finalState; - const listener = this.listener!; - // Clear the listener to avoid bleeding of events from the underlying streams. - this.listener = null; - - // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it - // could trigger undesirable recovery logic, etc.). - if (finalState !== PersistentStreamState.Stopped) { - return listener.onClose(error); - } + // Notify the listener that the stream closed. + await this.listener.onClose(error); } /** @@ -386,98 +396,84 @@ export abstract class PersistentStream< 'Must be in initial state to auth' ); - this.state = PersistentStreamState.Auth; + this.state = PersistentStreamState.Starting; + + const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); + + // TODO(mikelehen): Just use dispatchIfNotClosed, but see TODO below. + const closeCount = this.closeCount; this.credentialsProvider.getToken().then( token => { - // Normally we'd have to schedule the callback on the AsyncQueue. - // However, the following calls are safe to be called outside the - // AsyncQueue since they don't chain asynchronous calls - this.startStream(token); + // Stream can be stopped while waiting for authentication. + // TODO(mikelehen): We really should just use dispatchIfNotClosed + // and let this dispatch onto the queue, but that opened a spec test can + // of worms that I don't want to deal with in this PR. + if (this.closeCount === closeCount) { + // Normally we'd have to schedule the callback on the AsyncQueue. + // However, the following calls are safe to be called outside the + // AsyncQueue since they don't chain asynchronous calls + this.startStream(token); + } }, (error: Error) => { - this.queue.enqueue(async () => { - if (this.state !== PersistentStreamState.Stopped) { - // Stream can be stopped while waiting for authorization. - const rpcError = new FirestoreError( - Code.UNKNOWN, - 'Fetching auth token failed: ' + error.message - ); - return this.handleStreamClose(rpcError); - } + dispatchIfNotClosed(() => { + const rpcError = new FirestoreError( + Code.UNKNOWN, + 'Fetching auth token failed: ' + error.message + ); + return this.handleStreamClose(rpcError); }); } ); } private startStream(token: Token | null): void { - if (this.state === PersistentStreamState.Stopped) { - // Stream can be stopped while waiting for authorization. - return; - } - assert( - this.state === PersistentStreamState.Auth, - 'Trying to start stream in a non-auth state' + this.state === PersistentStreamState.Starting, + 'Trying to start stream in a non-starting state' ); - // Helper function to dispatch to AsyncQueue and make sure that any - // close will seem instantaneous and events are prevented from being - // raised after the close call - const dispatchIfStillActive = ( - stream: Stream, - fn: () => Promise - ) => { - this.queue.enqueue(async () => { - // Only raise events if the stream instance has not changed - if (this.stream === stream) { - return fn(); - } - }); - }; - // Only start stream if listener has not changed - if (this.listener !== null) { - const currentStream = this.startRpc(token); - this.stream = currentStream; - this.stream.onOpen(() => { - dispatchIfStillActive(currentStream, () => { - assert( - this.state === PersistentStreamState.Auth, - 'Expected stream to be in state auth, but was ' + this.state - ); - this.state = PersistentStreamState.Open; - return this.listener!.onOpen(); - }); + const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); + + this.stream = this.startRpc(token); + this.stream.onOpen(() => { + dispatchIfNotClosed(() => { + assert( + this.state === PersistentStreamState.Starting, + 'Expected stream to be in state Starting, but was ' + this.state + ); + this.state = PersistentStreamState.Open; + return this.listener!.onOpen(); }); - this.stream.onClose((error: FirestoreError) => { - dispatchIfStillActive(currentStream, () => { - return this.handleStreamClose(error); - }); + }); + this.stream.onClose((error: FirestoreError) => { + dispatchIfNotClosed(() => { + return this.handleStreamClose(error); }); - this.stream.onMessage((msg: ReceiveType) => { - dispatchIfStillActive(currentStream, () => { - return this.onMessage(msg); - }); + }); + this.stream.onMessage((msg: ReceiveType) => { + dispatchIfNotClosed(() => { + return this.onMessage(msg); }); - } + }); } - private performBackoff(listener: ListenerType): void { + private performBackoff(): void { assert( this.state === PersistentStreamState.Error, - 'Should only perform backoff in an error case' + 'Should only perform backoff when in Error state' ); this.state = PersistentStreamState.Backoff; this.backoff.backoffAndRun(async () => { - if (this.state === PersistentStreamState.Stopped) { - // We should have canceled the backoff timer when the stream was - // closed, but just in case we make this a no-op. - return; - } + assert( + this.state === PersistentStreamState.Backoff, + 'Backoff elapsed but state is now: ' + this.state + ); this.state = PersistentStreamState.Initial; - this.start(listener); + this.start(); assert(this.isStarted(), 'PersistentStream should have started'); }); } @@ -495,6 +491,30 @@ export abstract class PersistentStream< // without a backoff accidentally, we set the stream to error in all cases. return this.close(PersistentStreamState.Error, error); } + + /** + * Returns a "dispatcher" function that dispatches operations onto the + * AsyncQueue but only runs them if closeCount remains unchanged. This allows + * us to turn auth / stream callbacks into no-ops if the stream is closed / + * re-opened, etc. + */ + private getCloseGuardedDispatcher( + startCloseCount: number + ): (fn: () => Promise) => void { + return (fn: () => Promise): void => { + this.queue.enqueue(() => { + if (this.closeCount === startCloseCount) { + return fn(); + } else { + log.debug( + LOG_TAG, + 'stream callback skipped by getCloseGuardedDispatcher.' + ); + return Promise.resolve(); + } + }); + }; + } } /** Listener for the PersistentWatchStream */ @@ -525,14 +545,16 @@ export class PersistentListenStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer + private serializer: JsonProtoSerializer, + listener: WatchStreamListener ) { super( queue, TimerId.ListenStreamConnectionBackoff, TimerId.ListenStreamIdle, connection, - credentials + credentials, + listener ); } @@ -633,14 +655,16 @@ export class PersistentWriteStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer + private serializer: JsonProtoSerializer, + listener: WriteStreamListener ) { super( queue, TimerId.WriteStreamConnectionBackoff, TimerId.WriteStreamIdle, connection, - credentials + credentials, + listener ); } @@ -663,9 +687,9 @@ export class PersistentWriteStream extends PersistentStream< } // Override of PersistentStream.start - start(listener: WriteStreamListener): void { + start(): void { this.handshakeComplete_ = false; - super.start(listener); + super.start(); } protected tearDown(): void { diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index ea24b364eeb..d54189a9ba9 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -107,8 +107,10 @@ export class RemoteStore implements TargetMetadataProvider { */ private listenTargets: { [targetId: number]: QueryData } = {}; - private watchStream: PersistentListenStream = null; - private writeStream: PersistentWriteStream = null; + private networkEnabled = false; + + private watchStream: PersistentListenStream; + private writeStream: PersistentWriteStream; private watchChangeAggregator: WatchChangeAggregator = null; private onlineStateTracker: OnlineStateTracker; @@ -127,6 +129,20 @@ export class RemoteStore implements TargetMetadataProvider { asyncQueue, onlineStateHandler ); + + // Create streams (but note they're not started yet). + this.watchStream = this.datastore.newPersistentWatchStream({ + onOpen: this.onWatchStreamOpen.bind(this), + onClose: this.onWatchStreamClose.bind(this), + onWatchChange: this.onWatchStreamChange.bind(this) + }); + + this.writeStream = this.datastore.newPersistentWriteStream({ + onOpen: this.onWriteStreamOpen.bind(this), + onClose: this.onWriteStreamClose.bind(this), + onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), + onMutationResult: this.onMutationResult.bind(this) + }); } /** SyncEngine to notify of watch and write events. */ @@ -136,31 +152,15 @@ export class RemoteStore implements TargetMetadataProvider { * Starts up the remote store, creating streams, restoring state from * LocalStore, etc. */ - start(): Promise { - return this.enableNetwork(); - } - - private isNetworkEnabled(): boolean { - assert( - (this.watchStream == null) === (this.writeStream == null), - 'WatchStream and WriteStream should both be null or non-null' - ); - return this.watchStream != null; + async start(): Promise { + await this.enableNetwork(); } /** Re-enables the network. Idempotent. */ - enableNetwork(): Promise { - if (this.isNetworkEnabled()) { - return Promise.resolve(); - } - - // Create new streams (but note they're not started yet). - this.watchStream = this.datastore.newPersistentWatchStream(); - this.writeStream = this.datastore.newPersistentWriteStream(); - - // Load any saved stream token from persistent storage - return this.localStore.getLastStreamToken().then(token => { - this.writeStream.lastStreamToken = token; + async enableNetwork(): Promise { + if (!this.networkEnabled) { + this.networkEnabled = true; + this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken(); if (this.shouldStartWatchStream()) { this.startWatchStream(); @@ -168,8 +168,9 @@ export class RemoteStore implements TargetMetadataProvider { this.onlineStateTracker.set(OnlineState.Unknown); } - return this.fillWritePipeline(); // This may start the writeStream. - }); + // This will start the write stream if necessary. + await this.fillWritePipeline(); + } } /** @@ -177,41 +178,37 @@ export class RemoteStore implements TargetMetadataProvider { * enableNetwork(). */ async disableNetwork(): Promise { - this.disableNetworkInternal(); + await this.disableNetworkInternal(); + // Set the OnlineState to Offline so get()s return from cache, etc. this.onlineStateTracker.set(OnlineState.Offline); } - /** - * Disables the network, if it is currently enabled. - */ - private disableNetworkInternal(): void { - if (this.isNetworkEnabled()) { - // NOTE: We're guaranteed not to get any further events from these streams (not even a close - // event). - this.watchStream.stop(); - this.writeStream.stop(); + private async disableNetworkInternal(): Promise { + if (this.networkEnabled) { + this.networkEnabled = false; - this.cleanUpWatchStreamState(); + this.writeStream.stop(); + this.watchStream.stop(); - log.debug( - LOG_TAG, - 'Stopping write stream with ' + - this.writePipeline.length + - ' pending writes' - ); - // TODO(mikelehen): We only actually need to clear the write pipeline if - // this is being called as part of handleUserChange(). Consider reworking. - this.writePipeline = []; + if (this.writePipeline.length > 0) { + log.debug( + LOG_TAG, + `Stopping write stream with ${ + this.writePipeline.length + } pending writes` + ); + this.writePipeline = []; + } - this.writeStream = null; - this.watchStream = null; + this.cleanUpWatchStreamState(); } } shutdown(): Promise { log.debug(LOG_TAG, 'RemoteStore shutting down.'); this.disableNetworkInternal(); + // Set the OnlineState to Unknown (rather than Offline) to avoid potentially // triggering spurious listener events with cached data, etc. this.onlineStateTracker.set(OnlineState.Unknown); @@ -230,7 +227,7 @@ export class RemoteStore implements TargetMetadataProvider { if (this.shouldStartWatchStream()) { // The listen will be sent in onWatchStreamOpen this.startWatchStream(); - } else if (this.isNetworkEnabled() && this.watchStream.isOpen()) { + } else if (this.watchStream.isOpen()) { this.sendWatchRequest(queryData); } } @@ -242,7 +239,7 @@ export class RemoteStore implements TargetMetadataProvider { 'unlisten called without assigned target ID!' ); delete this.listenTargets[targetId]; - if (this.isNetworkEnabled() && this.watchStream.isOpen()) { + if (this.watchStream.isOpen()) { this.sendUnwatchRequest(targetId); if (objUtils.isEmpty(this.listenTargets)) { this.watchStream.markIdle(); @@ -282,15 +279,11 @@ export class RemoteStore implements TargetMetadataProvider { private startWatchStream(): void { assert( this.shouldStartWatchStream(), - 'startWriteStream() called when shouldStartWatchStream() is false.' + 'startWatchStream() called when shouldStartWatchStream() is false.' ); this.watchChangeAggregator = new WatchChangeAggregator(this); - this.watchStream.start({ - onOpen: this.onWatchStreamOpen.bind(this), - onClose: this.onWatchStreamClose.bind(this), - onWatchChange: this.onWatchStreamChange.bind(this) - }); + this.watchStream.start(); this.onlineStateTracker.handleWatchStreamStart(); } @@ -300,39 +293,43 @@ export class RemoteStore implements TargetMetadataProvider { */ private shouldStartWatchStream(): boolean { return ( - this.isNetworkEnabled() && + this.canUseNetwork() && !this.watchStream.isStarted() && !objUtils.isEmpty(this.listenTargets) ); } + private canUseNetwork(): boolean { + // TODO(mikelehen): This could take into account isPrimary when we merge + // with multitab. + return this.networkEnabled; + } + private cleanUpWatchStreamState(): void { this.watchChangeAggregator = null; } private async onWatchStreamOpen(): Promise { - // TODO(b/35852690): close the stream again (with some timeout?) if no watch - // targets are active objUtils.forEachNumber(this.listenTargets, (targetId, queryData) => { this.sendWatchRequest(queryData); }); } private async onWatchStreamClose(error?: FirestoreError): Promise { - assert( - this.isNetworkEnabled(), - 'onWatchStreamClose() should only be called when the network is enabled' - ); + if (error === undefined) { + // Graceful stop (due to stop() or idle timeout). Make sure that's + // desirable. + assert( + !this.shouldStartWatchStream(), + 'Watch stream was stopped gracefully while still needed.' + ); + } this.cleanUpWatchStreamState(); // If we still need the watch stream, retry the connection. if (this.shouldStartWatchStream()) { - // There should generally be an error if the watch stream was closed when - // it's still needed, but it's not quite worth asserting. - if (error) { - this.onlineStateTracker.handleWatchStreamFailure(error); - } + this.onlineStateTracker.handleWatchStreamFailure(error); this.startWatchStream(); } else { @@ -480,28 +477,32 @@ export class RemoteStore implements TargetMetadataProvider { this.writePipeline.length > 0 ? this.writePipeline[this.writePipeline.length - 1].batchId : BATCHID_UNKNOWN; - return this.localStore - .nextMutationBatch(lastBatchIdRetrieved) - .then(batch => { - if (batch === null) { - if (this.writePipeline.length === 0) { - this.writeStream.markIdle(); - } - } else { - this.addToWritePipeline(batch); - return this.fillWritePipeline(); - } - }); + const batch = await this.localStore.nextMutationBatch( + lastBatchIdRetrieved + ); + + if (batch === null) { + if (this.writePipeline.length === 0) { + this.writeStream.markIdle(); + } + } else { + this.addToWritePipeline(batch); + await this.fillWritePipeline(); + } + } + + if (this.shouldStartWriteStream()) { + this.startWriteStream(); } } /** - * Returns true if we can add to the write pipeline (i.e. it is not full and - * the network is enabled). + * Returns true if we can add to the write pipeline (i.e. the network is + * enabled and the write pipeline is not full). */ private canAddToWritePipeline(): boolean { return ( - this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES + this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES ); } @@ -512,8 +513,7 @@ export class RemoteStore implements TargetMetadataProvider { /** * Queues additional writes to be sent to the write stream, sending them - * immediately if the write stream is established, else starting the write - * stream if it is not yet started. + * immediately if the write stream is established. */ private addToWritePipeline(batch: MutationBatch): void { assert( @@ -522,16 +522,14 @@ export class RemoteStore implements TargetMetadataProvider { ); this.writePipeline.push(batch); - if (this.shouldStartWriteStream()) { - this.startWriteStream(); - } else if (this.isNetworkEnabled() && this.writeStream.handshakeComplete) { + if (this.writeStream.isOpen() && this.writeStream.handshakeComplete) { this.writeStream.writeMutations(batch.mutations); } } private shouldStartWriteStream(): boolean { return ( - this.isNetworkEnabled() && + this.canUseNetwork() && !this.writeStream.isStarted() && this.writePipeline.length > 0 ); @@ -542,12 +540,7 @@ export class RemoteStore implements TargetMetadataProvider { this.shouldStartWriteStream(), 'startWriteStream() called when shouldStartWriteStream() is false.' ); - this.writeStream.start({ - onOpen: this.onWriteStreamOpen.bind(this), - onClose: this.onWriteStreamClose.bind(this), - onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), - onMutationResult: this.onMutationResult.bind(this) - }); + this.writeStream.start(); } private async onWriteStreamOpen(): Promise { @@ -591,10 +584,14 @@ export class RemoteStore implements TargetMetadataProvider { } private async onWriteStreamClose(error?: FirestoreError): Promise { - assert( - this.isNetworkEnabled(), - 'onWriteStreamClose() should only be called when the network is enabled' - ); + if (error === undefined) { + // Graceful stop (due to stop() or idle timeout). Make sure that's + // desirable. + assert( + !this.shouldStartWriteStream(), + 'Write stream was stopped gracefully while still needed.' + ); + } // If the write stream closed due to an error, invoke the error callbacks if // there are pending writes. @@ -667,18 +664,16 @@ export class RemoteStore implements TargetMetadataProvider { return new Transaction(this.datastore); } - handleUserChange(user: User): Promise { + async handleUserChange(user: User): Promise { log.debug(LOG_TAG, 'RemoteStore changing users: uid=', user.uid); - // If the network has been explicitly disabled, make sure we don't - // accidentally re-enable it. - if (this.isNetworkEnabled()) { + if (this.networkEnabled) { // Tear down and re-create our network streams. This will ensure we get a fresh auth token // for the new user and re-fill the write pipeline with new mutations from the LocalStore // (since mutations are per-user). this.disableNetworkInternal(); this.onlineStateTracker.set(OnlineState.Unknown); - return this.enableNetwork(); + await this.enableNetwork(); } } } diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 24997b816ce..94d2c747dcd 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -134,20 +134,20 @@ describe('Watch Stream', () => { }); /** - * Verifies that the watch stream does not issue an onClose callback after a + * Verifies that the watch stream issues an onClose callback after a * call to stop(). */ it('can be stopped before handshake', () => { let watchStream: PersistentListenStream; return withTestDatastore(ds => { - watchStream = ds.newPersistentWatchStream(); - watchStream.start(streamListener); + watchStream = ds.newPersistentWatchStream(streamListener); + watchStream.start(); return streamListener.awaitCallback('open').then(() => { - // Stop must not call onClose because the full implementation of the callback could - // attempt to restart the stream in the event it had pending watches. watchStream.stop(); + + return streamListener.awaitCallback('close'); }); }); }); @@ -183,22 +183,20 @@ describe('Write Stream', () => { }); /** - * Verifies that the write stream does not issue an onClose callback after a - * call to stop(). + * Verifies that the write stream issues an onClose callback after a call to + * stop(). */ it('can be stopped before handshake', () => { let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }).then(() => { - // Don't start the handshake. - - // Stop must not call onClose because the full implementation of the callback could - // attempt to restart the stream in the event it had pending writes. writeStream.stop(); + + return streamListener.awaitCallback('close'); }); }); @@ -206,8 +204,8 @@ describe('Write Stream', () => { let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { @@ -225,6 +223,8 @@ describe('Write Stream', () => { }) .then(() => { writeStream.stop(); + + return streamListener.awaitCallback('close'); }); }); @@ -232,8 +232,8 @@ describe('Write Stream', () => { const queue = new AsyncQueue(); return withTestDatastore(ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -259,8 +259,8 @@ describe('Write Stream', () => { const queue = new AsyncQueue(); return withTestDatastore(ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -288,8 +288,8 @@ describe('Write Stream', () => { return withTestDatastore( ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -301,7 +301,7 @@ describe('Write Stream', () => { return streamListener.awaitCallback('close'); }) .then(() => { - writeStream.start(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { @@ -312,7 +312,7 @@ describe('Write Stream', () => { return streamListener.awaitCallback('close'); }) .then(() => { - writeStream.start(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index fcfc6daad8a..6865c2339e0 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -424,9 +424,11 @@ abstract class TestRunner { protected abstract destroyPersistence(): Promise; async shutdown(): Promise { - await this.remoteStore.shutdown(); - await this.persistence.shutdown(/* deleteData= */ true); - await this.destroyPersistence(); + await this.queue.enqueue(async () => { + await this.remoteStore.shutdown(); + await this.persistence.shutdown(/* deleteData= */ true); + await this.destroyPersistence(); + }); } run(steps: SpecStep[]): Promise {