Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow overriding stream handlers #2945

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 10 additions & 41 deletions packages/interface-internal/src/registrar/index.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,25 @@
import type { Connection, Stream, Topology } from '@libp2p/interface'
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, IncomingStreamData } from '@libp2p/interface'

export interface IncomingStreamData {
export type {
/**
* The stream that has been opened
* @deprecated This type should be imported from @libp2p/interface directly
*/
stream: Stream
IncomingStreamData,

/**
* The connection that the stream was opened on
* @deprecated This type should be imported from @libp2p/interface directly
*/
connection: Connection
}

export interface StreamHandler {
(data: IncomingStreamData): void
}

export interface StreamHandlerOptions {
/**
* How many incoming streams can be open for this protocol at the same time on each connection
*
* @default 32
*/
maxInboundStreams?: number

/**
* How many outgoing streams can be open for this protocol at the same time on each connection
*
* @default 64
*/
maxOutboundStreams?: number

/**
* If true, allow this protocol to run on limited connections (e.g.
* connections with data or duration limits such as circuit relay
* connections)
*
* @default false
*/
runOnLimitedConnection?: boolean
}
StreamHandler,

export interface StreamHandlerRecord {
/**
* The handler that was registered to handle streams opened on the protocol
* @deprecated This type should be imported from @libp2p/interface directly
*/
handler: StreamHandler
StreamHandlerOptions,

/**
* The options that were used to register the stream handler
* @deprecated This type should be imported from @libp2p/interface directly
*/
options: StreamHandlerOptions
StreamHandlerRecord
}

export interface Registrar {
Expand Down
3 changes: 2 additions & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
*
* `libp2p.handle(protocols, handler, options)`
*
* In the event of a new handler for the same protocol being added, the first one is discarded.
* In the event of a new handler for the same protocol being added and error
* will be thrown. Pass `force: true` to override this.
*
* @example
*
Expand Down
23 changes: 23 additions & 0 deletions packages/interface/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import type { Connection, Stream } from '../connection/index.js'

export interface IncomingStreamData {
/**
* The newly opened stream
*/
stream: Stream

/**
* The connection the stream was opened on
*/
connection: Connection
}

export interface StreamHandler {
/**
* A callback function that accepts the incoming stream data
*/
(data: IncomingStreamData): void
}

Expand All @@ -29,9 +39,22 @@ export interface StreamHandlerOptions {
* transferred or how long it can be open for.
*/
runOnLimitedConnection?: boolean

/**
* If `true`, and a handler is already registered for the specified
* protocol(s), the existing handler will be discarded.
*/
force?: true
}

export interface StreamHandlerRecord {
/**
* The handler that was registered to handle streams opened on the protocol
*/
handler: StreamHandler

/**
* The options that were used to register the stream handler
*/
options: StreamHandlerOptions
}
7 changes: 3 additions & 4 deletions packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import { ConnectionMonitor } from './connection-monitor.js'
import { CompoundContentRouting } from './content-routing.js'
import { DefaultPeerRouting } from './peer-routing.js'
import { RandomWalk } from './random-walk.js'
import { DefaultRegistrar } from './registrar.js'
import { Registrar } from './registrar.js'
import { DefaultTransportManager } from './transport-manager.js'
import { DefaultUpgrader } from './upgrader.js'
import { userAgent } from './user-agent.js'
import * as pkg from './version.js'
import type { Components } from './components.js'
import type { Libp2p as Libp2pInterface, Libp2pInit } from './index.js'
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey } from '@libp2p/interface'
import type { StreamHandler, StreamHandlerOptions } from '@libp2p/interface-internal'
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions } from '@libp2p/interface'

export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter<Libp2pEvents> implements Libp2pInterface<T> {
public peerId: PeerId
Expand Down Expand Up @@ -132,7 +131,7 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
}

// Create the Registrar
this.configureComponent('registrar', new DefaultRegistrar(this.components))
this.configureComponent('registrar', new Registrar(this.components))

// Addresses {listen, announce, noAnnounce}
this.configureComponent('addressManager', new AddressManager(this.components, init.addresses))
Expand Down
8 changes: 4 additions & 4 deletions packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { InvalidParametersError } from '@libp2p/interface'
import merge from 'merge-options'
import * as errorsJs from './errors.js'
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, TypedEventTarget, PeerId, PeerStore, Topology } from '@libp2p/interface'
import type { StreamHandlerOptions, StreamHandlerRecord, Registrar, StreamHandler } from '@libp2p/interface-internal'
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, TypedEventTarget, PeerId, PeerStore, Topology, StreamHandlerRecord, StreamHandlerOptions } from '@libp2p/interface'
import type { Registrar as RegistrarInterface, StreamHandler } from '@libp2p/interface-internal'
import type { ComponentLogger } from '@libp2p/logger'

