Skip to content

Commit 54062d3

Browse files
committed
timers: refactor setImmediate error handling
If an error is encountered during the processing of Immediates, schedule the remaining queue to finish after all error handling code runs (if the process is still alive to do so). The new changes make the Immediates error handling behaviour entirely deterministic and predictable, as the full queue will be flushed on each Immediates cycle, regardless of whether an error is encountered or not. Currently this processing is scheduled for nextTick which can yield unpredictable results as the nextTick might happen as early as close callbacks phase or as late as after the next event loop turns Immediates all fully processed. The latter can result in two full cycles of Immediates processing during one even loop turn. The current implementation also doesn't differentiate between Immediates scheduled for the current queue run or the next one, so Immediates that were scheduled for the next turn of the event loop, will process alongside the ones that were scheduled for the current turn. PR-URL: #17879 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 11a1bc1 commit 54062d3

File tree

6 files changed

+160
-51
lines changed

6 files changed

+160
-51
lines changed

lib/timers.js

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants;
5151
const async_id_symbol = timerInternals.async_id_symbol;
5252
const trigger_async_id_symbol = timerInternals.trigger_async_id_symbol;
5353

54-
const [activateImmediateCheck, scheduledImmediateCountArray] =
54+
// *Must* match Environment::ImmediateInfo::Fields in src/env.h.
55+
const kCount = 0;
56+
const kHasOutstanding = 1;
57+
58+
const [activateImmediateCheck, immediateInfo] =
5559
setImmediateCallback(processImmediate);
5660

5761
// The Timeout class
@@ -627,16 +631,23 @@ ImmediateList.prototype.remove = function(item) {
627631
};
628632

629633
// Create a single linked list instance only once at startup
630-
var immediateQueue = new ImmediateList();
634+
const immediateQueue = new ImmediateList();
635+
636+
// If an uncaught exception was thrown during execution of immediateQueue,
637+
// this queue will store all remaining Immediates that need to run upon
638+
// resolution of all error handling (if process is still alive).
639+
const outstandingQueue = new ImmediateList();
631640

632641

