Skip to content

Merging Persistent Stream refactor #1069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { WatchStreamListener, WriteStreamListener } from './persistent_stream';
/**
* Copyright 2017 Google Inc.
*
Expand All @@ -24,7 +23,7 @@ import { Mutation, MutationResult } from '../model/mutation';
import { assert } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import { AsyncQueue } from '../util/async_queue';

import { WatchStreamListener, WriteStreamListener } from './persistent_stream';
import { Connection } from './connection';
import {
PersistentListenStream,
Expand Down
112 changes: 30 additions & 82 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ const MAX_PENDING_WRITES = 10;
* - pulling pending mutations from LocalStore and sending them to Datastore.
* - retrying mutations that failed because of network problems.
* - acking mutations to the SyncEngine once they are accepted or rejected.
*
* RemoteStore always starts out offline. A call to `enableNetwork()`
* initializes the network connection.
*/
export class RemoteStore implements TargetMetadataProvider {
/**
Expand Down Expand Up @@ -111,19 +108,16 @@ export class RemoteStore implements TargetMetadataProvider {
*/
private listenTargets: { [targetId: number]: QueryData } = {};

private networkEnabled = false;

private watchStream: PersistentListenStream;
private writeStream: PersistentWriteStream;
private watchChangeAggregator: WatchChangeAggregator = null;

/**
* Set to true by enableNetwork() and false by disableNetwork() and indicates
* the user-preferred network state. A network connection is only established
* if `networkAllowed` is true, the client is primary and there are
* outstanding mutations or active listens.
* the user-preferred network state.
*/
private networkAllowed = true;
private networkEnabled = false;

private isPrimary = false;

private onlineStateTracker: OnlineStateTracker;
Expand Down Expand Up @@ -165,60 +159,25 @@ export class RemoteStore implements TargetMetadataProvider {
* Starts up the remote store, creating streams, restoring state from
* LocalStore, etc.
*/
<<<<<<< HEAD
start(): Promise<void> {
// Start is a no-op for RemoteStore.
return Promise.resolve();
}

private isNetworkEnabled(): boolean {
assert(
(this.watchStream == null) === (this.writeStream == null),
'WatchStream and WriteStream should both be null or non-null'
);
return this.watchStream != null;
=======
async start(): Promise<void> {
await this.enableNetwork();
>>>>>>> master
return this.enableNetwork();
}

/** Re-enables the network. Idempotent. */
async enableNetwork(): Promise<void> {
<<<<<<< HEAD
this.networkAllowed = true;
this.networkEnabled = true;

if (this.isPrimary) {
if (this.isNetworkEnabled()) {
return;
}

// Create new streams (but note they're not started yet).
this.watchStream = this.datastore.newPersistentWatchStream();
this.writeStream = this.datastore.newPersistentWriteStream();
=======
if (!this.networkEnabled) {
this.networkEnabled = true;
if (this.canUseNetwork()) {
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
>>>>>>> master

// Load any saved stream token from persistent storage
return this.localStore.getLastStreamToken().then(token => {
this.writeStream.lastStreamToken = token;

<<<<<<< HEAD
if (this.shouldStartWatchStream()) {
this.startWatchStream();
} else {
this.onlineStateTracker.set(OnlineState.Unknown);
}
if (this.shouldStartWatchStream()) {
this.startWatchStream();
} else {
this.onlineStateTracker.set(OnlineState.Unknown);
}

return this.fillWritePipeline(); // This may start the writeStream.
});
=======
// This will start the write stream if necessary.
await this.fillWritePipeline();
>>>>>>> master
}
}

Expand All @@ -227,41 +186,31 @@ export class RemoteStore implements TargetMetadataProvider {
* enableNetwork().
*/
async disableNetwork(): Promise<void> {
<<<<<<< HEAD
this.networkAllowed = false;

this.networkEnabled = false;
this.disableNetworkInternal();
=======
await this.disableNetworkInternal();

>>>>>>> master
// Set the OnlineState to Offline so get()s return from cache, etc.
this.onlineStateTracker.set(OnlineState.Offline);
}

private async disableNetworkInternal(): Promise<void> {
if (this.networkEnabled) {
this.networkEnabled = false;

this.writeStream.stop();
this.watchStream.stop();

if (this.writePipeline.length > 0) {
log.debug(
LOG_TAG,
`Stopping write stream with ${
this.writePipeline.length
} pending writes`
);
this.writePipeline = [];
}
private disableNetworkInternal(): void {
this.writeStream.stop();
this.watchStream.stop();

this.cleanUpWatchStreamState();
if (this.writePipeline.length > 0) {
log.debug(
LOG_TAG,
`Stopping write stream with ${this.writePipeline.length} pending writes`
);
this.writePipeline = [];
}

this.cleanUpWatchStreamState();
}

shutdown(): Promise<void> {
log.debug(LOG_TAG, 'RemoteStore shutting down.');
this.networkEnabled = false;
this.disableNetworkInternal();

// Set the OnlineState to Unknown (rather than Offline) to avoid potentially
Expand Down Expand Up @@ -355,9 +304,7 @@ export class RemoteStore implements TargetMetadataProvider {
}

private canUseNetwork(): boolean {
// TODO(mikelehen): This could take into account isPrimary when we merge
// with multitab.
return this.networkEnabled;
return this.isPrimary && this.networkEnabled;
}

private cleanUpWatchStreamState(): void {
Expand Down Expand Up @@ -555,7 +502,7 @@ export class RemoteStore implements TargetMetadataProvider {
*/
private canAddToWritePipeline(): boolean {
return (
this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES
this.canUseNetwork() && this.writePipeline.length < MAX_PENDING_WRITES
);
}

Expand Down Expand Up @@ -739,10 +686,11 @@ export class RemoteStore implements TargetMetadataProvider {
async handleUserChange(user: User): Promise<void> {
log.debug(LOG_TAG, 'RemoteStore changing users: uid=', user.uid);

if (this.networkEnabled) {
if (this.canUseNetwork()) {
// Tear down and re-create our network streams. This will ensure we get a fresh auth token
// for the new user and re-fill the write pipeline with new mutations from the LocalStore
// (since mutations are per-user).
this.networkEnabled = false;
this.disableNetworkInternal();
this.onlineStateTracker.set(OnlineState.Unknown);
await this.enableNetwork();
Expand All @@ -755,9 +703,9 @@ export class RemoteStore implements TargetMetadataProvider {
async applyPrimaryState(isPrimary: boolean): Promise<void> {
this.isPrimary = isPrimary;

if (isPrimary && this.networkAllowed) {
if (isPrimary && this.networkEnabled) {
await this.enableNetwork();
} else if (!isPrimary && this.isNetworkEnabled()) {
} else if (!isPrimary) {
this.disableNetworkInternal();
this.onlineStateTracker.set(OnlineState.Unknown);
}
Expand Down
12 changes: 3 additions & 9 deletions packages/firestore/test/unit/specs/spec_test_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,17 +494,11 @@ abstract class TestRunner {
}

async shutdown(): Promise<void> {
<<<<<<< HEAD
if (this.started) {
await this.doShutdown();
}
=======
await this.queue.enqueue(async () => {
await this.remoteStore.shutdown();
await this.persistence.shutdown(/* deleteData= */ true);
await this.destroyPersistence();
if (this.started) {
await this.doShutdown();
}
});
>>>>>>> master
}

/** Runs a single SpecStep on this runner. */
Expand Down
26 changes: 26 additions & 0 deletions packages/firestore/test/unit/specs/write_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1245,4 +1245,30 @@ describeSpec('Writes:', [], () => {
);
}
);

specTest(
'Mutation are not sent twice after primary failover',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, did you really write a new test to test the merge of this? Or where did this come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wanted to verify that we clear the write pipeline during failover.

['multi-client'],
() => {
const query = Query.atPath(path('collection'));
const docA = doc('collection/a', 0, { k: 'a' });
const docB = doc('collection/b', 0, { k: 'b' });

return client(0)
.expectPrimaryState(true)
.userSets('collection/a', { k: 'a' })
.userSets('collection/b', { k: 'b' })
.client(1)
.stealPrimaryLease()
.writeAcks('collection/a', 1000, { expectUserCallback: false })
.client(0)
.expectUserCallbacks({
acknowledged: ['collection/a']
})
.stealPrimaryLease()
.writeAcks('collection/b', 2000)
.userListens(query)
.expectEvents(query, { added: [docA, docB], fromCache: true });
}
);
});