From 8ce0fe3ae57eb41379df0b8fd4f12a8ed5c2da80 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 11 Dec 2023 09:12:53 +0100 Subject: [PATCH] move away from process.nextTick There are two problems with process.nextTick. Less severe is that it reduces performance for all async callback from native code. The more severe one is that it causes weird and unpredictable behavior when trying to interop with promises and async/await code. In particular, we have an invariant where we always emit certain events and invoke certain callbacks "asynchronously". However, that currently doesn't apply to Promise, since we "force" asynchronousity throug process.nextTick which occurs before any microtick. Hence, for any promise/micro-tick based code things actually appear to occur synchronously. Refs: https://github.com/nodejs/node/pull/51070 PR: https://github.com/nodejs/node/pull/51114 --- lib/_http_client.js | 11 +++--- lib/_http_incoming.js | 5 +-- lib/_http_outgoing.js | 11 +++--- lib/_http_server.js | 3 +- lib/_tls_wrap.js | 5 +-- lib/child_process.js | 3 +- lib/dgram.js | 15 ++++---- lib/diagnostics_channel.js | 5 +-- lib/dns.js | 14 ++++---- lib/events.js | 4 +-- lib/fs.js | 19 +++++----- lib/inspector.js | 2 +- .../bootstrap/switches/is_main_thread.js | 7 ++-- lib/internal/child_process.js | 25 ++++++------- lib/internal/cluster/primary.js | 9 ++--- lib/internal/crypto/random.js | 3 +- lib/internal/debugger/inspect.js | 5 +-- lib/internal/debugger/inspect_repl.js | 3 +- lib/internal/event_target.js | 5 +-- lib/internal/fs/dir.js | 7 ++-- lib/internal/fs/read/context.js | 3 +- lib/internal/fs/streams.js | 3 +- lib/internal/fs/watchers.js | 5 +-- lib/internal/js_stream_socket.js | 3 +- lib/internal/main/inspect.js | 3 +- lib/internal/process/promises.js | 3 +- lib/internal/process/warning.js | 5 +-- lib/internal/readline/callbacks.js | 9 ++--- lib/internal/readline/interface.js | 3 +- lib/internal/readline/promises.js | 9 ++--- lib/internal/tls/secure-pair.js | 3 +- lib/internal/webstreams/adapters.js | 15 ++++---- lib/internal/webstreams/readablestream.js | 4 +-- lib/internal/worker.js | 3 +- lib/net.js | 36 +++++++++---------- lib/repl.js | 5 +-- lib/util.js | 9 ++--- lib/zlib.js | 7 ++-- 38 files changed, 162 insertions(+), 127 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index 8db3f8a2766887..07d8cf7cbdff70 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -43,6 +43,7 @@ const { const net = require('net'); const assert = require('internal/assert'); +const { queueMicrotask } = require('internal/process/task_queues'); const { kEmptyObject, once, @@ -342,7 +343,7 @@ function ClientRequest(input, options, cb) { if (typeof optsWithoutSignal.createConnection === 'function') { const oncreate = once((err, socket) => { if (err) { - process.nextTick(() => this.emit('error', err)); + queueMicrotask(() => this.emit('error', err)); } else { this.onSocket(socket); } @@ -405,7 +406,7 @@ ClientRequest.prototype.abort = function abort() { return; } this.aborted = true; - process.nextTick(emitAbortNT, this); + queueMicrotask(() => emitAbortNT(this)); this.destroy(); }; @@ -722,11 +723,11 @@ function responseKeepAlive(req) { // has no 'error' handler. // There are cases where _handle === null. Avoid those. Passing undefined to - // nextTick() will call getDefaultTriggerAsyncId() to retrieve the id. + // queueMicrotask will call getDefaultTriggerAsyncId() to retrieve the id. const asyncId = socket._handle ? socket._handle.getAsyncId() : undefined; // Mark this socket as available, AFTER user-added end // handlers have a chance to run. - defaultTriggerAsyncIdScope(asyncId, process.nextTick, emitFreeNT, req); + defaultTriggerAsyncIdScope(asyncId, queueMicrotask, emitFreeNT, req); req.destroyed = true; if (req.res) { @@ -860,7 +861,7 @@ function listenSocketTimeout(req) { ClientRequest.prototype.onSocket = function onSocket(socket, err) { // TODO(ronag): Between here and onSocketNT the socket // has no 'error' handler. - process.nextTick(onSocketNT, this, socket, err); + queueMicrotask(() => onSocketNT(this, socket, err)); }; function onSocketNT(req, socket, err) { diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index e45ae8190e2215..471be4f7aa8cbe 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -31,6 +31,7 @@ const { } = primordials; const { Readable, finished } = require('stream'); +const { queueMicrotask } = require('internal/process/task_queues'); const kHeaders = Symbol('kHeaders'); const kHeadersDistinct = Symbol('kHeadersDistinct'); @@ -236,10 +237,10 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { e = null; } cleanup(); - process.nextTick(onError, this, e || err, cb); + queueMicrotask(() => (onError, this, e || err, cb)); }); } else { - process.nextTick(onError, this, err, cb); + queueMicrotask(() => onError(this, err, cb)); } }; diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 7303bd88aaa37a..db8d2cbd1c178d 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -76,6 +76,7 @@ const { } = require('internal/errors'); const { validateString } = require('internal/validators'); const { isUint8Array } = require('internal/util/types'); +const { queueMicrotask } = require('internal/process/task_queues'); let debug = require('internal/util/debuglog').debuglog('http', (fn) => { debug = fn; @@ -888,7 +889,7 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { function onError(msg, err, callback) { const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined; defaultTriggerAsyncIdScope(triggerAsyncId, - process.nextTick, + queueMicrotask, emitErrorNt, msg, err, @@ -935,7 +936,7 @@ function write_(msg, chunk, encoding, callback, fromEnd) { if (!msg.destroyed) { onError(msg, err, callback); } else { - process.nextTick(callback, err); + queueMicrotask(() => callback(err)); } return false; } @@ -969,14 +970,14 @@ function write_(msg, chunk, encoding, callback, fromEnd) { } else { debug('This type of response MUST NOT have a body. ' + 'Ignoring write() calls.'); - process.nextTick(callback); + queueMicrotask(callback); return true; } } if (!fromEnd && msg.socket && !msg.socket.writableCorked) { msg.socket.cork(); - process.nextTick(connectionCorkNT, msg.socket); + queueMicrotask(() => connectionCorkNT(msg.socket)); } let ret; @@ -1110,7 +1111,7 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { } else if (!this._headerSent || this.writableLength || chunk) { this._send('', 'latin1', finish); } else { - process.nextTick(finish); + queueMicrotask(finish); } if (this[kSocket]) { diff --git a/lib/_http_server.js b/lib/_http_server.js index 3e19f1ba78e7cc..526e4b15d9bbf2 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -38,6 +38,7 @@ const { const net = require('net'); const EE = require('events'); const assert = require('internal/assert'); +const { queueMicrotask } = require('internal/process/task_queues'); const { parsers, freeParser, @@ -994,7 +995,7 @@ function resOnFinish(req, res, socket, state, server) { res.detachSocket(socket); clearIncoming(req); - process.nextTick(emitCloseNT, res); + queueMicrotask(() => emitCloseNT(res)); if (res._last) { if (typeof socket.destroySoon === 'function') { diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 95f61cb9f6c289..7c8af7c820cbf0 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -67,6 +67,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols; const { isArrayBufferView } = require('internal/util/types'); const { SecureContext: NativeSecureContext } = internalBinding('crypto'); const { ConnResetException, codes } = require('internal/errors'); +const { queueMicrotask } = require('internal/process/task_queues'); const { ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, @@ -609,7 +610,7 @@ function TLSSocket(socket, opts) { } // Read on next tick so the caller has a chance to setup listeners - process.nextTick(initRead, this, socket); + queueMicrotask(() => initRead(this, socket)); } ObjectSetPrototypeOf(TLSSocket.prototype, net.Socket.prototype); ObjectSetPrototypeOf(TLSSocket, net.Socket); @@ -999,7 +1000,7 @@ TLSSocket.prototype.renegotiate = function(options, callback) { this._handle.renegotiate(); } catch (err) { if (callback) { - process.nextTick(callback, err); + queueMicrotask(() => callback(err)); } return false; } diff --git a/lib/child_process.js b/lib/child_process.js index 449013906e93e5..3e805a50e26f81 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -86,6 +86,7 @@ const { validateString, } = require('internal/validators'); const child_process = require('internal/child_process'); +const { queueMicrotask } = require('internal/process/task_queues'); const { getValidStdio, setupChannel, @@ -783,7 +784,7 @@ function spawn(file, args, options) { if (options.signal) { const signal = options.signal; if (signal.aborted) { - process.nextTick(onAbortListener); + queueMicrotask(onAbortListener); } else { addAbortListener ??= require('events').addAbortListener; const disposable = addAbortListener(signal, onAbortListener); diff --git a/lib/dgram.js b/lib/dgram.js index 8c8da1ad8856b5..9875e3c43c6312 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -63,6 +63,7 @@ const { Buffer } = require('buffer'); const { deprecate, guessHandleType, promisify } = require('internal/util'); const { isArrayBufferView } = require('internal/util/types'); const EventEmitter = require('events'); +const { queueMicrotask } = require('internal/process/task_queues'); const { defaultTriggerAsyncIdScope, symbols: { async_id_symbol, owner_symbol }, @@ -436,7 +437,7 @@ function doConnect(ex, self, ip, address, port, callback) { if (ex) { state.connectState = CONNECT_STATE_DISCONNECTED; - return process.nextTick(() => { + return queueMicrotask(() => { if (callback) { self.removeListener('connect', callback); callback(ex); @@ -447,7 +448,7 @@ function doConnect(ex, self, ip, address, port, callback) { } state.connectState = CONNECT_STATE_CONNECTED; - process.nextTick(() => self.emit('connect')); + queueMicrotask(() => self.emit('connect')); } @@ -680,11 +681,11 @@ function doSend(ex, self, ip, list, address, port, callback) { if (ex) { if (typeof callback === 'function') { - process.nextTick(callback, ex); + queueMicrotask(callback, ex); return; } - process.nextTick(() => self.emit('error', ex)); + queueMicrotask(() => self.emit('error', ex)); return; } else if (!state.handle) { return; @@ -709,14 +710,14 @@ function doSend(ex, self, ip, list, address, port, callback) { // Synchronous finish. The return code is msg_length + 1 so that we can // distinguish between synchronous success and asynchronous success. if (callback) - process.nextTick(callback, null, err - 1); + queueMicrotask(() => (callback, null, err - 1)); return; } if (err && callback) { // Don't emit as error, dgram_legacy.js compatibility const ex = new ExceptionWithHostPort(err, 'send', address, port); - process.nextTick(callback, ex); + queueMicrotask(() => callback(ex)); } } @@ -747,7 +748,7 @@ Socket.prototype.close = function(callback) { state.handle.close(); state.handle = null; defaultTriggerAsyncIdScope(this[async_id_symbol], - process.nextTick, + queueMicrotask, socketCloseNT, this); diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index 97a86805ce7a32..64046ae339b273 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -25,6 +25,7 @@ const { const { validateFunction, } = require('internal/validators'); +const { queueMicrotask } = require('internal/process/task_queues'); const { triggerUncaughtException } = internalBinding('errors'); @@ -82,7 +83,7 @@ function wrapStoreRun(store, data, next, transform = defaultTransform) { try { context = transform(data); } catch (err) { - process.nextTick(() => { + queueMicrotask(() => { triggerUncaughtException(err, false); }); return next(); @@ -141,7 +142,7 @@ class ActiveChannel { const onMessage = this._subscribers[i]; onMessage(data, this.name); } catch (err) { - process.nextTick(() => { + queueMicrotask(() => { triggerUncaughtException(err, false); }); } diff --git a/lib/dns.js b/lib/dns.js index 681f0aa3e58ecf..e804b340624486 100644 --- a/lib/dns.js +++ b/lib/dns.js @@ -85,6 +85,7 @@ const { validatePort, validateString, } = require('internal/validators'); +const { queueMicrotask } = require('internal/process/task_queues'); const { GetAddrInfoReqWrap, @@ -194,9 +195,9 @@ function lookup(hostname, options, callback) { if (!hostname) { emitInvalidHostnameWarning(hostname); if (all) { - process.nextTick(callback, null, []); + queueMicrotask(() => callback(null, [])); } else { - process.nextTick(callback, null, null, family === 6 ? 6 : 4); + queueMicrotask(() => callback(null, null, family === 6 ? 6 : 4)); } return {}; } @@ -204,10 +205,10 @@ function lookup(hostname, options, callback) { const matchedFamily = isIP(hostname); if (matchedFamily) { if (all) { - process.nextTick( - callback, null, [{ address: hostname, family: matchedFamily }]); + queueMicrotask(() => + callback(null, [{ address: hostname, family: matchedFamily }])); } else { - process.nextTick(callback, null, hostname, matchedFamily); + queueMicrotask(() => callback(null, hostname, matchedFamily)); } return {}; } @@ -222,7 +223,8 @@ function lookup(hostname, options, callback) { req, hostname, family, hints, verbatim, ); if (err) { - process.nextTick(callback, new DNSException(err, 'getaddrinfo', hostname)); + const ex = new DNSException(err, 'getaddrinfo', hostname); + queueMicrotask(() => callback(ex)); return {}; } if (hasObserver('dns')) { diff --git a/lib/events.js b/lib/events.js index 3ba2484ece938f..ca1a8eafaec445 100644 --- a/lib/events.js +++ b/lib/events.js @@ -375,9 +375,9 @@ function addCatch(that, promise, type, args) { if (typeof then === 'function') { then.call(promise, undefined, function(err) { - // The callback is called with nextTick to avoid a follow-up + // The callback is called with queueMicrotask to avoid a follow-up // rejection from this promise. - process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args); + queueMicrotask(() => emitUnhandledRejectionOrErr(that, err, type, args)); }); } } catch (err) { diff --git a/lib/fs.js b/lib/fs.js index 34ad6ef6be835b..e6838941b02851 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -60,6 +60,7 @@ const { const pathModule = require('path'); const { isAbsolute } = pathModule; const { isArrayBufferView } = require('internal/util/types'); +const { queueMicrotask } = require('internal/process/task_queues'); const binding = internalBinding('fs'); @@ -378,9 +379,9 @@ function readFile(path, options, callback) { context.signal = options.signal; } if (context.isUserFd) { - process.nextTick(function tick(context) { + queueMicrotask(function tick() { ReflectApply(readFileAfterOpen, { context }, [null, path]); - }, context); + }); return; } @@ -664,7 +665,7 @@ function read(fd, buffer, offsetOrOptions, length, position, callback) { length |= 0; if (length === 0) { - return process.nextTick(function tick() { + return queueMicrotask(function tick() { callback(null, 0, buffer); }); } @@ -956,7 +957,7 @@ function writev(fd, buffers, position, callback) { callback = maybeCallback(callback || position); if (buffers.length === 0) { - process.nextTick(callback, null, 0, buffers); + queueMicrotask(() => callback(null, 0, buffers)); return; } @@ -2479,7 +2480,7 @@ function watch(filename, options, listener) { } if (options.signal) { if (options.signal.aborted) { - process.nextTick(() => watcher.close()); + queueMicrotask(() => watcher.close()); } else { const listener = () => watcher.close(); kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; @@ -2820,7 +2821,7 @@ function realpath(p, options, callback) { LOOP(); }); } else { - process.nextTick(LOOP); + queueMicrotask(LOOP); } // Walk down the path, swapping out linked path parts for their real @@ -2851,7 +2852,7 @@ function realpath(p, options, callback) { isFileType(statValues, S_IFSOCK)) { return callback(null, encodeRealpathResult(p, options)); } - return process.nextTick(LOOP); + return queueMicrotask(LOOP); } return fs.lstat(base, { bigint: true }, gotStat); @@ -2863,7 +2864,7 @@ function realpath(p, options, callback) { // If not a symlink, skip to the next path part if (!stats.isSymbolicLink()) { knownHard.add(base); - return process.nextTick(LOOP); + return queueMicrotask(LOOP); } // Stat & read the link if not read before. @@ -2908,7 +2909,7 @@ function realpath(p, options, callback) { LOOP(); }); } else { - process.nextTick(LOOP); + queueMicrotask(LOOP); } } } diff --git a/lib/inspector.js b/lib/inspector.js index e51bcf2f3cd977..1247567a140b34 100644 --- a/lib/inspector.js +++ b/lib/inspector.js @@ -145,7 +145,7 @@ class Session extends EventEmitter { this.#connection = null; const remainingCallbacks = this.#messageCallbacks.values(); for (const callback of remainingCallbacks) { - process.nextTick(callback, new ERR_INSPECTOR_CLOSED()); + queueMicrotask(() => callback(new ERR_INSPECTOR_CLOSED())); } this.#messageCallbacks.clear(); this.#nextId = 1; diff --git a/lib/internal/bootstrap/switches/is_main_thread.js b/lib/internal/bootstrap/switches/is_main_thread.js index 8707bc7daaa616..eacd5b0e8156b8 100644 --- a/lib/internal/bootstrap/switches/is_main_thread.js +++ b/lib/internal/bootstrap/switches/is_main_thread.js @@ -8,6 +8,7 @@ const { isBuildingSnapshot, }, } = require('internal/v8/startup_snapshot'); +const { queueMicrotask } = require('internal/process/task_queues'); // TODO(joyeecheung): deprecate and remove these underscore methods process._debugProcess = rawMethods._debugProcess; process._debugEnd = rawMethods._debugEnd; @@ -118,7 +119,7 @@ function dummyDestroy(err, cb) { // The 'close' event is needed so that finished and // pipeline work correctly. if (!this._writableState.emitClose) { - process.nextTick(() => { + queueMicrotask(() => { this.emit('close'); }); } @@ -255,10 +256,10 @@ function getStdin() { } // If the user calls stdin.pause(), then we need to stop reading - // once the stream implementation does so (one nextTick later), + // once the stream implementation does so (one micro task later), // so that the process can close down. stdin.on('pause', () => { - process.nextTick(onpause); + queueMicrotask(onpause); }); function onpause() { diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index 49edaba5b558e9..de1131913e83ab 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -62,6 +62,7 @@ const spawn_sync = internalBinding('spawn_sync'); const { kStateSymbol } = require('internal/dgram'); const dc = require('diagnostics_channel'); const childProcessChannel = dc.channel('child_process'); +const { queueMicrotask } = require('internal/process/task_queues'); const { UV_EACCES, @@ -297,10 +298,10 @@ function ChildProcess() { // If any of the stdio streams have not been touched, // then pull all the data through so that it can get the // eof and emit a 'close' event. - // Do it on nextTick so that the user has one last chance + // Do it on queueMicrotask so that the user has one last chance // to consume the output, if for example they only want to // start reading the data once the process exits. - process.nextTick(flushStdio, this); + queueMicrotask(() => flushStdio(this)); maybeClose(this); }; @@ -401,7 +402,7 @@ ChildProcess.prototype.spawn = function(options) { err === UV_EMFILE || err === UV_ENFILE || err === UV_ENOENT) { - process.nextTick(onErrorNT, this, err); + queueMicrotask(() => onErrorNT(this, err)); // There is no point in continuing when we've hit EMFILE or ENFILE // because we won't be able to set up the stdio file descriptors. @@ -420,7 +421,7 @@ ChildProcess.prototype.spawn = function(options) { this._handle = null; throw new ErrnoException(err, 'spawn'); } else { - process.nextTick(onSpawnNT, this); + queueMicrotask(() => onSpawnNT(this)); } this.pid = this._handle.pid; @@ -718,7 +719,7 @@ function setupChannel(target, channel, serializationMode) { target.on('newListener', function() { - process.nextTick(() => { + queueMicrotask(() => { if (!target.channel || !target.listenerCount('message')) return; @@ -753,9 +754,9 @@ function setupChannel(target, channel, serializationMode) { } const ex = new ERR_IPC_CHANNEL_CLOSED(); if (typeof callback === 'function') { - process.nextTick(callback, ex); + queueMicrotask(() => callback(ex)); } else { - process.nextTick(() => this.emit('error', ex)); + queueMicrotask(() => this.emit('error', ex)); } return false; }; @@ -868,7 +869,7 @@ function setupChannel(target, channel, serializationMode) { }; control.refCounted(); } else if (typeof callback === 'function') { - process.nextTick(callback, null); + queueMicrotask(() => callback(null)); } } else { // Cleanup handle on error @@ -878,9 +879,9 @@ function setupChannel(target, channel, serializationMode) { if (!options.swallowErrors) { const ex = new ErrnoException(err, 'write'); if (typeof callback === 'function') { - process.nextTick(callback, ex); + queueMicrotask(() => callback(ex)); } else { - process.nextTick(() => this.emit('error', ex)); + queueMicrotask(() => this.emit('error', ex)); } } } @@ -943,7 +944,7 @@ function setupChannel(target, channel, serializationMode) { return; } - process.nextTick(finish); + queueMicrotask(finish); }; function emit(event, message, handle) { @@ -964,7 +965,7 @@ function setupChannel(target, channel, serializationMode) { const eventName = (internal ? 'internalMessage' : 'message'); - process.nextTick(emit, eventName, message, handle); + queueMicrotask(() => emit(eventName, message, handle)); } channel.readStart(); diff --git a/lib/internal/cluster/primary.js b/lib/internal/cluster/primary.js index 945f440cd19797..e3954f08869b87 100644 --- a/lib/internal/cluster/primary.js +++ b/lib/internal/cluster/primary.js @@ -24,6 +24,7 @@ const SharedHandle = require('internal/cluster/shared_handle'); const Worker = require('internal/cluster/worker'); const { getInspectPort, isUsingInspector } = require('internal/util/inspector'); const { internal, sendHelper } = require('internal/cluster/utils'); +const { queueMicrotask } = require('internal/process/task_queues'); const cluster = new EventEmitter(); const intercom = new EventEmitter(); const SCHED_NONE = 1; @@ -83,14 +84,14 @@ cluster.setupPrimary = function(options) { cluster.settings = settings; if (initialized === true) - return process.nextTick(setupSettingsNT, settings); + return queueMicrotask(() => setupSettingsNT(settings)); initialized = true; schedulingPolicy = cluster.schedulingPolicy; // Freeze policy. assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`); - process.nextTick(setupSettingsNT, settings); + queueMicrotask(() => setupSettingsNT(settings)); process.on('internalMessage', (message) => { if (message.cmd !== 'NODE_DEBUG_ENABLED') @@ -211,7 +212,7 @@ cluster.fork = function(env) { }); worker.process.on('internalMessage', internal(worker, onmessage)); - process.nextTick(emitForkNT, worker); + queueMicrotask(() => emitForkNT(worker)); cluster.workers[worker.id] = worker; return worker; }; @@ -224,7 +225,7 @@ cluster.disconnect = function(cb) { const workers = ObjectKeys(cluster.workers); if (workers.length === 0) { - process.nextTick(() => intercom.emit('disconnect')); + queueMicrotask(() => intercom.emit('disconnect')); } else { for (const worker of ObjectValues(cluster.workers)) { if (worker.isConnected()) { diff --git a/lib/internal/crypto/random.js b/lib/internal/crypto/random.js index 0533216969bc8e..97fc789419c89c 100644 --- a/lib/internal/crypto/random.js +++ b/lib/internal/crypto/random.js @@ -34,6 +34,7 @@ const { kEmptyObject, lazyDOMException, } = require('internal/util'); +const { queueMicrotask } = require('internal/process/task_queues'); const { Buffer, kMaxLength } = require('buffer'); @@ -262,7 +263,7 @@ function randomInt(min, max, callback) { if (x < randLimit) { const n = (x % range) + min; if (isSync) return n; - process.nextTick(callback, undefined, n); + queueMicrotask(() => callback(undefined, n)); return; } } diff --git a/lib/internal/debugger/inspect.js b/lib/internal/debugger/inspect.js index 5e93699f8ba078..69990fa6160bf0 100644 --- a/lib/internal/debugger/inspect.js +++ b/lib/internal/debugger/inspect.js @@ -31,6 +31,7 @@ const { const { AbortController, } = require('internal/abort_controller'); +const { queueMicrotask } = require('internal/process/task_queues'); const { 0: InspectClient, 1: createRepl } = [ @@ -182,7 +183,7 @@ class NodeInspector { this.repl.on('exit', exitCodeZero); this.paused = false; } catch (error) { - process.nextTick(() => { throw error; }); + queueMicrotask(() => { throw error; }); } })(); } @@ -203,7 +204,7 @@ class NodeInspector { } this.stdin.resume(); } catch (error) { - process.nextTick(() => { throw error; }); + queueMicrotask(() => { throw error; }); } })(); } diff --git a/lib/internal/debugger/inspect_repl.js b/lib/internal/debugger/inspect_repl.js index b4f454152dc438..0ee876fc428781 100644 --- a/lib/internal/debugger/inspect_repl.js +++ b/lib/internal/debugger/inspect_repl.js @@ -45,6 +45,7 @@ const { } = primordials; const { ERR_DEBUGGER_ERROR } = require('internal/errors').codes; +const { queueMicrotask } = require('internal/process/task_queues'); const { validateString, validateNumber } = require('internal/validators'); @@ -1098,7 +1099,7 @@ function createRepl(inspector) { exitDebugRepl = () => { // Restore all listeners - process.nextTick(() => { + queueMicrotask(() => { ArrayPrototypeForEach(listeners, (listener) => { repl.on('SIGINT', listener); }); diff --git a/lib/internal/event_target.js b/lib/internal/event_target.js index 4c67453fea4c59..79d1e39027c59b 100644 --- a/lib/internal/event_target.js +++ b/lib/internal/event_target.js @@ -39,6 +39,7 @@ const { kValidateObjectAllowArray, kValidateObjectAllowFunction, } = require('internal/validators'); +const { queueMicrotask } = require('internal/process/task_queues'); const { customInspectSymbol, @@ -1085,7 +1086,7 @@ function addCatch(promise) { const then = promise.then; if (typeof then === 'function') { FunctionPrototypeCall(then, promise, undefined, function(err) { - // The callback is called with nextTick to avoid a follow-up + // The callback is called with queueMicrotask to avoid a follow-up // rejection from this promise. emitUncaughtException(err); }); @@ -1093,7 +1094,7 @@ function addCatch(promise) { } function emitUncaughtException(err) { - process.nextTick(() => { throw err; }); + queueMicrotask(() => { throw err; }); } function makeEventHandler(handler) { diff --git a/lib/internal/fs/dir.js b/lib/internal/fs/dir.js index 82c6c1bd780fba..a237b0a66babfa 100644 --- a/lib/internal/fs/dir.js +++ b/lib/internal/fs/dir.js @@ -33,6 +33,7 @@ const { validateFunction, validateUint32, } = require('internal/validators'); +const { queueMicrotask } = require('internal/process/task_queues'); const kDirHandle = Symbol('kDirHandle'); const kDirPath = Symbol('kDirPath'); @@ -106,7 +107,7 @@ class Dir { } if (maybeSync) - process.nextTick(callback, null, dirent); + queueMicrotask(() => callback(null, dirent)); else callback(null, dirent); return; @@ -117,7 +118,7 @@ class Dir { const req = new FSReqCallback(); req.oncomplete = (err, result) => { - process.nextTick(() => { + queueMicrotask(() => { const queue = this[kDirOperationQueue]; this[kDirOperationQueue] = null; for (const op of queue) op(); @@ -237,7 +238,7 @@ class Dir { validateFunction(callback, 'callback'); if (this[kDirClosed] === true) { - process.nextTick(callback, new ERR_DIR_CLOSED()); + queueMicrotask(() => callback(new ERR_DIR_CLOSED())); return; } diff --git a/lib/internal/fs/read/context.js b/lib/internal/fs/read/context.js index b1a5d6ae03e953..7f5e9b860f7bb5 100644 --- a/lib/internal/fs/read/context.js +++ b/lib/internal/fs/read/context.js @@ -14,6 +14,7 @@ const { } = require('internal/fs/utils'); const { Buffer } = require('buffer'); +const { queueMicrotask } = require('internal/process/task_queues'); const { FSReqCallback, close, read } = internalBinding('fs'); @@ -111,7 +112,7 @@ class ReadFileContext { close(err) { if (this.isUserFd) { - process.nextTick(function tick(context) { + queueMicrotask(function tick(context) { ReflectApply(readFileAfterClose, { context }, [null]); }, this); return; diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 43f06d0104de61..f3b0428101acde 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -41,6 +41,7 @@ const { Readable, Writable, finished } = require('stream'); const { toPathIfFileURL } = require('internal/url'); const kIoDone = Symbol('kIoDone'); const kIsPerformingIO = Symbol('kIsPerformingIO'); +const { queueMicrotask } = require('internal/process/task_queues'); const kFs = Symbol('kFs'); const kHandle = Symbol('kHandle'); @@ -534,7 +535,7 @@ WriteStream.prototype._destroy = function(err, cb) { WriteStream.prototype.close = function(cb) { if (cb) { if (this.closed) { - process.nextTick(cb); + queueMicrotask(cb); return; } this.on('close', cb); diff --git a/lib/internal/fs/watchers.js b/lib/internal/fs/watchers.js index f5ecc15159f457..b4a5f004d0d77a 100644 --- a/lib/internal/fs/watchers.js +++ b/lib/internal/fs/watchers.js @@ -39,6 +39,7 @@ const { } = require('internal/async_hooks'); const { toNamespacedPath } = require('path'); +const { queueMicrotask } = require('internal/process/task_queues'); const { validateAbortSignal, @@ -144,7 +145,7 @@ StatWatcher.prototype.stop = function() { return; defaultTriggerAsyncIdScope(this._handle.getAsyncId(), - process.nextTick, + queueMicrotask, emitStop, this); this._handle.close(); @@ -274,7 +275,7 @@ FSWatcher.prototype.close = function() { } this._handle.close(); this._handle = null; // Make the handle garbage collectable. - process.nextTick(emitCloseNT, this); + queueMicrotask(() => emitCloseNT(this)); }; FSWatcher.prototype.ref = function() { diff --git a/lib/internal/js_stream_socket.js b/lib/internal/js_stream_socket.js index a6aee73f468b08..6fd385300d1cb6 100644 --- a/lib/internal/js_stream_socket.js +++ b/lib/internal/js_stream_socket.js @@ -15,6 +15,7 @@ let debug = require('internal/util/debuglog').debuglog( debug = fn; }, ); +const { queueMicrotask } = require('internal/process/task_queues'); const { owner_symbol } = require('internal/async_hooks').symbols; const { ERR_STREAM_WRAP } = require('internal/errors').codes; @@ -151,7 +152,7 @@ class JSStreamSocket extends Socket { const handle = this._handle; assert(handle !== null); - process.nextTick(() => { + queueMicrotask(() => { // Ensure that write is dispatched asynchronously. this.stream.end(() => { this.finishShutdown(handle, 0); diff --git a/lib/internal/main/inspect.js b/lib/internal/main/inspect.js index a60e4aa40b9605..824b10338a1ccf 100644 --- a/lib/internal/main/inspect.js +++ b/lib/internal/main/inspect.js @@ -6,6 +6,7 @@ const { prepareMainThreadExecution, markBootstrapComplete, } = require('internal/process/pre_execution'); +const { queueMicrotask } = require('internal/process/task_queues'); prepareMainThreadExecution(); @@ -13,6 +14,6 @@ prepareMainThreadExecution(); markBootstrapComplete(); // Start the debugger agent. -process.nextTick(() => { +queueMicrotask(() => { require('internal/debugger/inspect').start(); }); diff --git a/lib/internal/process/promises.js b/lib/internal/process/promises.js index d80ce1ef764a00..31191781e52006 100644 --- a/lib/internal/process/promises.js +++ b/lib/internal/process/promises.js @@ -21,6 +21,7 @@ const { } = internalBinding('task_queue'); const { deprecate } = require('internal/util'); +const { queueMicrotask } = require('internal/process/task_queues'); const { noSideEffectsToString, @@ -135,7 +136,7 @@ const multipleResolvesDeprecate = deprecate( function resolveError(type, promise, reason) { // We have to wrap this in a next tick. Otherwise the error could be caught by // the executed promise. - process.nextTick(() => { + queueMicrotask(() => { if (process.emit('multipleResolves', type, promise, reason)) { multipleResolvesDeprecate(); } diff --git a/lib/internal/process/warning.js b/lib/internal/process/warning.js index 9e1e6f7a6a2dde..9c140704ac779a 100644 --- a/lib/internal/process/warning.js +++ b/lib/internal/process/warning.js @@ -21,6 +21,7 @@ const { isErrorStackTraceLimitWritable, } = require('internal/errors'); const { validateString } = require('internal/validators'); +const { queueMicrotask } = require('internal/process/task_queues'); // Lazily loaded let fs; @@ -181,12 +182,12 @@ function emitWarning(warning, type, code, ctor) { if (process.throwDeprecation) { // Delay throwing the error to guarantee that all former warnings were // properly logged. - return process.nextTick(() => { + return queueMicrotask(() => { throw warning; }); } } - process.nextTick(doEmitWarning, warning); + queueMicrotask(() => doEmitWarning(warning)); } function emitWarningSync(warning, type, code, ctor) { diff --git a/lib/internal/readline/callbacks.js b/lib/internal/readline/callbacks.js index 692048c9157199..ecc597cc32ebcc 100644 --- a/lib/internal/readline/callbacks.js +++ b/lib/internal/readline/callbacks.js @@ -17,6 +17,7 @@ const { const { CSI, } = require('internal/readline/utils'); +const { queueMicrotask } = require('internal/process/task_queues'); const { kClearLine, @@ -44,7 +45,7 @@ function cursorTo(stream, x, y, callback) { if (NumberIsNaN(y)) throw new ERR_INVALID_ARG_VALUE('y', y); if (stream == null || (typeof x !== 'number' && typeof y !== 'number')) { - if (typeof callback === 'function') process.nextTick(callback, null); + if (typeof callback === 'function') queueMicrotask(() => callback(null)); return true; } @@ -64,7 +65,7 @@ function moveCursor(stream, dx, dy, callback) { } if (stream == null || !(dx || dy)) { - if (typeof callback === 'function') process.nextTick(callback, null); + if (typeof callback === 'function') queueMicrotask(() => callback(null)); return true; } @@ -98,7 +99,7 @@ function clearLine(stream, dir, callback) { } if (stream === null || stream === undefined) { - if (typeof callback === 'function') process.nextTick(callback, null); + if (typeof callback === 'function') queueMicrotask(() => callback(null)); return true; } @@ -117,7 +118,7 @@ function clearScreenDown(stream, callback) { } if (stream === null || stream === undefined) { - if (typeof callback === 'function') process.nextTick(callback, null); + if (typeof callback === 'function') queueMicrotask(() => callback(null)); return true; } diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index f7f06674ef7c41..4181554d0d5aa1 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -66,6 +66,7 @@ const { cursorTo, moveCursor, } = require('internal/readline/callbacks'); +const { queueMicrotask } = require('internal/process/task_queues'); const { StringDecoder } = require('string_decoder'); @@ -324,7 +325,7 @@ function InterfaceConstructor(input, output, completer, terminal) { if (signal) { const onAborted = () => self.close(); if (signal.aborted) { - process.nextTick(onAborted); + queueMicrotask(onAborted); } else { const disposable = EventEmitter.addAbortListener(signal, onAborted); self.once('close', disposable[SymbolDispose]); diff --git a/lib/internal/readline/promises.js b/lib/internal/readline/promises.js index 1a0c7b4c809d47..b4a7862e13111b 100644 --- a/lib/internal/readline/promises.js +++ b/lib/internal/readline/promises.js @@ -10,6 +10,7 @@ const { CSI } = require('internal/readline/utils'); const { validateBoolean, validateInteger } = require('internal/validators'); const { isWritable } = require('internal/streams/utils'); const { codes: { ERR_INVALID_ARG_TYPE } } = require('internal/errors'); +const { queueMicrotask } = require('internal/process/task_queues'); const { kClearToLineBeginning, @@ -44,7 +45,7 @@ class Readline { if (y != null) validateInteger(y, 'y'); const data = y == null ? CSI`${x + 1}G` : CSI`${y + 1};${x + 1}H`; - if (this.#autoCommit) process.nextTick(() => this.#stream.write(data)); + if (this.#autoCommit) queueMicrotask(() => this.#stream.write(data)); else ArrayPrototypePush(this.#todo, data); return this; @@ -74,7 +75,7 @@ class Readline { } else if (dy > 0) { data += CSI`${dy}B`; } - if (this.#autoCommit) process.nextTick(() => this.#stream.write(data)); + if (this.#autoCommit) queueMicrotask(() => this.#stream.write(data)); else ArrayPrototypePush(this.#todo, data); } return this; @@ -95,7 +96,7 @@ class Readline { dir < 0 ? kClearToLineBeginning : dir > 0 ? kClearToLineEnd : kClearLine; - if (this.#autoCommit) process.nextTick(() => this.#stream.write(data)); + if (this.#autoCommit) queueMicrotask(() => this.#stream.write(data)); else ArrayPrototypePush(this.#todo, data); return this; } @@ -106,7 +107,7 @@ class Readline { */ clearScreenDown() { if (this.#autoCommit) { - process.nextTick(() => this.#stream.write(kClearScreenDown)); + queueMicrotask(() => this.#stream.write(kClearScreenDown)); } else { ArrayPrototypePush(this.#todo, kClearScreenDown); } diff --git a/lib/internal/tls/secure-pair.js b/lib/internal/tls/secure-pair.js index 7e0a2f992a82b8..44f496498727c5 100644 --- a/lib/internal/tls/secure-pair.js +++ b/lib/internal/tls/secure-pair.js @@ -5,6 +5,7 @@ const { kEmptyObject } = require('internal/util'); const { Duplex } = require('stream'); const _tls_wrap = require('_tls_wrap'); const _tls_common = require('_tls_common'); +const { queueMicrotask } = require('internal/process/task_queues'); const { Symbol, @@ -31,7 +32,7 @@ class DuplexSocket extends Duplex { _write(chunk, encoding, callback) { if (chunk.length === 0) { - process.nextTick(callback); + queueMicrotask(callback); } else { this[kOtherSide].push(chunk); this[kOtherSide][kCallback] = callback; diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index cffa549eafd683..890ab3ec029837 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -18,6 +18,7 @@ const { } = primordials; const { TextEncoder } = require('internal/encoding'); +const { queueMicrotask } = require('internal/process/task_queues'); const { ReadableStream, @@ -286,7 +287,7 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj // thrown we don't want those to cause an unhandled // rejection. Let's just escape the promise and // handle it separately. - process.nextTick(() => destroy(writable, error)); + queueMicrotask(() => destroy(writable, error)); } } @@ -348,7 +349,7 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj // thrown we don't want those to cause an unhandled // rejection. Let's just escape the promise and // handle it separately. - process.nextTick(() => { throw error; }); + queueMicrotask(() => { throw error; }); } } @@ -380,7 +381,7 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj // thrown we don't want those to cause an unhandled // rejection. Let's just escape the promise and // handle it separately. - process.nextTick(() => destroy(writable, error)); + queueMicrotask(() => destroy(writable, error)); } } @@ -562,7 +563,7 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj // thrown we don't want those to cause an unhandled // rejection. Let's just escape the promise and // handle it separately. - process.nextTick(() => { throw error; }); + queueMicrotask(() => { throw error; }); } } @@ -709,7 +710,7 @@ function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = // thrown we don't want those to cause an unhandled // rejection. Let's just escape the promise and // handle it separately. - process.nextTick(() => destroy(duplex, error)); + queueMicrotask(() => destroy(duplex, error)); } } @@ -771,7 +772,7 @@ function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = // thrown we don't want those to cause an unhandled // rejection. Let's just escape the promise and // handle it separately. - process.nextTick(() => destroy(duplex, error)); + queueMicrotask(() => destroy(duplex, error)); } } @@ -806,7 +807,7 @@ function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = // thrown we don't want those to cause an unhandled // rejection. Let's just escape the promise and // handle it separately. - process.nextTick(() => { throw error; }); + queueMicrotask(() => { throw error; }); } } diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8e27bae439f8e3..454d4c4a7bad75 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1610,10 +1610,10 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { }); }, [kClose]() { - // The `process.nextTick()` is not part of the spec. + // The `queueMicrotask` is not part of the spec. // This approach was needed to avoid a race condition working with esm // Further information, see: https://github.com/nodejs/node/issues/39758 - process.nextTick(() => { + queueMicrotask(() => { reading = false; if (!canceled1) readableStreamDefaultControllerClose(branch1[kState].controller); diff --git a/lib/internal/worker.js b/lib/internal/worker.js index b58cbe56d01703..def5bb206178af 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -76,6 +76,7 @@ const { kStackSizeMb, kTotalResourceLimitCount, } = internalBinding('worker'); +const { queueMicrotask } = require('internal/process/task_queues'); const kHandle = Symbol('kHandle'); const kPublicPort = Symbol('kPublicPort'); @@ -290,7 +291,7 @@ class Worker extends EventEmitter { // Actually start the new thread now that everything is in place. this[kHandle].startThread(); - process.nextTick(() => process.emit('worker', this)); + queueMicrotask(() => process.emit('worker', this)); if (workerThreadsChannel.hasSubscribers) { workerThreadsChannel.publish({ worker: this, diff --git a/lib/net.js b/lib/net.js index a7e1dbb3f524a5..eadd0e00c208e1 100644 --- a/lib/net.js +++ b/lib/net.js @@ -562,7 +562,7 @@ function writeAfterFIN(chunk, encoding, cb) { { code: 'EPIPE' }, ); if (typeof cb === 'function') { - defaultTriggerAsyncIdScope(this[async_id_symbol], process.nextTick, cb, er); + defaultTriggerAsyncIdScope(this[async_id_symbol], queueMicrotask, cb, er); } this.destroy(er); @@ -833,7 +833,7 @@ Socket.prototype._destroy = function(exception, cb) { cb(exception); } else { cb(exception); - process.nextTick(emitCloseNT, this); + queueMicrotask(() => emitCloseNT(this)); } if (this._server) { @@ -1310,7 +1310,7 @@ function lookupAndConnect(self, options) { // If host is an IP, skip performing a lookup const addressType = isIP(host); if (addressType) { - defaultTriggerAsyncIdScope(self[async_id_symbol], process.nextTick, () => { + defaultTriggerAsyncIdScope(self[async_id_symbol], queueMicrotask, () => { if (self.connecting) defaultTriggerAsyncIdScope( self[async_id_symbol], @@ -1377,15 +1377,15 @@ function lookupAndConnect(self, options) { // net.createConnection() creates a net.Socket object and immediately // calls net.Socket.connect() on it (that's us). There are no event // listeners registered yet so defer the error event to the next tick. - process.nextTick(connectErrorNT, self, err); + queueMicrotask(() => connectErrorNT(self, err)); } else if (!isIP(ip)) { err = new ERR_INVALID_IP_ADDRESS(ip); - process.nextTick(connectErrorNT, self, err); + queueMicrotask(() => connectErrorNT(self, err)); } else if (addressType !== 4 && addressType !== 6) { err = new ERR_INVALID_ADDRESS_FAMILY(addressType, options.host, options.port); - process.nextTick(connectErrorNT, self, err); + queueMicrotask(() => connectErrorNT(self, err)); } else { self._unrefTimer(); defaultTriggerAsyncIdScope( @@ -1414,7 +1414,7 @@ function lookupAndConnectMultiple( // net.createConnection() creates a net.Socket object and immediately // calls net.Socket.connect() on it (that's us). There are no event // listeners registered yet so defer the error event to the next tick. - process.nextTick(connectErrorNT, self, err); + queueMicrotask(() => connectErrorNT(self, err)); return; } @@ -1451,12 +1451,12 @@ function lookupAndConnectMultiple( if (!isIP(firstIp)) { err = new ERR_INVALID_IP_ADDRESS(firstIp); - process.nextTick(connectErrorNT, self, err); + queueMicrotask(() => connectErrorNT(self, err)); } else if (firstAddressType !== 4 && firstAddressType !== 6) { err = new ERR_INVALID_ADDRESS_FAMILY(firstAddressType, options.host, options.port); - process.nextTick(connectErrorNT, self, err); + queueMicrotask(() => connectErrorNT(self, err)); } return; @@ -1616,9 +1616,9 @@ function addClientAbortSignalOption(self, options) { } if (signal.aborted) { - process.nextTick(onAbort); + queueMicrotask(onAbort); } else { - process.nextTick(() => { + queueMicrotask(() => { disposable = EventEmitter.addAbortListener(signal, onAbort); }); } @@ -1697,7 +1697,7 @@ function addServerAbortSignalOption(self, options) { self.close(); }; if (signal.aborted) { - process.nextTick(onAborted); + queueMicrotask(onAborted); } else { const disposable = EventEmitter.addAbortListener(signal, onAborted); self.once('close', disposable[SymbolDispose]); @@ -1852,8 +1852,8 @@ function setupListenHandle(address, port, addressType, backlog, fd, flags) { rval = createServerHandle(address, port, addressType, fd, flags); if (typeof rval === 'number') { - const error = new UVExceptionWithHostPort(rval, 'listen', address, port); - process.nextTick(emitErrorNT, this, error); + const ex = new UVExceptionWithHostPort(rval, 'listen', address, port); + queueMicrotask(() => emitErrorNT(this, ex)); return; } this._handle = rval; @@ -1873,7 +1873,7 @@ function setupListenHandle(address, port, addressType, backlog, fd, flags) { this._handle.close(); this._handle = null; defaultTriggerAsyncIdScope(this[async_id_symbol], - process.nextTick, + queueMicrotask, emitErrorNT, this, ex); @@ -1888,7 +1888,7 @@ function setupListenHandle(address, port, addressType, backlog, fd, flags) { this.unref(); defaultTriggerAsyncIdScope(this[async_id_symbol], - process.nextTick, + queueMicrotask, emitListeningNT, this); } @@ -2179,7 +2179,7 @@ Server.prototype.getConnections = function(cb) { function end(err, connections) { defaultTriggerAsyncIdScope(self[async_id_symbol], - process.nextTick, + queueMicrotask, cb, err, connections); @@ -2268,7 +2268,7 @@ Server.prototype._emitCloseIfDrained = function() { } defaultTriggerAsyncIdScope(this[async_id_symbol], - process.nextTick, + queueMicrotask, emitCloseNT, this); }; diff --git a/lib/repl.js b/lib/repl.js index 3029c94b1e1ac0..73b461b7c97bed 100644 --- a/lib/repl.js +++ b/lib/repl.js @@ -118,6 +118,7 @@ const { } = require('internal/util'); const { inspect } = require('internal/util/inspect'); const vm = require('vm'); +const { queueMicrotask } = require('internal/process/task_queues'); const { runInThisContext, runInContext } = vm.Script.prototype; @@ -745,7 +746,7 @@ function REPLServer(prompt, if (options[kStandaloneREPL] && process.listenerCount('uncaughtException') !== 0) { - process.nextTick(() => { + queueMicrotask(() => { process.emit('uncaughtException', e); self.clearBufferedCommand(); self.lines.level = []; @@ -1089,7 +1090,7 @@ REPLServer.prototype.close = function close() { return; } - process.nextTick(() => + queueMicrotask(() => ReflectApply(Interface.prototype.close, this, []), ); }; diff --git a/lib/util.js b/lib/util.js index 13a437c9318d05..25bf7d567fca0e 100644 --- a/lib/util.js +++ b/lib/util.js @@ -78,6 +78,7 @@ const { promisify, defineLazyProperties, } = require('internal/util'); +const { queueMicrotask } = require('internal/process/task_queues'); let abortController; @@ -309,11 +310,11 @@ function callbackify(original) { const maybeCb = ArrayPrototypePop(args); validateFunction(maybeCb, 'last argument'); const cb = FunctionPrototypeBind(maybeCb, this); - // In true node style we process the callback on `nextTick` with all the - // implications (stack, `uncaughtException`, `async_hooks`) + // In true node style we process the callback with a custom `queueMicrotask` + // with all the implications (stack, `uncaughtException`, `async_hooks`) ReflectApply(original, this, args) - .then((ret) => process.nextTick(cb, null, ret), - (rej) => process.nextTick(callbackifyOnRejected, rej, cb)); + .then((ret) => queueMicrotask(cb, null, ret), + (rej) => queueMicrotask(callbackifyOnRejected, rej, cb)); } const descriptors = ObjectGetOwnPropertyDescriptors(original); diff --git a/lib/zlib.js b/lib/zlib.js index 3766938f6bc7bb..4bb657f8313b35 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -73,6 +73,7 @@ const { validateFunction, validateNumber, } = require('internal/validators'); +const { queueMicrotask } = require('internal/process/task_queues'); const kFlushFlag = Symbol('kFlushFlag'); const kError = Symbol('kError'); @@ -390,7 +391,7 @@ ZlibBase.prototype.flush = function(kind, callback) { if (this.writableFinished) { if (callback) - process.nextTick(callback); + queueMicrotask(callback); } else if (this.writableEnded) { if (callback) this.once('end', callback); @@ -521,7 +522,7 @@ function processChunkSync(self, chunk, flushFlag) { function processChunk(self, chunk, flushFlag, cb) { const handle = self._handle; - if (!handle) return process.nextTick(cb); + if (!handle) return queueMicrotask(cb); handle.buffer = chunk; handle.cb = cb; @@ -723,7 +724,7 @@ Zlib.prototype.params = function params(level, strategy, callback) { FunctionPrototypeBind(paramsAfterFlushCallback, this, level, strategy, callback)); } else { - process.nextTick(callback); + queueMicrotask(callback); } };