Skip to content

Commit

Permalink
move away from process.nextTick
Browse files Browse the repository at this point in the history
There are two problems with process.nextTick.

Less severe is that it reduces performance for
all async callback from native code.

The more severe one is that it causes weird and
unpredictable behavior when trying to interop
with promises and async/await code.

In particular, we have an invariant where we always
emit certain events and invoke certain callbacks
"asynchronously". However, that currently doesn't
apply to Promise, since we "force" asynchronousity
throug process.nextTick which occurs before any
microtick. Hence, for any promise/micro-tick based
code things actually appear to occur synchronously.

Refs: nodejs#51070
PR: nodejs#51114
  • Loading branch information
ronag committed Dec 11, 2023
1 parent fc102f2 commit f420729
Show file tree
Hide file tree
Showing 38 changed files with 128 additions and 127 deletions.
10 changes: 5 additions & 5 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ function ClientRequest(input, options, cb) {
if (typeof optsWithoutSignal.createConnection === 'function') {
const oncreate = once((err, socket) => {
if (err) {
process.nextTick(() => this.emit('error', err));
queueMicrotask(() => this.emit('error', err));
} else {
this.onSocket(socket);
}
Expand Down Expand Up @@ -405,7 +405,7 @@ ClientRequest.prototype.abort = function abort() {
return;
}
this.aborted = true;
process.nextTick(emitAbortNT, this);
queueMicrotask(() => emitAbortNT(this));
this.destroy();
};

Expand Down Expand Up @@ -722,11 +722,11 @@ function responseKeepAlive(req) {
// has no 'error' handler.

// There are cases where _handle === null. Avoid those. Passing undefined to
// nextTick() will call getDefaultTriggerAsyncId() to retrieve the id.
// queueMicrotask will call getDefaultTriggerAsyncId() to retrieve the id.
const asyncId = socket._handle ? socket._handle.getAsyncId() : undefined;
// Mark this socket as available, AFTER user-added end
// handlers have a chance to run.
defaultTriggerAsyncIdScope(asyncId, process.nextTick, emitFreeNT, req);
defaultTriggerAsyncIdScope(asyncId, queueMicrotask, emitFreeNT, req);

req.destroyed = true;
if (req.res) {
Expand Down Expand Up @@ -860,7 +860,7 @@ function listenSocketTimeout(req) {
ClientRequest.prototype.onSocket = function onSocket(socket, err) {
// TODO(ronag): Between here and onSocketNT the socket
// has no 'error' handler.
process.nextTick(onSocketNT, this, socket, err);
queueMicrotask(() => onSocketNT(this, socket, err));
};

function onSocketNT(req, socket, err) {
Expand Down
4 changes: 2 additions & 2 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
e = null;
}
cleanup();
process.nextTick(onError, this, e || err, cb);
queueMicrotask(() => (onError, this, e || err, cb));
});
} else {
process.nextTick(onError, this, err, cb);
queueMicrotask(() => onError(this, err, cb));
}
};

Expand Down
10 changes: 5 additions & 5 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
function onError(msg, err, callback) {
const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId,
process.nextTick,
queueMicrotask,
emitErrorNt,
msg,
err,
Expand Down Expand Up @@ -935,7 +935,7 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
if (!msg.destroyed) {
onError(msg, err, callback);
} else {
process.nextTick(callback, err);
queueMicrotask(() => callback(err));
}
return false;
}
Expand Down Expand Up @@ -969,14 +969,14 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
} else {
debug('This type of response MUST NOT have a body. ' +
'Ignoring write() calls.');
process.nextTick(callback);
queueMicrotask(callback);
return true;
}
}

if (!fromEnd && msg.socket && !msg.socket.writableCorked) {
msg.socket.cork();
process.nextTick(connectionCorkNT, msg.socket);
queueMicrotask(() => connectionCorkNT(msg.socket));
}

let ret;
Expand Down Expand Up @@ -1110,7 +1110,7 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
} else if (!this._headerSent || this.writableLength || chunk) {
this._send('', 'latin1', finish);
} else {
process.nextTick(finish);
queueMicrotask(finish);
}