633642
function processImmediate() {
634-
var immediate = immediateQueue.head;
635-
var tail = immediateQueue.tail;
643+
const queue = outstandingQueue.head !== null ?
644+
outstandingQueue : immediateQueue;
645+
var immediate = queue.head;
646+
var tail = queue.tail;
636647

637648
// Clear the linked list early in case new `setImmediate()` calls occur while
638649
// immediate callbacks are executed
639-
immediateQueue.head = immediateQueue.tail = null;
650+
queue.head = queue.tail = null;
640651

641652
while (immediate !== null) {
642653
if (!immediate._onImmediate) {
@@ -645,9 +656,14 @@ function processImmediate() {
645656
}
646657

647658
// Save next in case `clearImmediate(immediate)` is called from callback
648-
var next = immediate._idleNext;
659+
const next = immediate._idleNext;
660+
661+
const asyncId = immediate[async_id_symbol];
662+
emitBefore(asyncId, immediate[trigger_async_id_symbol]);
649663

650-
tryOnImmediate(immediate, tail);
664+
tryOnImmediate(immediate, next, tail);
665+
666+
emitAfter(asyncId);
651667

652668
// If `clearImmediate(immediate)` wasn't called from the callback, use the
653669
// `immediate`'s next item
@@ -656,45 +672,36 @@ function processImmediate() {
656672
else
657673
immediate = next;
658674
}
675+
676+
immediateInfo[kHasOutstanding] = 0;
659677
}
660678

661679
// An optimization so that the try/finally only de-optimizes (since at least v8
662680
// 4.7) what is in this smaller function.
663-
function tryOnImmediate(immediate, oldTail) {
681+
function tryOnImmediate(immediate, next, oldTail) {
664682
var threw = true;
665-
emitBefore(immediate[async_id_symbol], immediate[trigger_async_id_symbol]);
666683
try {
667684
// make the actual call outside the try/finally to allow it to be optimized
668685
runCallback(immediate);
669686
threw = false;
670687
} finally {
671688
immediate._onImmediate = null;
672-
if (!threw)
673-
emitAfter(immediate[async_id_symbol]);
674689

675690
if (!immediate._destroyed) {
676691
immediate._destroyed = true;
677-
scheduledImmediateCountArray[0]--;
692+
immediateInfo[kCount]--;
678693

679694
if (async_hook_fields[kDestroy] > 0) {
680695
emitDestroy(immediate[async_id_symbol]);
681696
}
682697
}
683698

684-
if (threw && immediate._idleNext !== null) {
685-
// Handle any remaining on next tick, assuming we're still alive to do so.
686-
const curHead = immediateQueue.head;
687-
const next = immediate._idleNext;
688-
if (curHead !== null) {
689-
curHead._idlePrev = oldTail;
690-
oldTail._idleNext = curHead;
691-
next._idlePrev = null;
692-
immediateQueue.head = next;
693-
} else {
694-
immediateQueue.head = next;
695-
immediateQueue.tail = oldTail;
696-
}
697-
process.nextTick(processImmediate);
699+
if (threw && (immediate._idleNext !== null || next !== null)) {
700+
// Handle any remaining Immediates after error handling has resolved,
701+
// assuming we're still alive to do so.
702+
outstandingQueue.head = immediate._idleNext || next;
703+
outstandingQueue.tail = oldTail;
704+
immediateInfo[kHasOutstanding] = 1;
698705
}
699706
}
700707
}
@@ -728,9 +735,9 @@ function Immediate(callback, args) {
728735
this);
729736
}
730737

731-
if (scheduledImmediateCountArray[0] === 0)
738+
if (immediateInfo[kCount] === 0)
732739
activateImmediateCheck();
733-
scheduledImmediateCountArray[0]++;
740+
immediateInfo[kCount]++;
734741

735742
immediateQueue.append(this);
736743
}
@@ -776,7 +783,7 @@ exports.clearImmediate = function(immediate) {
776783
if (!immediate) return;
777784

778785
if (!immediate._destroyed) {
779-
scheduledImmediateCountArray[0]--;
786+
immediateInfo[kCount]--;
780787
immediate._destroyed = true;
781788

782789
if (async_hook_fields[kDestroy] > 0) {

src/env-inl.h

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,30 @@ inline bool Environment::AsyncCallbackScope::in_makecallback() const {
217217
return env_->makecallback_cntr_ > 1;
218218
}
219219

220+
inline Environment::ImmediateInfo::ImmediateInfo(v8::Isolate* isolate)
221+
: fields_(isolate, kFieldsCount) {}
222+
223+
inline AliasedBuffer<uint32_t, v8::Uint32Array>&
224+
Environment::ImmediateInfo::fields() {
225+
return fields_;
226+
}
227+
228+
inline uint32_t Environment::ImmediateInfo::count() const {
229+
return fields_[kCount];
230+
}
231+
232+
inline bool Environment::ImmediateInfo::has_outstanding() const {
233+
return fields_[kHasOutstanding] == 1;
234+
}
235+
236+
inline void Environment::ImmediateInfo::count_inc(uint32_t increment) {
237+
fields_[kCount] = fields_[kCount] + increment;
238+
}
239+
240+
inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) {
241+
fields_[kCount] = fields_[kCount] - decrement;
242+
}
243+
220244
inline Environment::TickInfo::TickInfo(v8::Isolate* isolate)
221245
: fields_(isolate, kFieldsCount) {}
222246

@@ -263,6 +287,7 @@ inline Environment::Environment(IsolateData* isolate_data,
263287
v8::Local<v8::Context> context)
264288
: isolate_(context->GetIsolate()),
265289
isolate_data_(isolate_data),
290+
immediate_info_(context->GetIsolate()),
266291
tick_info_(context->GetIsolate()),
267292
timer_base_(uv_now(isolate_data->event_loop())),
268293
using_domains_(false),
@@ -271,7 +296,6 @@ inline Environment::Environment(IsolateData* isolate_data,
271296
abort_on_uncaught_exception_(false),
272297
emit_napi_warning_(true),
273298
makecallback_cntr_(0),
274-
scheduled_immediate_count_(isolate_, 1),
275299
should_abort_on_uncaught_toggle_(isolate_, 1),
276300
#if HAVE_INSPECTOR
277301
inspector_agent_(new inspector::Agent(this)),
@@ -371,6 +395,10 @@ inline Environment::AsyncHooks* Environment::async_hooks() {
371395
return &async_hooks_;
372396
}
373397

398+
inline Environment::ImmediateInfo* Environment::immediate_info() {
399+
return &immediate_info_;
400+
}
401+
374402
inline Environment::TickInfo* Environment::tick_info() {
375403
return &tick_info_;
376404
}
@@ -508,11 +536,6 @@ inline void Environment::set_fs_stats_field_array(double* fields) {
508536
fs_stats_field_array_ = fields;
509537
}
510538

511-
inline AliasedBuffer<uint32_t, v8::Uint32Array>&
512-
Environment::scheduled_immediate_count() {
513-
return scheduled_immediate_count_;
514-
}
515-
516539
void Environment::SetImmediate(native_immediate_callback cb,
517540
void* data,
518541
v8::Local<v8::Object> obj) {
@@ -522,9 +545,9 @@ void Environment::SetImmediate(native_immediate_callback cb,
522545
std::unique_ptr<v8::Persistent<v8::Object>>(
523546
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
524547
});
525-
if (scheduled_immediate_count_[0] == 0)
548+
if (immediate_info()->count() == 0)
526549
ActivateImmediateCheck();
527-
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1;
550+
immediate_info()->count_inc(1);
528551
}
529552

530553
inline performance::performance_state* Environment::performance_state() {

src/env.cc

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -283,14 +283,14 @@ void Environment::RunAndClearNativeImmediates() {
283283
}
284284

285285
#ifdef DEBUG
286-
CHECK_GE(scheduled_immediate_count_[0], count);
286+
CHECK_GE(immediate_info()->count(), count);
287287
#endif
288-
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] - count;
288+
immediate_info()->count_dec(count);
289289
}
290290
}
291291

292292
static bool MaybeStopImmediate(Environment* env) {
293-
if (env->scheduled_immediate_count()[0] == 0) {
293+
if (env->immediate_info()->count() == 0) {
294294
uv_check_stop(env->immediate_check_handle());
295295
uv_idle_stop(env->immediate_idle_handle());
296296
return true;
@@ -309,12 +309,14 @@ void Environment::CheckImmediate(uv_check_t* handle) {
309309

310310
env->RunAndClearNativeImmediates();
311311

312-
MakeCallback(env->isolate(),
313-
env->process_object(),
314-
env->immediate_callback_function(),
315-
0,
316-
nullptr,
317-
{0, 0}).ToLocalChecked();
312+
do {
313+
MakeCallback(env->isolate(),
314+
env->process_object(),
315+
env->immediate_callback_function(),
316+
0,
317+
nullptr,
318+
{0, 0}).ToLocalChecked();
319+
} while (env->immediate_info()->has_outstanding());
318320

319321
MaybeStopImmediate(env);
320322
}

src/env.h

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,30 @@ class Environment {
453453
DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope);
454454
};
455455

456+
class ImmediateInfo {
457+
public:
458+
inline AliasedBuffer<uint32_t, v8::Uint32Array>& fields();
459+
inline uint32_t count() const;
460+
inline bool has_outstanding() const;
461+
462+
inline void count_inc(uint32_t increment);
463+
inline void count_dec(uint32_t decrement);
464+
465+
private:
466+
friend class Environment; // So we can call the constructor.
467+
inline explicit ImmediateInfo(v8::Isolate* isolate);
468+
469+
enum Fields {
470+
kCount,
471+
kHasOutstanding,
472+
kFieldsCount
473+
};
474+
475+
AliasedBuffer<uint32_t, v8::Uint32Array> fields_;
476+
477+
DISALLOW_COPY_AND_ASSIGN(ImmediateInfo);
478+
};
479+
456480
class TickInfo {
457481
public:
458482
inline AliasedBuffer<uint8_t, v8::Uint8Array>& fields();
@@ -532,6 +556,7 @@ class Environment {
532556
inline void FinishHandleCleanup(uv_handle_t* handle);
533557

534558
inline AsyncHooks* async_hooks();
559+
inline ImmediateInfo* immediate_info();
535560
inline TickInfo* tick_info();
536561
inline uint64_t timer_base() const;
537562

@@ -582,8 +607,6 @@ class Environment {
582607
inline double* fs_stats_field_array() const;
583608
inline void set_fs_stats_field_array(double* fields);
584609

585-
inline AliasedBuffer<uint32_t, v8::Uint32Array>& scheduled_immediate_count();
586-
587610
inline performance::performance_state* performance_state();
588611
inline std::map<std::string, uint64_t>* performance_marks();
589612

@@ -704,6 +727,7 @@ class Environment {
704727
uv_check_t idle_check_handle_;
705728

706729
AsyncHooks async_hooks_;
730+
ImmediateInfo immediate_info_;
707731
TickInfo tick_info_;
708732
const uint64_t timer_base_;
709733
bool using_domains_;
@@ -714,7 +738,6 @@ class Environment {
714738
size_t makecallback_cntr_;
715739
std::vector<double> destroy_async_id_list_;
716740

717-
AliasedBuffer<uint32_t, v8::Uint32Array> scheduled_immediate_count_;
718741
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;
719742

720743
int should_not_abort_scope_counter_ = 0;

src/timer_wrap.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@ class TimerWrap : public HandleWrap {
9090
env->NewFunctionTemplate(activate_cb)->GetFunction(env->context())
9191
.ToLocalChecked();
9292
auto result = Array::New(env->isolate(), 2);
93-
result->Set(0, activate_function);
94-
result->Set(1, env->scheduled_immediate_count().GetJSArray());
93+
result->Set(env->context(), 0, activate_function).FromJust();
94+
result->Set(env->context(), 1,
95+
env->immediate_info()->fields().GetJSArray()).FromJust();
9596
args.GetReturnValue().Set(result);
9697
}
9798

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const domain = require('domain');
6+
7+
// setImmediate should run clear its queued cbs once per event loop turn
8+
// but immediates queued while processing the current queue should happen
9+
// on the next turn of the event loop.
10+
11+
// In addition, if any setImmediate throws, the rest of the queue should
12+
// be processed after all error handling is resolved, but that queue
13+
// should not include any setImmediate calls scheduled after the
14+
// processing of the queue started.
15+
16+
let threw = false;
17+
let stage = -1;
18+
19+
const QUEUE = 10;
20+
21+
const errObj = {
22+
type: Error,
23+
message: 'setImmediate Err'
24+
};
25+
26+
process.once('uncaughtException', common.expectsError(errObj));
27+
process.once('uncaughtException', () => assert.strictEqual(stage, 0));
28+
29+
const d1 = domain.create();
30+
d1.once('error', common.expectsError(errObj));
31+
d1.once('error', () => assert.strictEqual(stage, 0));
32+
33+
const run = common.mustCall((callStage) => {
34+
assert(callStage >= stage);
35+
stage = callStage;
36+
if (threw)
37+
return;
38+
39+
setImmediate(run, 2);
40+
}, QUEUE * 3);
41+
42+
for (let i = 0; i < QUEUE; i++)
43+
setImmediate(run, 0);
44+
setImmediate(() => {
45+
threw = true;
46+
process.nextTick(() => assert.strictEqual(stage, 1));
47+
throw new Error('setImmediate Err');
48+
});
49+
d1.run(() => setImmediate(() => {
50+
throw new Error('setImmediate Err');
51+
}));
52+
for (let i = 0; i < QUEUE; i++)
53+
setImmediate(run, 1);

0 commit comments

Comments
 (0)