Skip to content

Commit

Permalink
feat: allow overriding stream handlers
Browse files Browse the repository at this point in the history
Adds a `force` flag to `libp2p.handle` that means the method will
not throw if a handler already exists for the protocol being handled.

Fixes #2928
  • Loading branch information
achingbrain committed Feb 6, 2025
1 parent 96f14e4 commit 06750d1
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 62 deletions.
51 changes: 10 additions & 41 deletions packages/interface-internal/src/registrar/index.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,25 @@
import type { Connection, Stream, Topology } from '@libp2p/interface'
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, IncomingStreamData } from '@libp2p/interface'

export interface IncomingStreamData {
export type {
/**
* The stream that has been opened
* @deprecated This type should be imported from @libp2p/interface directly
*/
stream: Stream
IncomingStreamData,

/**
* The connection that the stream was opened on
* @deprecated This type should be imported from @libp2p/interface directly
*/
connection: Connection
}

export interface StreamHandler {
(data: IncomingStreamData): void
}

export interface StreamHandlerOptions {
/**
* How many incoming streams can be open for this protocol at the same time on each connection
*
* @default 32
*/
maxInboundStreams?: number

/**
* How many outgoing streams can be open for this protocol at the same time on each connection
*
* @default 64
*/
maxOutboundStreams?: number

/**
* If true, allow this protocol to run on limited connections (e.g.
* connections with data or duration limits such as circuit relay
* connections)
*
* @default false
*/
runOnLimitedConnection?: boolean
}
StreamHandler,

export interface StreamHandlerRecord {
/**
* The handler that was registered to handle streams opened on the protocol
* @deprecated This type should be imported from @libp2p/interface directly
*/
handler: StreamHandler
StreamHandlerOptions,

/**
* The options that were used to register the stream handler
* @deprecated This type should be imported from @libp2p/interface directly
*/
options: StreamHandlerOptions
StreamHandlerRecord
}

export interface Registrar {
Expand Down
3 changes: 2 additions & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
*
* `libp2p.handle(protocols, handler, options)`
*
* In the event of a new handler for the same protocol being added, the first one is discarded.
* In the event of a new handler for the same protocol being added and error
* will be thrown. Pass `force: true` to override this.
*
* @example
*
Expand Down
23 changes: 23 additions & 0 deletions packages/interface/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import type { Connection, Stream } from '../connection/index.js'

export interface IncomingStreamData {
/**
* The newly opened stream
*/
stream: Stream

/**
* The connection the stream was opened on
*/
connection: Connection
}

export interface StreamHandler {
/**
* A callback function that accepts the incoming stream data
*/
(data: IncomingStreamData): void
}

Expand All @@ -29,9 +39,22 @@ export interface StreamHandlerOptions {
* transferred or how long it can be open for.
*/
runOnLimitedConnection?: boolean

/**
* If `true`, and a handler is already registered for the specified
* protocol(s), the existing handler will be discarded.
*/
force?: true
}

export interface StreamHandlerRecord {
/**
* The handler that was registered to handle streams opened on the protocol
*/
handler: StreamHandler

/**
* The options that were used to register the stream handler
*/
options: StreamHandlerOptions
}
7 changes: 3 additions & 4 deletions packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import { ConnectionMonitor } from './connection-monitor.js'
import { CompoundContentRouting } from './content-routing.js'
import { DefaultPeerRouting } from './peer-routing.js'
import { RandomWalk } from './random-walk.js'
import { DefaultRegistrar } from './registrar.js'
import { Registrar } from './registrar.js'
import { DefaultTransportManager } from './transport-manager.js'
import { DefaultUpgrader } from './upgrader.js'
import { userAgent } from './user-agent.js'
import * as pkg from './version.js'
import type { Components } from './components.js'
import type { Libp2p as Libp2pInterface, Libp2pInit } from './index.js'
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey } from '@libp2p/interface'
import type { StreamHandler, StreamHandlerOptions } from '@libp2p/interface-internal'
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions } from '@libp2p/interface'

export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter<Libp2pEvents> implements Libp2pInterface<T> {
public peerId: PeerId
Expand Down Expand Up @@ -132,7 +131,7 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
}

// Create the Registrar
this.configureComponent('registrar', new DefaultRegistrar(this.components))
this.configureComponent('registrar', new Registrar(this.components))

// Addresses {listen, announce, noAnnounce}
this.configureComponent('addressManager', new AddressManager(this.components, init.addresses))
Expand Down
8 changes: 4 additions & 4 deletions packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { InvalidParametersError } from '@libp2p/interface'
import merge from 'merge-options'
import * as errorsJs from './errors.js'
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, TypedEventTarget, PeerId, PeerStore, Topology } from '@libp2p/interface'
import type { StreamHandlerOptions, StreamHandlerRecord, Registrar, StreamHandler } from '@libp2p/interface-internal'
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, TypedEventTarget, PeerId, PeerStore, Topology, StreamHandlerRecord, StreamHandlerOptions } from '@libp2p/interface'
import type { Registrar as RegistrarInterface, StreamHandler } from '@libp2p/interface-internal'
import type { ComponentLogger } from '@libp2p/logger'

