Skip to content

Commit

Permalink
move away from process.nextTick
Browse files Browse the repository at this point in the history
There are two problems with process.nextTick.

Less severe is that it reduces performance for
all async callback from native code.

The more severe one is that it causes weird and
unpredictable behavior when trying to interop
with promises and async/await code.

In particular, we have an invariant where we always
emit certain events and invoke certain callbacks
"asynchronously". However, that currently doesn't
apply to Promise, since we "force" asynchronousity
throug process.nextTick which occurs before any
microtick. Hence, for any promise/micro-tick based
code things actually appear to occur synchronously.

Refs: #51070
PR: #51114
  • Loading branch information
ronag committed Dec 11, 2023
1 parent fc102f2 commit 8ce0fe3
Show file tree
Hide file tree
Showing 38 changed files with 162 additions and 127 deletions.
11 changes: 6 additions & 5 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const {

const net = require('net');
const assert = require('internal/assert');
const { queueMicrotask } = require('internal/process/task_queues');
const {
kEmptyObject,
once,
Expand Down Expand Up @@ -342,7 +343,7 @@ function ClientRequest(input, options, cb) {
if (typeof optsWithoutSignal.createConnection === 'function') {
const oncreate = once((err, socket) => {
if (err) {
process.nextTick(() => this.emit('error', err));
queueMicrotask(() => this.emit('error', err));
} else {
this.onSocket(socket);
}
Expand Down Expand Up @@ -405,7 +406,7 @@ ClientRequest.prototype.abort = function abort() {
return;
}
this.aborted = true;
process.nextTick(emitAbortNT, this);
queueMicrotask(() => emitAbortNT(this));
this.destroy();
};

Expand Down Expand Up @@ -722,11 +723,11 @@ function responseKeepAlive(req) {
// has no 'error' handler.

// There are cases where _handle === null. Avoid those. Passing undefined to
// nextTick() will call getDefaultTriggerAsyncId() to retrieve the id.
// queueMicrotask will call getDefaultTriggerAsyncId() to retrieve the id.
const asyncId = socket._handle ? socket._handle.getAsyncId() : undefined;
// Mark this socket as available, AFTER user-added end
// handlers have a chance to run.
defaultTriggerAsyncIdScope(asyncId, process.nextTick, emitFreeNT, req);
defaultTriggerAsyncIdScope(asyncId, queueMicrotask, emitFreeNT, req);

req.destroyed = true;
if (req.res) {
Expand Down Expand Up @@ -860,7 +861,7 @@ function listenSocketTimeout(req) {
ClientRequest.prototype.onSocket = function onSocket(socket, err) {
// TODO(ronag): Between here and onSocketNT the socket
// has no 'error' handler.
process.nextTick(onSocketNT, this, socket, err);
queueMicrotask(() => onSocketNT(this, socket, err));
};

function onSocketNT(req, socket, err) {
Expand Down
5 changes: 3 additions & 2 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {
} = primordials;

const { Readable, finished } = require('stream');
const { queueMicrotask } = require('internal/process/task_queues');

const kHeaders = Symbol('kHeaders');
const kHeadersDistinct = Symbol('kHeadersDistinct');
Expand Down Expand Up @@ -236,10 +237,10 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
e = null;
}
cleanup();
process.nextTick(onError, this, e || err, cb);
queueMicrotask(() => (onError, this, e || err, cb));
});
} else {
process.nextTick(onError, this, err, cb);
queueMicrotask(() => onError(this, err, cb));
}
};

Expand Down
11 changes: 6 additions & 5 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const {
} = require('internal/errors');
const { validateString } = require('internal/validators');
const { isUint8Array } = require('internal/util/types');
const { queueMicrotask } = require('internal/process/task_queues');

let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
debug = fn;
Expand Down Expand Up @@ -888,7 +889,7 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
function onError(msg, err, callback) {
const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId,
process.nextTick,
queueMicrotask,
emitErrorNt,
msg,
err,
Expand Down Expand Up @@ -935,7 +936,7 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
if (!msg.destroyed) {
onError(msg, err, callback);
} else {
process.nextTick(callback, err);
queueMicrotask(() => callback(err));
}
return false;
}
Expand Down Expand Up @@ -969,14 +970,14 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
} else {
debug('This type of response MUST NOT have a body. ' +
'Ignoring write() calls.');
process.nextTick(callback);
queueMicrotask(callback);
return true;
}
}

if (!fromEnd && msg.socket && !msg.socket.writableCorked) {
msg.socket.cork();
process.nextTick(connectionCorkNT, msg.socket);
queueMicrotask(() => connectionCorkNT(msg.socket));
}

let ret;
Expand Down Expand Up @@ -1110,7 +1111,7 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
} else if (!this._headerSent || this.writableLength || chunk) {
this._send('', 'latin1', finish);
} else {
process.nextTick(finish);
queueMicrotask(finish);
}

if (this[kSocket]) {
Expand Down
3 changes: 2 additions & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const {
const net = require('net');
const EE = require('events');
const assert = require('internal/assert');
const { queueMicrotask } = require('internal/process/task_queues');
const {
parsers,
freeParser,
Expand Down Expand Up @@ -994,7 +995,7 @@ function resOnFinish(req, res, socket, state, server) {

res.detachSocket(socket);
clearIncoming(req);
process.nextTick(emitCloseNT, res);
queueMicrotask(() => emitCloseNT(res));

if (res._last) {
if (typeof socket.destroySoon === 'function') {
Expand Down
5 changes: 3 additions & 2 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols;
const { isArrayBufferView } = require('internal/util/types');
const { SecureContext: NativeSecureContext } = internalBinding('crypto');
const { ConnResetException, codes } = require('internal/errors');
const { queueMicrotask } = require('internal/process/task_queues');
const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
Expand Down Expand Up @@ -609,7 +610,7 @@ function TLSSocket(socket, opts) {
}

// Read on next tick so the caller has a chance to setup listeners
process.nextTick(initRead, this, socket);
queueMicrotask(() => initRead(this, socket));
}
ObjectSetPrototypeOf(TLSSocket.prototype, net.Socket.prototype);
ObjectSetPrototypeOf(TLSSocket, net.Socket);
Expand Down Expand Up @@ -999,7 +1000,7 @@ TLSSocket.prototype.renegotiate = function(options, callback) {
this._handle.renegotiate();
} catch (err) {
if (callback) {
process.nextTick(callback, err);
queueMicrotask(() => callback(err));
}
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const {
validateString,
} = require('internal/validators');
const child_process = require('internal/child_process');
const { queueMicrotask } = require('internal/process/task_queues');
const {
getValidStdio,
setupChannel,
Expand Down Expand Up @@ -783,7 +784,7 @@ function spawn(file, args, options) {
if (options.signal) {
const signal = options.signal;
if (signal.aborted) {
process.nextTick(onAbortListener);
queueMicrotask(onAbortListener);
} else {
addAbortListener ??= require('events').addAbortListener;
const disposable = addAbortListener(signal, onAbortListener);
Expand Down
15 changes: 8 additions & 7 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const { Buffer } = require('buffer');
const { deprecate, guessHandleType, promisify } = require('internal/util');
const { isArrayBufferView } = require('internal/util/types');
const EventEmitter = require('events');
const { queueMicrotask } = require('internal/process/task_queues');
const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol, owner_symbol },
Expand Down Expand Up @@ -436,7 +437,7 @@ function doConnect(ex, self, ip, address, port, callback) {

if (ex) {
state.connectState = CONNECT_STATE_DISCONNECTED;
return process.nextTick(() => {
return queueMicrotask(() => {
if (callback) {
self.removeListener('connect', callback);
callback(ex);
Expand All @@ -447,7 +448,7 @@ function doConnect(ex, self, ip, address, port, callback) {
}

state.connectState = CONNECT_STATE_CONNECTED;
process.nextTick(() => self.emit('connect'));
queueMicrotask(() => self.emit('connect'));
}


Expand Down Expand Up @@ -680,11 +681,11 @@ function doSend(ex, self, ip, list, address, port, callback) {

if (ex) {
if (typeof callback === 'function') {
process.nextTick(callback, ex);
queueMicrotask(callback, ex);
return;
}

process.nextTick(() => self.emit('error', ex));
queueMicrotask(() => self.emit('error', ex));
return;
} else if (!state.handle) {
return;
Expand All @@ -709,14 +710,14 @@ function doSend(ex, self, ip, list, address, port, callback) {
// Synchronous finish. The return code is msg_length + 1 so that we can
// distinguish between synchronous success and asynchronous success.
if (callback)
process.nextTick(callback, null, err - 1);
queueMicrotask(() => (callback, null, err - 1));
return;
}

if (err && callback) {
// Don't emit as error, dgram_legacy.js compatibility
const ex = new ExceptionWithHostPort(err, 'send', address, port);
process.nextTick(callback, ex);
queueMicrotask(() => callback(ex));
}
}

Expand Down Expand Up @@ -747,7 +748,7 @@ Socket.prototype.close = function(callback) {
state.handle.close();
state.handle = null;
defaultTriggerAsyncIdScope(this[async_id_symbol],
process.nextTick,
queueMicrotask,
socketCloseNT,
this);

Expand Down
5 changes: 3 additions & 2 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const {
const {
validateFunction,
} = require('internal/validators');
const { queueMicrotask } = require('internal/process/task_queues');

const { triggerUncaughtException } = internalBinding('errors');

Expand Down Expand Up @@ -82,7 +83,7 @@ function wrapStoreRun(store, data, next, transform = defaultTransform) {
try {
context = transform(data);
} catch (err) {
process.nextTick(() => {
queueMicrotask(() => {
triggerUncaughtException(err, false);
});
return next();
Expand Down Expand Up @@ -141,7 +142,7 @@ class ActiveChannel {
const onMessage = this._subscribers[i];
onMessage(data, this.name);
} catch (err) {
process.nextTick(() => {
queueMicrotask(() => {
triggerUncaughtException(err, false);
});
}
Expand Down
14 changes: 8 additions & 6 deletions lib/dns.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const {
validatePort,
validateString,
} = require('internal/validators');
const { queueMicrotask } = require('internal/process/task_queues');

const {
GetAddrInfoReqWrap,
Expand Down Expand Up @@ -194,20 +195,20 @@ function lookup(hostname, options, callback) {
if (!hostname) {
emitInvalidHostnameWarning(hostname);
if (all) {
process.nextTick(callback, null, []);
queueMicrotask(() => callback(null, []));
} else {
process.nextTick(callback, null, null, family === 6 ? 6 : 4);
queueMicrotask(() => callback(null, null, family === 6 ? 6 : 4));
}
return {};
}

const matchedFamily = isIP(hostname);
if (matchedFamily) {
if (all) {
process.nextTick(
callback, null, [{ address: hostname, family: matchedFamily }]);
queueMicrotask(() =>
callback(null, [{ address: hostname, family: matchedFamily }]));
} else {
process.nextTick(callback, null, hostname, matchedFamily);
queueMicrotask(() => callback(null, hostname, matchedFamily));
}
return {};
}
Expand All @@ -222,7 +223,8 @@ function lookup(hostname, options, callback) {
req, hostname, family, hints, verbatim,
);
if (err) {
process.nextTick(callback, new DNSException(err, 'getaddrinfo', hostname));
const ex = new DNSException(err, 'getaddrinfo', hostname);
queueMicrotask(() => callback(ex));
return {};
}
if (hasObserver('dns')) {
Expand Down
4 changes: 2 additions & 2 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ function addCatch(that, promise, type, args) {

if (typeof then === 'function') {
then.call(promise, undefined, function(err) {
// The callback is called with nextTick to avoid a follow-up
// The callback is called with queueMicrotask to avoid a follow-up
// rejection from this promise.
process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);
queueMicrotask(() => emitUnhandledRejectionOrErr(that, err, type, args));
});
}
} catch (err) {
Expand Down
Loading

0 comments on commit 8ce0fe3

Please sign in to comment.