Skip to content

Commit 4203d13

Browse files
ErickWendeljuanarbol
authored andcommitted
child_process: queue pending messages
It fixes the problem of the child process not receiving messages. Fixes: #41134 PR-URL: #41221 Reviewed-By: Adrian Estrada <[email protected]> Reviewed-By: Antoine du Hamel <[email protected]> Reviewed-By: Minwoo Jung <[email protected]> Backport-PR-URL: #42840
1 parent c73ac52 commit 4203d13

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

lib/internal/child_process.js

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
const {
44
ArrayIsArray,
5+
ArrayPrototypePush,
56
ObjectDefineProperty,
67
ObjectSetPrototypeOf,
8+
ReflectApply,
79
Symbol,
810
Uint8Array,
911
} = primordials;
@@ -73,6 +75,7 @@ let HTTPParser;
7375
const MAX_HANDLE_RETRANSMISSIONS = 3;
7476
const kChannelHandle = Symbol('kChannelHandle');
7577
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
78+
const kPendingMessages = Symbol('kPendingMessages');
7679

7780
// This object contain function to convert TCP objects to native handle objects
7881
// and back again.
@@ -520,6 +523,7 @@ class Control extends EventEmitter {
520523
constructor(channel) {
521524
super();
522525
this.#channel = channel;
526+
this[kPendingMessages] = [];
523527
}
524528

525529
// The methods keeping track of the counter are being used to track the
@@ -693,6 +697,24 @@ function setupChannel(target, channel, serializationMode) {
693697
});
694698
});
695699

700+
target.on('newListener', function() {
701+
702+
process.nextTick(() => {
703+
if (!target.channel || !target.listenerCount('message'))
704+
return;
705+
706+
const messages = target.channel[kPendingMessages];
707+
const { length } = messages;
708+
if (!length) return;
709+
710+
for (let i = 0; i < length; i++) {
711+
ReflectApply(target.emit, target, messages[i]);
712+
}
713+
714+
target.channel[kPendingMessages] = [];
715+
});
716+
});
717+
696718
target.send = function(message, handle, options, callback) {
697719
if (typeof handle === 'function') {
698720
callback = handle;
@@ -909,7 +931,15 @@ function setupChannel(target, channel, serializationMode) {
909931
};
910932

911933
function emit(event, message, handle) {
912-
target.emit(event, message, handle);
934+
if ('internalMessage' === event || target.listenerCount('message')) {
935+
target.emit(event, message, handle);
936+
return;
937+
}
938+
939+
ArrayPrototypePush(
940+
target.channel[kPendingMessages],
941+
[event, message, handle]
942+
);
913943
}
914944

915945
function handleMessage(message, handle, internal) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import '../common/index.mjs';
2+
import assert from 'assert';
3+
import { fork } from 'child_process';
4+
import { once } from 'events';
5+
import { fileURLToPath } from 'url';
6+
7+
if (process.argv[2] !== 'child') {
8+
const filename = fileURLToPath(import.meta.url);
9+
const cp = fork(filename, ['child']);
10+
const message = 'Hello World';
11+
cp.send(message);
12+
13+
const [received] = await once(cp, 'message');
14+
assert.deepStrictEqual(received, message);
15+
16+
cp.disconnect();
17+
await once(cp, 'exit');
18+
} else {
19+
process.on('message', (msg) => process.send(msg));
20+
}

0 commit comments

Comments
 (0)