Skip to content

Commit 7b8fb9a

Browse files
author
Michael Lehenbauer
committed
Refactor AsyncQueue's "delayed scheduling" support and add cancellation.
* Introduces a DelayedOperation helper class in AsyncQueue to encapsulate delayed op logic. * Adds cancellation support which I want to use in #412 * Remove delayedOperationsCount in favor of keeping delayedOperations populated correctly at all times. * Fixes a preexisting issue in AsyncQueue.schedule() where the returned promise would always be resolved with undefined, instead of the result of your op. Also remove an AnyDuringMigration usage.
1 parent 49dcb25 commit 7b8fb9a

File tree

2 files changed

+129
-74
lines changed

2 files changed

+129
-74
lines changed

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,10 @@ export abstract class PersistentStream<
251251
markIdle(): void {
252252
this.idle = true;
253253
this.queue
254-
.schedule(() => {
254+
.scheduleWithDelay(() => {
255255
return this.handleIdleCloseTimer();
256256
}, IDLE_TIMEOUT_MS)
257-
.catch((err: FirestoreError) => {
257+
.promise.catch((err: FirestoreError) => {
258258
// When the AsyncQueue gets drained during testing, pending Promises
259259
// (including these idle checks) will get rejected. We special-case
260260
// these cancelled idle checks to make sure that these specific Promise

packages/firestore/src/util/async_queue.ts

Lines changed: 127 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,103 @@ import { AnyDuringMigration, AnyJs } from './misc';
2020
import { Deferred } from './promise';
2121
import { Code, FirestoreError } from './error';
2222

23-
type DelayedOperation<T> = {
23+
/** External Result of scheduling a delayed operation. */
24+
export interface DelayedOperationResult<T> {
25+
/** A promise that will resolve once the operation has been run. */
26+
promise: Promise<T>;
27+
28+
/**
29+
* Prevents the operation from running and rejects the promise with a
30+
* Code.CANCELLED error.
31+
*/
32+
cancel(): void;
33+
}
34+
35+
/**
36+
* Represents an operation scheduled to be run in the future.
37+
*
38+
* Created via DelayedOperation.createAndSchedule().
39+
* Supports cancellation (via cancel()) and early execution (via scheduleNow()).
40+
*/
41+
class DelayedOperation<T> implements DelayedOperationResult<T> {
2442
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
25-
handle: any;
26-
op: () => Promise<T>;
27-
deferred: Deferred<T>;
28-
};
43+
private timerHandle: any;
44+
private readonly deferred = new Deferred<T>();
45+
/** true if the operation has not been executed or cancelled yet. */
46+
private pending = true;
47+
48+
static createAndSchedule<T>(
49+
asyncQueue: AsyncQueue,
50+
op: () => Promise<T>,
51+
delayMs: number
52+
) {
53+
const delayedOp = new DelayedOperation(op);
54+
delayedOp.timerHandle = setTimeout(
55+
() => delayedOp.scheduleNow(asyncQueue),
56+
delayMs
57+
);
58+
return delayedOp;
59+
}
60+
61+
private constructor(private op: () => Promise<T>) {}
62+
63+
get promise(): Promise<T> {
64+
return this.deferred.promise;
65+
}
66+
67+
/**
68+
* Schedules the operation to run on the provided AsyncQueue if it has not
69+
* already been run or cancelled.
70+
*/
71+
scheduleNow(asyncQueue: AsyncQueue): void {
72+
this.clearTimeout();
73+
asyncQueue.schedule(this.runIfNecessary.bind(this));
74+
}
75+
76+
cancel(reason?: string): void {
77+
if (this.pending) {
78+
this.pending = false;
79+
this.clearTimeout();
80+
this.deferred.reject(
81+
new FirestoreError(
82+
Code.CANCELLED,
83+
'Operation cancelled' + (reason ? ': ' + reason : '')
84+
)
85+
);
86+
}
87+
}
88+
89+
private runIfNecessary(): Promise<void> {
90+
if (this.pending) {
91+
this.pending = false;
92+
return this.op().then(result => {
93+
return this.deferred.resolve(result);
94+
});
95+
}
96+
}
97+
98+
private clearTimeout() {
99+
if (this.timerHandle) {
100+
clearTimeout(this.timerHandle);
101+
this.timerHandle = null;
102+
}
103+
}
104+
}
29105

30106
export class AsyncQueue {
31107
// The last promise in the queue.
32108
private tail: Promise<AnyJs | void> = Promise.resolve();
33109

34110
// A list with timeout handles and their respective deferred promises.
35111
// Contains an entry for each operation that is queued to run in the future
36-
// (i.e. it has a delay that has not yet elapsed). Prior to cleanup, this list
37-
// may also contain entries that have already been run (in which case `handle` is
38-
// null).
112+
// (i.e. it has a delay that has not yet elapsed).
39113
private delayedOperations: Array<DelayedOperation<AnyJs>> = [];
40114

41115
// The number of operations that are queued to be run in the future (i.e. they
42-
// have a delay that has not yet elapsed). Unlike `delayedOperations`, this
43-
// is guaranteed to only contain operations that have not yet been run.
44-
//
45-
// Visible for testing.
46-
delayedOperationsCount = 0;
116+
// have a delay that has not yet elapsed). Used for testing.
117+
get delayedOperationsCount() {
118+
return this.delayedOperations.length;
119+
}
47120

48121
// visible for testing
49122
failure: Error;
@@ -55,47 +128,10 @@ export class AsyncQueue {
55128
/**
56129
* Adds a new operation to the queue. Returns a promise that will be resolved
57130
* when the promise returned by the new operation is (with its value).
58-
*
59-
* Can optionally specify a delay (in milliseconds) to wait before queuing the
60-
* operation.
61131
*/
62-
schedule<T>(op: () => Promise<T>, delay?: number): Promise<T> {
63-
if (this.failure) {
64-
fail(
65-
'AsyncQueue is already failed: ' +
66-
(this.failure.stack || this.failure.message)
67-
);
68-
}
69-
70-
if ((delay || 0) > 0) {
71-
this.delayedOperationsCount++;
72-
const delayedOp: DelayedOperation<T> = {
73-
handle: null,
74-
op,
75-
deferred: new Deferred<T>()
76-
};
77-
delayedOp.handle = setTimeout(() => {
78-
this.scheduleInternal(() => {
79-
return delayedOp.op().then(result => {
80-
delayedOp.deferred.resolve(result);
81-
});
82-
});
83-
delayedOp.handle = null;
84-
85-
this.delayedOperationsCount--;
86-
if (this.delayedOperationsCount === 0) {
87-
this.delayedOperations = [];
88-
}
89-
}, delay);
90-
this.delayedOperations.push(delayedOp);
91-
return delayedOp.deferred.promise;
92-
} else {
93-
return this.scheduleInternal(op);
94-
}
95-
}
96-
97-
private scheduleInternal<T>(op: () => Promise<T>): Promise<T> {
98-
this.tail = this.tail.then(() => {
132+
schedule<T>(op: () => Promise<T>): Promise<T> {
133+
this.verifyNotFailed();
134+
const newTail = this.tail.then(() => {
99135
this.operationInProgress = true;
100136
return op()
101137
.catch(error => {
@@ -118,11 +154,42 @@ export class AsyncQueue {
118154
// and return the rejected Promise.
119155
throw error;
120156
})
121-
.then(() => {
157+
.then(result => {
122158
this.operationInProgress = false;
159+
return result;
123160
});
124161
});
125-
return this.tail as AnyDuringMigration;
162+
this.tail = newTail;
163+
return newTail;
164+
}
165+
166+
/**
167+
* Schedules an operation to be run on the AsyncQueue once the specified
168+
* `delayMs` has elapsed. The returned DelayedOperationResult can be
169+
* used to cancel the operation prior to its running.
170+
*/
171+
scheduleWithDelay<T>(
172+
op: () => Promise<T>,
173+
delayMs: number
174+
): DelayedOperationResult<T> {
175+
this.verifyNotFailed();
176+
177+
const delayedOp = DelayedOperation.createAndSchedule(this, op, delayMs);
178+
const index = this.delayedOperations.push(delayedOp);
179+
180+
delayedOp.promise.catch(err => {}).then(() => {
181+
this.delayedOperations.slice(index, 1);
182+
});
183+
return delayedOp;
184+
}
185+
186+
private verifyNotFailed(): void {
187+
if (this.failure) {
188+
fail(
189+
'AsyncQueue is already failed: ' +
190+
(this.failure.stack || this.failure.message)
191+
);
192+
}
126193
}
127194

128195
/**
@@ -143,26 +210,14 @@ export class AsyncQueue {
143210
* scheduled with a delay can be rejected or queued for immediate execution.
144211
*/
145212
drain(executeDelayedTasks: boolean): Promise<void> {
146-
this.delayedOperations.forEach(entry => {
147-
if (entry.handle) {
148-
clearTimeout(entry.handle);
149-
if (executeDelayedTasks) {
150-
this.scheduleInternal(entry.op).then(
151-
entry.deferred.resolve,
152-
entry.deferred.reject
153-
);
154-
} else {
155-
entry.deferred.reject(
156-
new FirestoreError(
157-
Code.CANCELLED,
158-
'Operation cancelled by shutdown'
159-
)
160-
);
161-
}
213+
this.delayedOperations.forEach(delayedOp => {
214+
if (executeDelayedTasks) {
215+
delayedOp.scheduleNow(this);
216+
} else {
217+
delayedOp.cancel('shutdown');
162218
}
163219
});
164220
this.delayedOperations = [];
165-
this.delayedOperationsCount = 0;
166221
return this.schedule(() => Promise.resolve());
167222
}
168223
}

0 commit comments

Comments
 (0)