Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina committed Nov 25, 2024
1 parent 14d818f commit 4f8527e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
34 changes: 22 additions & 12 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const RoundRobin = require('./lib/roundrobin')
const { workerData } = require('worker_threads')
const hyperid = require('hyperid')
const { getGlobalDispatcher, setGlobalDispatcher } = require('undici')
const { threadId, MessageChannel, parentPort } = require('worker_threads')
Expand Down Expand Up @@ -153,8 +154,8 @@ function createThreadInterceptor (opts) {
const { port1, port2 } = new MessageChannel()
forwarded.get(otherPort).add(port2)
forwarded.get(port).add(port1)
otherPort.postMessage({ type: 'route', url, port: port2 }, [port2])
port.postMessage({ type: 'route', url: key, port: port1 }, [port1])
otherPort.postMessage({ type: 'route', url, port: port2, threadId: port.threadId }, [port2])
port.postMessage({ type: 'route', url: key, port: port1, threadId: otherPort.threadId }, [port1])
}
}
}
Expand All @@ -166,8 +167,11 @@ function createThreadInterceptor (opts) {
const roundRobin = routes.get(url)
roundRobin.add(port)

// We must copy the threadId outsise because it can be nulled
// by Node.js
const threadId = port.threadId

function onClose () {
const roundRobinIndex = routes.get(url).findIndex(port)
const roundRobin = routes.get(url)
roundRobin.remove(port)
for (const f of forwarded.get(port)) {
Expand All @@ -182,7 +186,7 @@ function createThreadInterceptor (opts) {
}

// Notify other threads that any eventual network address for this route is no longer valid
res.setAddress(url, roundRobinIndex)
res.setAddress(url, threadId)
}

// If port is a worker, we need to remove it from the routes
Expand All @@ -193,6 +197,7 @@ function createThreadInterceptor (opts) {
const inflights = new Map()
portInflights.set(port, inflights)
port.on('message', (msg) => {
process._rawDebug(workerData?.main || 'main', 'portOnMessage', url, msg)
if (msg.type === 'response') {
const { id, res, err } = msg
const inflight = inflights.get(id)
Expand All @@ -202,22 +207,21 @@ function createThreadInterceptor (opts) {
}
} else if (msg.type === 'address') {
if (!msg.url) {
const roundRobinIndex = roundRobin.findIndex(port)
res.setAddress(url, roundRobinIndex, msg.address, forward)
res.setAddress(url, port.threadId, msg.address, forward)
} else {
const roundRobin = routes.get(msg.url)
if (!roundRobin) {
return
}

res.setAddress(msg.url, msg.index, msg.address, false)
res.setAddress(msg.url, msg.threadId, msg.address, false)
}
}
})
}

res.setAddress = (url, index, address, forward = true) => {
const port = routes.get(url)?.get(index)
res.setAddress = (url, threadId, address, forward = true) => {
const port = routes.get(url)?.findByThreadId(threadId)

if (port) {
port[kAddress] = address
Expand All @@ -229,7 +233,10 @@ function createThreadInterceptor (opts) {

for (const [, roundRobin] of routes) {
for (const otherPort of roundRobin) {
otherPort.postMessage({ type: 'address', url, index, address })
// Avoid loops, do not send the message to the source
if (otherPort.threadId !== threadId) {
otherPort.postMessage({ type: 'address', url, address, threadId })
}
}
}
}
Expand Down Expand Up @@ -258,13 +265,15 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
server = newServer

if (typeof server === 'string') {
parentPort.postMessage({ type: 'address', address: server })
process._rawDebug(workerData?.name || 'main', 'replaceServer', server)
parentPort.postMessage({ type: 'address', address: server, threadId })
} else {
hasInject = typeof server?.inject === 'function'
}
}

function onMessage (msg) {
process._rawDebug(workerData?.name || 'main', msg)
if (msg.type === 'request') {
const { id, opts } = msg

Expand Down Expand Up @@ -327,10 +336,11 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
}
})
} else if (msg.type === 'route') {
msg.port.threadId = msg.threadId
interceptor.route(msg.url, msg.port, false)
msg.port.on('message', onMessage)
} else if (msg.type === 'address') {
interceptor.setAddress(msg.url, msg.index, msg.address, false)
interceptor.setAddress(msg.url, msg.threadId, msg.address, false)
}
}

Expand Down
8 changes: 2 additions & 6 deletions lib/roundrobin.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class RoundRobin {
return this.ports.length - 1
}

findIndex (port) {
return this.ports.indexOf(port)
findByThreadId (threadId) {
return this.ports.find((p) => p.threadId === threadId)
}

remove (port) {
Expand All @@ -34,10 +34,6 @@ class RoundRobin {
this.index = this.index % this.ports.length
}

get (index) {
return this.ports[index]
}

get length () {
return this.ports.length
}
Expand Down

0 comments on commit 4f8527e

Please sign in to comment.