Skip to content

Commit 9587611

Browse files
authored
feat: add stream middleware (#3173)
Adds middleware handlers for protocol streams. They are invoked for incoming and outgoing streams and allow access to the stream and connection before the handler (incoming) or caller (outgoing) receive them. This way middleware can wrap streams in transforms, or deny access, or something else. ```ts libp2p.use('/my/protocol/1.0.0', (stream, connection, next) => { const originalSource = stream.source // increment all byte values in the stream by one stream.source = (async function * () { for await (const buf of originalSource) { buf = buf.map(val => val + 1) yield buf } })() // pass the stream on to the next middleware next(stream, connection) }) ```
1 parent 8484de8 commit 9587611

File tree

9 files changed

+243
-9
lines changed

9 files changed

+243
-9
lines changed

.github/dictionary.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ additionals
1414
SECG
1515
Certicom
1616
RSAES
17+
unuse
1718
dialback
1819
chacha
1920
peerStore

packages/interface-internal/src/registrar.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, AbortOptions } from '@libp2p/interface'
1+
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, StreamMiddleware, AbortOptions } from '@libp2p/interface'
22

33
export type {
44
/**
@@ -63,6 +63,30 @@ export interface Registrar {
6363
*/
6464
getHandler(protocol: string): StreamHandlerRecord
6565

66+
/**
67+
* Retrieve any registered middleware for a given protocol.
68+
*
69+
* @param protocol - The protocol to fetch middleware for
70+
* @returns A list of `StreamMiddleware` implementations
71+
*/
72+
use(protocol: string, middleware: StreamMiddleware[]): void
73+
74+
/**
75+
* Retrieve any registered middleware for a given protocol.
76+
*
77+
* @param protocol - The protocol to fetch middleware for
78+
* @returns A list of `StreamMiddleware` implementations
79+
*/
80+
unuse(protocol: string): void
81+
82+
/**
83+
* Retrieve any registered middleware for a given protocol.
84+
*
85+
* @param protocol - The protocol to fetch middleware for
86+
* @returns A list of `StreamMiddleware` implementations
87+
*/
88+
getMiddleware(protocol: string): StreamMiddleware[]
89+
6690
/**
6791
* Register a topology handler for a protocol - the topology will be
6892
* invoked when peers are discovered on the network that support the

packages/interface/src/index.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import type { PeerInfo } from './peer-info.js'
2323
import type { PeerRouting } from './peer-routing.js'
2424
import type { Address, Peer, PeerStore } from './peer-store.js'
2525
import type { Startable } from './startable.js'
26-
import type { StreamHandler, StreamHandlerOptions } from './stream-handler.js'
26+
import type { StreamHandler, StreamHandlerOptions, StreamMiddleware } from './stream-handler.js'
2727
import type { Stream } from './stream.js'
2828
import type { Topology } from './topology.js'
2929
import type { Listener, OutboundConnectionUpgradeEvents } from './transport.js'
@@ -781,6 +781,33 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
781781
*/
782782
unregister(id: string): void
783783

784+
/**
785+
* Registers one or more middleware implementations that will be invoked for
786+
* incoming and outgoing protocol streams that match the passed protocol.
787+
*
788+
* @example
789+
*
790+
* ```TypeScript
791+
* libp2p.use('/my/protocol/1.0.0', (stream, connection, next) => {
792+
* // do something with stream and/or connection
793+
* next(stream, connection)
794+
* })
795+
* ```
796+
*/
797+
use (protocol: string, middleware: StreamMiddleware | StreamMiddleware[]): void
798+
799+
/**
800+
* Deregisters all middleware for the passed protocol.
801+
*
802+
* @example
803+
*
804+
* ```TypeScript
805+
* libp2p.unuse('/my/protocol/1.0.0')
806+
* // any previously registered middleware will no longer be invoked
807+
* ```
808+
*/
809+
unuse (protocol: string): void
810+
784811
/**
785812
* Returns the public key for the passed PeerId. If the PeerId is of the 'RSA'
786813
* type this may mean searching the routing if the peer's key is not present

packages/interface/src/stream-handler.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ export interface StreamHandler {
77
(stream: Stream, connection: Connection): void | Promise<void>
88
}
99

10+
/**
11+
* Stream middleware allows accessing stream data outside of the stream handler
12+
*/
13+
export interface StreamMiddleware {
14+
(stream: Stream, connection: Connection, next: (stream: Stream, connection: Connection) => void): void | Promise<void>
15+
}
16+
1017
export interface StreamHandlerOptions extends AbortOptions {
1118
/**
1219
* How many incoming streams can be open for this protocol at the same time on each connection
@@ -33,6 +40,11 @@ export interface StreamHandlerOptions extends AbortOptions {
3340
* protocol(s), the existing handler will be discarded.
3441
*/
3542
force?: true
43+
44+
/**
45+
* Middleware allows accessing stream data outside of the stream handler
46+
*/
47+
middleware?: StreamMiddleware[]
3648
}
3749

3850
export interface StreamHandlerRecord {

packages/libp2p/src/connection.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
126126
}
127127

128128
this.log.trace('starting new stream for protocols %s', protocols)
129-
const muxedStream = await this.muxer.createStream({
129+
let muxedStream = await this.muxer.createStream({
130130
...options,
131131

132132
// most underlying transports only support negotiating a single protocol
@@ -177,6 +177,24 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
177177

178178
this.components.metrics?.trackProtocolStream(muxedStream)
179179

180+
const middleware = this.components.registrar.getMiddleware(muxedStream.protocol)
181+
182+
middleware.push((stream, connection, next) => {
183+
next(stream, connection)
184+
})
185+
186+
let i = 0
187+
let connection: ConnectionInterface = this
188+
189+
while (i < middleware.length) {
190+
// eslint-disable-next-line no-loop-func
191+
middleware[i](muxedStream, connection, (s, c) => {
192+
muxedStream = s
193+
connection = c
194+
i++
195+
})
196+
}
197+
180198
return muxedStream
181199
} catch (err: any) {
182200
if (muxedStream.status === 'open') {
@@ -190,7 +208,7 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
190208
}
191209

192210
private async onIncomingStream (evt: CustomEvent<Stream>): Promise<void> {
193-
const muxedStream = evt.detail
211+
let muxedStream = evt.detail
194212

195213
const signal = AbortSignal.timeout(this.inboundStreamProtocolNegotiationTimeout)
196214
setMaxListeners(Infinity, signal)
@@ -235,7 +253,22 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
235253
throw new LimitedConnectionError('Cannot open protocol stream on limited connection')
236254
}
237255

238-
await handler(muxedStream, this)
256+
const middleware = this.components.registrar.getMiddleware(muxedStream.protocol)
257+
258+
middleware.push(async (stream, connection, next) => {
259+
await handler(stream, connection)
260+
next(stream, connection)
261+
})
262+
263+
let connection: ConnectionInterface = this
264+
265+
for (const m of middleware) {
266+
// eslint-disable-next-line no-loop-func
267+
await m(muxedStream, connection, (s, c) => {
268+
muxedStream = s
269+
connection = c
270+
})
271+
}
239272
} catch (err: any) {
240273
muxedStream.abort(err)
241274
}

packages/libp2p/src/libp2p.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { userAgent } from './user-agent.js'
2424
import * as pkg from './version.js'
2525
import type { Components } from './components.js'
2626
import type { Libp2p as Libp2pInterface, Libp2pInit } from './index.js'
27-
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions } from '@libp2p/interface'
27+
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions, StreamMiddleware } from '@libp2p/interface'
2828
import type { Multiaddr } from '@multiformats/multiaddr'
2929

3030
export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter<Libp2pEvents> implements Libp2pInterface<T> {
@@ -401,6 +401,14 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
401401
this.components.registrar.unregister(id)
402402
}
403403

404+
use (protocol: string, middleware: StreamMiddleware | StreamMiddleware[]): void {
405+
this.components.registrar.use(protocol, Array.isArray(middleware) ? middleware : [middleware])
406+
}
407+
408+
unuse (protocol: string): void {
409+
this.components.registrar.unuse(protocol)
410+
}
411+
404412
async isDialable (multiaddr: Multiaddr, options: IsDialableOptions = {}): Promise<boolean> {
405413
return this.components.connectionManager.isDialable(multiaddr, options)
406414
}

packages/libp2p/src/registrar.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { InvalidParametersError } from '@libp2p/interface'
22
import { mergeOptions, trackedMap } from '@libp2p/utils'
33
import { DuplicateProtocolHandlerError, UnhandledProtocolError } from './errors.js'
4-
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, PeerId, PeerStore, Topology, StreamHandler, StreamHandlerRecord, StreamHandlerOptions, AbortOptions, Metrics } from '@libp2p/interface'
4+
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, PeerId, PeerStore, Topology, StreamHandler, StreamHandlerRecord, StreamHandlerOptions, AbortOptions, Metrics, StreamMiddleware } from '@libp2p/interface'
55
import type { Registrar as RegistrarInterface } from '@libp2p/interface-internal'
66
import type { ComponentLogger } from '@libp2p/logger'
77
import type { TypedEventTarget } from 'main-event'
@@ -25,10 +25,12 @@ export class Registrar implements RegistrarInterface {
2525
private readonly topologies: Map<string, Map<string, Topology>>
2626
private readonly handlers: Map<string, StreamHandlerRecord>
2727
private readonly components: RegistrarComponents
28+
private readonly middleware: Map<string, StreamMiddleware[]>
2829

2930
constructor (components: RegistrarComponents) {
3031
this.components = components
3132
this.log = components.logger.forComponent('libp2p:registrar')
33+
this.middleware = new Map()
3234
this.topologies = new Map()
3335
components.metrics?.registerMetricGroup('libp2p_registrar_topologies', {
3436
calculate: () => {
@@ -164,6 +166,18 @@ export class Registrar implements RegistrarInterface {
164166
}
165167
}
166168

169+
use (protocol: string, middleware: StreamMiddleware[]): void {
170+
this.middleware.set(protocol, middleware)
171+
}
172+
173+
unuse (protocol: string): void {
174+
this.middleware.delete(protocol)
175+
}
176+
177+
getMiddleware (protocol: string): StreamMiddleware[] {
178+
return this.middleware.get(protocol) ?? []
179+
}
180+
167181
/**
168182
* Remove a disconnected peer from the record
169183
*/

packages/libp2p/test/connection/index.spec.ts

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
1111
import { createConnection } from '../../src/connection.js'
1212
import { UnhandledProtocolError } from '../../src/errors.ts'
1313
import type { ConnectionComponents, ConnectionInit } from '../../src/connection.js'
14-
import type { MultiaddrConnection, PeerStore, StreamMuxer } from '@libp2p/interface'
14+
import type { MultiaddrConnection, PeerStore, Stream, StreamMuxer } from '@libp2p/interface'
1515
import type { Registrar } from '@libp2p/interface-internal'
1616
import type { StubbedInstance } from 'sinon-ts'
1717

@@ -38,6 +38,7 @@ describe('connection', () => {
3838
},
3939
options: {}
4040
})
41+
registrar.getMiddleware.withArgs(ECHO_PROTOCOL).returns([])
4142

4243
components = {
4344
peerStore,
@@ -223,6 +224,7 @@ describe('connection', () => {
223224
}
224225
})
225226
registrar.getProtocols.returns([protocol])
227+
registrar.getMiddleware.withArgs(protocol).returns([])
226228

227229
const connection = createConnection(components, init)
228230
expect(connection.streams).to.have.lengthOf(0)
@@ -259,6 +261,7 @@ describe('connection', () => {
259261
}
260262
})
261263
registrar.getProtocols.returns([protocol])
264+
registrar.getMiddleware.withArgs(protocol).returns([])
262265

263266
const connection = createConnection(components, init)
264267
expect(connection.streams).to.have.lengthOf(0)
@@ -274,6 +277,7 @@ describe('connection', () => {
274277
const protocol = '/test/protocol'
275278

276279
registrar.getHandler.withArgs(protocol).throws(new UnhandledProtocolError())
280+
registrar.getMiddleware.withArgs(protocol).returns([])
277281

278282
const connection = createConnection(components, init)
279283
expect(connection.streams).to.have.lengthOf(0)
@@ -289,4 +293,102 @@ describe('connection', () => {
289293
await expect(connection.newStream(protocol, opts)).to.eventually.be.rejected
290294
.with.property('name', 'TooManyOutboundProtocolStreamsError')
291295
})
296+
297+
it('should support outgoing stream middleware', async () => {
298+
const streamProtocol = '/test/protocol'
299+
300+
const middleware1 = Sinon.stub().callsFake((stream, connection, next) => {
301+
next(stream, connection)
302+
})
303+
const middleware2 = Sinon.stub().callsFake((stream, connection, next) => {
304+
next(stream, connection)
305+
})
306+
307+
const middleware = [
308+
middleware1,
309+
middleware2
310+
]
311+
312+
registrar.getMiddleware.withArgs(streamProtocol).returns(middleware)
313+
registrar.getHandler.withArgs(streamProtocol).returns({
314+
handler: () => {},
315+
options: {}
316+
})
317+
318+
const connection = createConnection(components, init)
319+
320+
await connection.newStream(streamProtocol)
321+
322+
expect(middleware1.called).to.be.true()
323+
expect(middleware2.called).to.be.true()
324+
})
325+
326+
it('should support incoming stream middleware', async () => {
327+
const streamProtocol = '/test/protocol'
328+
329+
const middleware1 = Sinon.stub().callsFake((stream, connection, next) => {
330+
next(stream, connection)
331+
})
332+
const middleware2 = Sinon.stub().callsFake((stream, connection, next) => {
333+
next(stream, connection)
334+
})
335+
336+
const middleware = [
337+
middleware1,
338+
middleware2
339+
]
340+
341+
registrar.getMiddleware.withArgs(streamProtocol).returns(middleware)
342+
registrar.getHandler.withArgs(streamProtocol).returns({
343+
handler: () => {},
344+
options: {}
345+
})
346+
347+
const muxer = stubInterface<StreamMuxer>({
348+
streams: []
349+
})
350+
351+
createConnection(components, {
352+
...init,
353+
muxer
354+
})
355+
356+
expect(muxer.addEventListener.getCall(0).args[0]).to.equal('stream')
357+
const onIncomingStream = muxer.addEventListener.getCall(0).args[1]
358+
359+
if (onIncomingStream == null) {
360+
throw new Error('No incoming stream handler registered')
361+
}
362+
363+
const incomingStream = stubInterface<Stream>({
364+
protocol: streamProtocol
365+
})
366+
367+
if (typeof onIncomingStream !== 'function') {
368+
throw new Error('Stream handler was not function')
369+
}
370+
371+
onIncomingStream(new CustomEvent('stream', {
372+
detail: incomingStream
373+
}))
374+
/*
375+
const incomingStream = stubInterface<Stream>({
376+
id: 'stream-id',
377+
log: logger('test-stream'),
378+
direction: 'outbound',
379+
sink: async (source) => drain(source),
380+
source: map((async function * () {
381+
yield '/multistream/1.0.0\n'
382+
yield `${streamProtocol}\n`
383+
})(), str => encode.single(uint8ArrayFromString(str)))
384+
})
385+
*/
386+
// onIncomingStream?.(incomingStream)
387+
388+
// incoming stream is opened asynchronously
389+
await delay(100)
390+
391+
expect(middleware1.called).to.be.true()
392+
expect(middleware2.called).to.be.true()
393+
})
292394
})

0 commit comments

Comments
 (0)