export const DEFAULT_MAX_INBOUND_STREAMS = 32
Expand All @@ -18,7 +18,7 @@ export interface RegistrarComponents {
/**
* Responsible for notifying registered protocols of events in the network.
*/
export class DefaultRegistrar implements Registrar {
export class Registrar implements RegistrarInterface {
private readonly log: Logger
private readonly topologies: Map<string, Map<string, Topology>>
private readonly handlers: Map<string, StreamHandlerRecord>
Expand Down Expand Up @@ -73,7 +73,7 @@ export class DefaultRegistrar implements Registrar {
* Registers the `handler` for each protocol
*/
async handle (protocol: string, handler: StreamHandler, opts?: StreamHandlerOptions): Promise<void> {
if (this.handlers.has(protocol)) {
if (this.handlers.has(protocol) && opts?.force !== true) {
throw new errorsJs.DuplicateProtocolHandlerError(`Handler already registered for protocol ${protocol}`)
}

Expand Down
6 changes: 3 additions & 3 deletions packages/libp2p/test/registrar/errors.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import { MemoryDatastore } from 'datastore-core/memory'
import { stubInterface } from 'sinon-ts'
import { defaultComponents } from '../../src/components.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { DefaultRegistrar } from '../../src/registrar.js'
import { Registrar } from '../../src/registrar.js'
import type { Components } from '../../src/components.js'
import type { Upgrader, ConnectionGater, PeerId } from '@libp2p/interface'
import type { Registrar, TransportManager } from '@libp2p/interface-internal'
import type { TransportManager } from '@libp2p/interface-internal'

describe('registrar errors', () => {
let components: Components
Expand All @@ -35,7 +35,7 @@ describe('registrar errors', () => {
maxConnections: 1000,
inboundUpgradeTimeout: 1000
})
registrar = new DefaultRegistrar(components)
registrar = new Registrar(components)
})

it('should fail to register a protocol if no multicodec is provided', () => {
Expand Down
49 changes: 43 additions & 6 deletions packages/libp2p/test/registrar/protocols.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import pDefer from 'p-defer'
import { createLibp2p } from '../../src/index.js'
import type { Components } from '../../src/components.js'
import type { Libp2p } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'

describe('registrar protocols', () => {
let libp2p: Libp2p
let registrar: Registrar

afterEach(async () => {
await libp2p?.stop()
})

it('should be able to register and unregister a handler', async () => {
beforeEach(async () => {
const deferred = pDefer<Components>()

libp2p = await createLibp2p({
Expand All @@ -25,9 +23,14 @@ describe('registrar protocols', () => {
})

const components = await deferred.promise
registrar = components.registrar
})

const registrar = components.registrar
afterEach(async () => {
await libp2p?.stop()
})

it('should be able to register and unregister a handler', async () => {
expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1'])

const echoHandler = (): void => {}
Expand All @@ -43,4 +46,38 @@ describe('registrar protocols', () => {
'/echo/1.0.1'
])
})

it('should error if registering two handlers for the same protocol', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle('/echo/1.0.0', echoHandler)).to.eventually.be.rejected
.with.property('name', 'DuplicateProtocolHandlerError')
})

it('should error if registering two handlers for the same protocols', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle(['/echo/2.0.0', '/echo/1.0.0'], echoHandler)).to.eventually.be.rejected
.with.property('name', 'DuplicateProtocolHandlerError')
})

it('should not error if force-registering two handlers for the same protocol', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle('/echo/1.0.0', echoHandler, {
force: true
})).to.eventually.be.ok
})

it('should not error if force-registering two handlers for the same protocols', async () => {
const echoHandler = (): void => {}
await libp2p.handle('/echo/1.0.0', echoHandler)

await expect(libp2p.handle(['/echo/2.0.0', '/echo/1.0.0'], echoHandler, {
force: true
})).to.eventually.be.ok
})
})
5 changes: 2 additions & 3 deletions packages/libp2p/test/registrar/registrar.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
import pDefer from 'p-defer'
import { stubInterface } from 'sinon-ts'
import { DefaultRegistrar } from '../../src/registrar.js'
import { Registrar } from '../../src/registrar.js'
import type { TypedEventTarget, Libp2pEvents, PeerId, PeerStore, Topology, Peer, Connection } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
import type { StubbedInstance } from 'sinon-ts'

const protocol = '/test/1.0.0'
Expand All @@ -31,7 +30,7 @@ describe('registrar topologies', () => {
peerStore = stubInterface<PeerStore>()
events = new TypedEventEmitter<Libp2pEvents>()

registrar = new DefaultRegistrar({
registrar = new Registrar({
peerId,
peerStore,
events,
Expand Down
Loading