export const DEFAULT_MAX_INBOUND_STREAMS = 32
Expand All @@ -18,7 +18,7 @@ export interface RegistrarComponents {
/**
* Responsible for notifying registered protocols of events in the network.
*/
export class DefaultRegistrar implements Registrar {
export class Registrar implements RegistrarInterface {
private readonly log: Logger
private readonly topologies: Map<string, Map<string, Topology>>
private readonly handlers: Map<string, StreamHandlerRecord>
Expand Down Expand Up @@ -73,7 +73,7 @@ export class DefaultRegistrar implements Registrar {
* Registers the `handler` for each protocol
*/
async handle (protocol: string, handler: StreamHandler, opts?: StreamHandlerOptions): Promise<void> {
if (this.handlers.has(protocol)) {
if (this.handlers.has(protocol) && opts?.force !== true) {
throw new errorsJs.DuplicateProtocolHandlerError(`Handler already registered for protocol ${protocol}`)
}

Expand Down
6 changes: 3 additions & 3 deletions packages/libp2p/test/registrar/errors.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import { MemoryDatastore } from 'datastore-core/memory'
import { stubInterface } from 'sinon-ts'
import { defaultComponents } from '../../src/components.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { DefaultRegistrar } from '../../src/registrar.js'
import { Registrar } from '../../src/registrar.js'
import type { Components } from '../../src/components.js'
import type { Upgrader, ConnectionGater, PeerId } from '@libp2p/interface'
import type { Registrar, TransportManager } from '@libp2p/interface-internal'
import type { TransportManager } from '@libp2p/interface-internal'

describe('registrar errors', () => {
let components: Components
Expand All @@ -35,7 +35,7 @@ describe('registrar errors', () => {
maxConnections: 1000,
inboundUpgradeTimeout: 1000
})
registrar = new DefaultRegistrar(components)
registrar = new Registrar(components)
})

it('should fail to register a protocol if no multicodec is provided', () => {
Expand Down
49 changes: 43 additions & 6 deletions packages/libp2p/test/registrar/protocols.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import pDefer from 'p-defer'
import { createLibp2p } from '../../src/index.js'
import type { Components } from '../../src/components.js'
import type { Libp2p } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'

describe('registrar protocols', () => {
let libp2p: Libp2p
let registrar: Registrar

afterEach(async () => {
await libp2p?.stop()
})

it('should be able to register and unregister a handler', async () => {
beforeEach(async () => {
const deferred = pDefer<Components>()

libp2p = await createLibp2p({
Expand All @@ -25,9 +23,14 @@ describe('registrar protocols', () => {
})

const components = await deferred.promise
registrar = components.registrar
})

const registrar = components.registrar
afterEach(async () => {
await libp2p?.stop()
})

it('should be able to register and unregister a handler', async () => {
expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1'])

const echoHandler = (): void => {}
Expand All @@ -43,4 +46,38 @@ describe('registrar protocols', () => {
'/echo/1.0.1'
])
})

it('should error if registering two handlers for the same protocol', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle('/echo/1.0.0', echoHandler)).to.eventually.be.rejected
.with.property('name', 'DuplicateProtocolHandlerError')
})

it('should error if registering two handlers for the same protocols', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle(['/echo/2.0.0', '/echo/1.0.0'], echoHandler)).to.eventually.be.rejected
.with.property('name', 'DuplicateProtocolHandlerError')
})

it('should not error if force-registering two handlers for the same protocol', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle('/echo/1.0.0', echoHandler, {
force: true
})).to.eventually.be.ok
})

it('should not error if force-registering two handlers for the same protocols', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle(['/echo/2.0.0', '/echo/1.0.0'], echoHandler, {
force: true
})).to.eventually.be.ok
})
})
5 changes: 2 additions & 3 deletions packages/libp2p/test/registrar/registrar.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
import pDefer from 'p-defer'
import { stubInterface } from 'sinon-ts'
import { DefaultRegistrar } from '../../src/registrar.js'
import { Registrar } from '../../src/registrar.js'
import type { TypedEventTarget, Libp2pEvents, PeerId, PeerStore, Topology, Peer, Connection } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
import type { StubbedInstance } from 'sinon-ts'

const protocol = '/test/1.0.0'
Expand All @@ -31,7 +30,7 @@ describe('registrar topologies', () => {
peerStore = stubInterface<PeerStore>()
events = new TypedEventEmitter<Libp2pEvents>()

registrar = new DefaultRegistrar({
registrar = new Registrar({
peerId,
peerStore,
events,
Expand Down

0 comments on commit 06750d1

Please sign in to comment.