if (this[kSocket]) {
Expand Down
2 changes: 1 addition & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ function resOnFinish(req, res, socket, state, server) {

res.detachSocket(socket);
clearIncoming(req);
process.nextTick(emitCloseNT, res);
queueMicrotask(() => emitCloseNT(res));

if (res._last) {
if (typeof socket.destroySoon === 'function') {
Expand Down
4 changes: 2 additions & 2 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ function TLSSocket(socket, opts) {
}

// Read on next tick so the caller has a chance to setup listeners
process.nextTick(initRead, this, socket);
queueMicrotask(() => initRead(this, socket));
}
ObjectSetPrototypeOf(TLSSocket.prototype, net.Socket.prototype);
ObjectSetPrototypeOf(TLSSocket, net.Socket);
Expand Down Expand Up @@ -999,7 +999,7 @@ TLSSocket.prototype.renegotiate = function(options, callback) {
this._handle.renegotiate();
} catch (err) {
if (callback) {
process.nextTick(callback, err);
queueMicrotask(() => callback(err));
}
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ function spawn(file, args, options) {
if (options.signal) {
const signal = options.signal;
if (signal.aborted) {
process.nextTick(onAbortListener);
queueMicrotask(onAbortListener);
} else {
addAbortListener ??= require('events').addAbortListener;
const disposable = addAbortListener(signal, onAbortListener);
Expand Down
14 changes: 7 additions & 7 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ function doConnect(ex, self, ip, address, port, callback) {

if (ex) {
state.connectState = CONNECT_STATE_DISCONNECTED;
return process.nextTick(() => {
return queueMicrotask(() => {
if (callback) {
self.removeListener('connect', callback);
callback(ex);
Expand All @@ -447,7 +447,7 @@ function doConnect(ex, self, ip, address, port, callback) {
}

state.connectState = CONNECT_STATE_CONNECTED;
process.nextTick(() => self.emit('connect'));
queueMicrotask(() => self.emit('connect'));
}


Expand Down Expand Up @@ -680,11 +680,11 @@ function doSend(ex, self, ip, list, address, port, callback) {

if (ex) {
if (typeof callback === 'function') {
process.nextTick(callback, ex);
queueMicrotask(callback, ex);
return;
}

process.nextTick(() => self.emit('error', ex));
queueMicrotask(() => self.emit('error', ex));
return;
} else if (!state.handle) {
return;
Expand All @@ -709,14 +709,14 @@ function doSend(ex, self, ip, list, address, port, callback) {
// Synchronous finish. The return code is msg_length + 1 so that we can
// distinguish between synchronous success and asynchronous success.
if (callback)
process.nextTick(callback, null, err - 1);
queueMicrotask(() => (callback, null, err - 1));
return;
}

if (err && callback) {
// Don't emit as error, dgram_legacy.js compatibility
const ex = new ExceptionWithHostPort(err, 'send', address, port);
process.nextTick(callback, ex);
queueMicrotask(() => callback(ex));
}
}

Expand Down Expand Up @@ -747,7 +747,7 @@ Socket.prototype.close = function(callback) {
state.handle.close();
state.handle = null;
defaultTriggerAsyncIdScope(this[async_id_symbol],
process.nextTick,
queueMicrotask,
socketCloseNT,
this);

Expand Down
4 changes: 2 additions & 2 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ function wrapStoreRun(store, data, next, transform = defaultTransform) {
try {
context = transform(data);
} catch (err) {
process.nextTick(() => {
queueMicrotask(() => {
triggerUncaughtException(err, false);
});
return next();
Expand Down Expand Up @@ -141,7 +141,7 @@ class ActiveChannel {
const onMessage = this._subscribers[i];
onMessage(data, this.name);
} catch (err) {
process.nextTick(() => {
queueMicrotask(() => {
triggerUncaughtException(err, false);
});
}
Expand Down
13 changes: 7 additions & 6 deletions lib/dns.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,20 @@ function lookup(hostname, options, callback) {
if (!hostname) {
emitInvalidHostnameWarning(hostname);
if (all) {
process.nextTick(callback, null, []);
queueMicrotask(() => callback(null, []));
} else {
process.nextTick(callback, null, null, family === 6 ? 6 : 4);
queueMicrotask(() => callback(null, null, family === 6 ? 6 : 4));
}
return {};
}

const matchedFamily = isIP(hostname);
if (matchedFamily) {
if (all) {
process.nextTick(
callback, null, [{ address: hostname, family: matchedFamily }]);
queueMicrotask(() =>
callback(null, [{ address: hostname, family: matchedFamily }]));
} else {
process.nextTick(callback, null, hostname, matchedFamily);
queueMicrotask(() => callback(null, hostname, matchedFamily));
}
return {};
}
Expand All @@ -222,7 +222,8 @@ function lookup(hostname, options, callback) {
req, hostname, family, hints, verbatim,
);
if (err) {
process.nextTick(callback, new DNSException(err, 'getaddrinfo', hostname));
const ex = new DNSException(err, 'getaddrinfo', hostname);
queueMicrotask(() => callback(ex));
return {};
}
if (hasObserver('dns')) {
Expand Down
4 changes: 2 additions & 2 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ function addCatch(that, promise, type, args) {

if (typeof then === 'function') {
then.call(promise, undefined, function(err) {
// The callback is called with nextTick to avoid a follow-up
// The callback is called with queueMicrotask to avoid a follow-up
// rejection from this promise.
process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);
queueMicrotask(() => emitUnhandledRejectionOrErr(that, err, type, args));
});
}
} catch (err) {
Expand Down
18 changes: 9 additions & 9 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,9 @@ function readFile(path, options, callback) {
context.signal = options.signal;
}
if (context.isUserFd) {
process.nextTick(function tick(context) {
queueMicrotask(function tick() {
ReflectApply(readFileAfterOpen, { context }, [null, path]);
}, context);
});
return;
}

Expand Down Expand Up @@ -664,7 +664,7 @@ function read(fd, buffer, offsetOrOptions, length, position, callback) {
length |= 0;

if (length === 0) {
return process.nextTick(function tick() {
return queueMicrotask(function tick() {
callback(null, 0, buffer);
});
}
Expand Down Expand Up @@ -956,7 +956,7 @@ function writev(fd, buffers, position, callback) {
callback = maybeCallback(callback || position);

if (buffers.length === 0) {
process.nextTick(callback, null, 0, buffers);
queueMicrotask(() => callback(null, 0, buffers));
return;
}

Expand Down Expand Up @@ -2479,7 +2479,7 @@ function watch(filename, options, listener) {
}
if (options.signal) {
if (options.signal.aborted) {
process.nextTick(() => watcher.close());
queueMicrotask(() => watcher.close());
} else {
const listener = () => watcher.close();
kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation;
Expand Down Expand Up @@ -2820,7 +2820,7 @@ function realpath(p, options, callback) {
LOOP();
});
} else {
process.nextTick(LOOP);
queueMicrotask(LOOP);
}

// Walk down the path, swapping out linked path parts for their real
Expand Down Expand Up @@ -2851,7 +2851,7 @@ function realpath(p, options, callback) {
isFileType(statValues, S_IFSOCK)) {
return callback(null, encodeRealpathResult(p, options));
}
return process.nextTick(LOOP);
return queueMicrotask(LOOP);
}

return fs.lstat(base, { bigint: true }, gotStat);
Expand All @@ -2863,7 +2863,7 @@ function realpath(p, options, callback) {
// If not a symlink, skip to the next path part
if (!stats.isSymbolicLink()) {
knownHard.add(base);
return process.nextTick(LOOP);
return queueMicrotask(LOOP);
}

// Stat & read the link if not read before.
Expand Down Expand Up @@ -2908,7 +2908,7 @@ function realpath(p, options, callback) {
LOOP();
});
} else {
process.nextTick(LOOP);
queueMicrotask(LOOP);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/inspector.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class Session extends EventEmitter {
this.#connection = null;
const remainingCallbacks = this.#messageCallbacks.values();
for (const callback of remainingCallbacks) {
process.nextTick(callback, new ERR_INSPECTOR_CLOSED());
queueMicrotask(() => callback(new ERR_INSPECTOR_CLOSED()));
}
this.#messageCallbacks.clear();
this.#nextId = 1;
Expand Down
6 changes: 3 additions & 3 deletions lib/internal/bootstrap/switches/is_main_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ function dummyDestroy(err, cb) {
// The 'close' event is needed so that finished and
// pipeline work correctly.
if (!this._writableState.emitClose) {
process.nextTick(() => {
queueMicrotask(() => {
this.emit('close');
});
}
Expand Down Expand Up @@ -255,10 +255,10 @@ function getStdin() {
}

// If the user calls stdin.pause(), then we need to stop reading
// once the stream implementation does so (one nextTick later),
// once the stream implementation does so (one micro task later),
// so that the process can close down.
stdin.on('pause', () => {
process.nextTick(onpause);
queueMicrotask(onpause);
});

function onpause() {
Expand Down
Loading

0 comments on commit f420729

Please sign in to comment.