Skip to content

Improve usage and testing of delayed operations. #499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ export class FirestoreClient {
.then(() => this.initializeRest(user))
.then(initializationDone.resolve, initializationDone.reject);
} else {
this.asyncQueue.schedule(() => {
this.asyncQueue.enqueue(() => {
return this.handleUserChange(user);
});
}
});

// Block the async queue until initialization is done
this.asyncQueue.schedule(() => {
this.asyncQueue.enqueue(() => {
return initializationDone.promise;
});

Expand All @@ -162,7 +162,7 @@ export class FirestoreClient {

/** Enables the network connection and requeues all pending operations. */
enableNetwork(): Promise<void> {
return this.asyncQueue.schedule(() => {
return this.asyncQueue.enqueue(() => {
return this.remoteStore.enableNetwork();
});
}
Expand Down Expand Up @@ -321,14 +321,14 @@ export class FirestoreClient {

/** Disables the network connection. Pending operations will not complete. */
disableNetwork(): Promise<void> {
return this.asyncQueue.schedule(() => {
return this.asyncQueue.enqueue(() => {
return this.remoteStore.disableNetwork();
});
}

shutdown(): Promise<void> {
return this.asyncQueue
.schedule(() => {
.enqueue(() => {
this.credentials.removeUserChangeListener();
return this.remoteStore.shutdown();
})
Expand All @@ -344,21 +344,21 @@ export class FirestoreClient {
options: ListenOptions
): QueryListener {
const listener = new QueryListener(query, observer, options);
this.asyncQueue.schedule(() => {
this.asyncQueue.enqueue(() => {
return this.eventMgr.listen(listener);
});
return listener;
}

unlisten(listener: QueryListener): void {
this.asyncQueue.schedule(() => {
this.asyncQueue.enqueue(() => {
return this.eventMgr.unlisten(listener);
});
}

write(mutations: Mutation[]): Promise<void> {
const deferred = new Deferred<void>();
this.asyncQueue.schedule(() => this.syncEngine.write(mutations, deferred));
this.asyncQueue.enqueue(() => this.syncEngine.write(mutations, deferred));
return deferred.promise;
}

Expand All @@ -371,7 +371,7 @@ export class FirestoreClient {
): Promise<T> {
// We have to wait for the async queue to be sure syncEngine is initialized.
return this.asyncQueue
.schedule(() => {
.enqueue(() => {
return Promise.resolve();
})
.then(() => {
Expand Down
32 changes: 22 additions & 10 deletions packages/firestore/src/remote/backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

import * as log from '../util/log';
import { Deferred } from '../util/promise';

import { CancelablePromise } from '../util/promise';
import { AsyncQueue, TimerId } from '../util/async_queue';
const LOG_TAG = 'ExponentialBackoff';

/**
Expand All @@ -30,8 +30,17 @@ const LOG_TAG = 'ExponentialBackoff';
*/
export class ExponentialBackoff {
private currentBaseMs: number;
private timerPromise: CancelablePromise<void> | null = null;

constructor(
/**
* The AsyncQueue to run backoff operations on.
*/
private readonly queue: AsyncQueue,
/**
* The ID to use when scheduling backoff operations on the AsyncQueue.
*/
private readonly timerId: TimerId,
/**
* The initial delay (used as the base delay on the first retry attempt).
* Note that jitter will still be applied, so the actual delay could be as
Expand Down Expand Up @@ -74,10 +83,13 @@ export class ExponentialBackoff {

/**
* Returns a promise that resolves after currentDelayMs, and increases the
* delay for any subsequent attempts.
* delay for any subsequent attempts. If there was a pending backoff operation
* already, it will be canceled.
*/
backoffAndWait(): Promise<void> {
const def = new Deferred<void>();
backoffAndRun(op: () => Promise<void>): void {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backoffAndRun lets you schedule any type of operation here, but the ID already got assigned in the constructor. Should we move the assignment of the ID to here as well?

Copy link
Contributor Author

@mikelehen mikelehen Feb 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh... The intention is that an ExponentialBackoff object is used for one purpose (restarting the listen stream for instance). It wouldn't make sense to do a different kind of operation each time backoff elapses. So I think it'd be more inline with the intent of ExponentialBackoff to move op to the constructor, but I think that makes usage of the class more awkward. So I am inclined to keep it as-is.

if (this.timerPromise !== null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the old API, we had a way to tell the caller that its operation got cancelled. Right now, if I follow this correctly, we will just silently drop the previous invocation/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure what you're referring to. Prior to my change we never canceled backoff timers. This actually means that you could end up with multiple outstanding backoff timers. While this doesn't /break/ anything, I'm not sure it was intentional and it means we might reconnect earlier than we should.

I did change the return type of this method to void instead of Promise, but that's just because we weren't using it and it matches iOS / Android now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it as is for now, but I do think that it would be useful to be able to figure that when operations get cancelled. If we don't need this now, we can deal with across all platforms we end up needing it.

this.timerPromise.cancel();
}

// First schedule using the current base (which may be 0 and should be
// honored as such).
Expand All @@ -89,9 +101,11 @@ export class ExponentialBackoff {
`(base delay: ${this.currentBaseMs} ms)`
);
}
setTimeout(() => {
def.resolve();
}, delayWithJitterMs);
this.timerPromise = this.queue.enqueueAfterDelay(
this.timerId,
delayWithJitterMs,
op
);

// Apply backoff factor to determine next delay and ensure it is within
// bounds.
Expand All @@ -102,8 +116,6 @@ export class ExponentialBackoff {
if (this.currentBaseMs > this.maxDelayMs) {
this.currentBaseMs = this.maxDelayMs;
}

return def.promise;
}

/** Returns a random value in the range [-currentBaseMs/2, currentBaseMs/2] */
Expand Down
9 changes: 3 additions & 6 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ export class Datastore {
private queue: AsyncQueue,
private connection: Connection,
private credentials: CredentialsProvider,
private serializer: JsonProtoSerializer,
private initialBackoffDelay?: number
private serializer: JsonProtoSerializer
) {}

newPersistentWriteStream(): PersistentWriteStream {
return new PersistentWriteStream(
this.queue,
this.connection,
this.credentials,
this.serializer,
this.initialBackoffDelay
this.serializer
);
}

Expand All @@ -69,8 +67,7 @@ export class Datastore {
this.queue,
this.connection,
this.credentials,
this.serializer,
this.initialBackoffDelay
this.serializer
);
}

Expand Down
67 changes: 38 additions & 29 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ProtoByteString, TargetId } from '../core/types';
import { QueryData } from '../local/query_data';
import { Mutation, MutationResult } from '../model/mutation';
import { assert } from '../util/assert';
import { AsyncQueue } from '../util/async_queue';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
import * as log from '../util/log';

Expand Down Expand Up @@ -163,13 +163,15 @@ export abstract class PersistentStream<

constructor(
private queue: AsyncQueue,
connectionTimerId: TimerId,
private idleTimerId: TimerId,
protected connection: Connection,
private credentialsProvider: CredentialsProvider,
// Used for faster retries in testing
initialBackoffDelay?: number
private credentialsProvider: CredentialsProvider
) {
this.backoff = new ExponentialBackoff(
initialBackoffDelay ? initialBackoffDelay : BACKOFF_INITIAL_DELAY_MS,
queue,
connectionTimerId,
BACKOFF_INITIAL_DELAY_MS,
BACKOFF_FACTOR,
BACKOFF_MAX_DELAY_MS
);
Expand Down Expand Up @@ -258,9 +260,10 @@ export abstract class PersistentStream<
// Starts the idle time if we are in state 'Open' and are not yet already
// running a timer (in which case the previous idle timeout still applies).
if (this.isOpen() && this.inactivityTimerPromise === null) {
this.inactivityTimerPromise = this.queue.scheduleWithDelay(
() => this.handleIdleCloseTimer(),
IDLE_TIMEOUT_MS
this.inactivityTimerPromise = this.queue.enqueueAfterDelay(
this.idleTimerId,
IDLE_TIMEOUT_MS,
() => this.handleIdleCloseTimer()
);

this.inactivityTimerPromise.catch((err: FirestoreError) => {
Expand Down Expand Up @@ -400,7 +403,7 @@ export abstract class PersistentStream<
this.startStream(token);
},
(error: Error) => {
this.queue.schedule(() => {
this.queue.enqueue(() => {
if (this.state !== PersistentStreamState.Stopped) {
// Stream can be stopped while waiting for authorization.
const rpcError = new FirestoreError(
Expand Down Expand Up @@ -433,7 +436,7 @@ export abstract class PersistentStream<
stream: Stream<SendType, ReceiveType>,
fn: () => Promise<void>
) => {
this.queue.schedule(() => {
this.queue.enqueue(() => {
// Only raise events if the stream instance has not changed
if (this.stream === stream) {
return fn();
Expand Down Expand Up @@ -477,20 +480,16 @@ export abstract class PersistentStream<
);
this.state = PersistentStreamState.Backoff;

this.backoff.backoffAndWait().then(() => {
// Backoff does not run on the AsyncQueue, so we need to reschedule to
// make sure the queue blocks
this.queue.schedule(() => {
if (this.state === PersistentStreamState.Stopped) {
// Stream can be stopped while waiting for backoff to complete.
return Promise.resolve();
}

this.state = PersistentStreamState.Initial;
this.start(listener);
assert(this.isStarted(), 'PersistentStream should have started');
this.backoff.backoffAndRun(() => {
if (this.state === PersistentStreamState.Stopped) {
// Stream can be stopped while waiting for backoff to complete.
return Promise.resolve();
});
}

this.state = PersistentStreamState.Initial;
this.start(listener);
assert(this.isStarted(), 'PersistentStream should have started');
return Promise.resolve();
});
}

Expand Down Expand Up @@ -536,10 +535,15 @@ export class PersistentListenStream extends PersistentStream<
queue: AsyncQueue,
connection: Connection,
credentials: CredentialsProvider,
private serializer: JsonProtoSerializer,
initialBackoffDelay?: number
private serializer: JsonProtoSerializer
) {
super(queue, connection, credentials, initialBackoffDelay);
super(
queue,
TimerId.ListenStreamConnection,
TimerId.ListenStreamIdle,
connection,
credentials
);
}

protected startRpc(
Expand Down Expand Up @@ -639,10 +643,15 @@ export class PersistentWriteStream extends PersistentStream<
queue: AsyncQueue,
connection: Connection,
credentials: CredentialsProvider,
private serializer: JsonProtoSerializer,
initialBackoffDelay?: number
private serializer: JsonProtoSerializer
) {
super(queue, connection, credentials, initialBackoffDelay);
super(
queue,
TimerId.WriteStreamConnection,
TimerId.WriteStreamIdle,
connection,
credentials
);
}

/**
Expand Down
Loading