From ed68ce31202716ca29274b0aeb51521a9b7b97d1 Mon Sep 17 00:00:00 2001 From: ARJUN SHAJI Date: Thu, 13 Mar 2025 14:40:38 +0530 Subject: [PATCH 1/3] Use RTPProcessor with RTPReader RTPReader simply reads packets from the buffer, while the processor retains them through the interceptors. --- dtlstransport.go | 30 ++++++++++++++++------------ go.mod | 2 ++ go.sum | 1 + peerconnection.go | 9 +++++---- rtpreceiver.go | 50 ++++++++++++++++++++++++++++++++++------------- track_remote.go | 2 +- 6 files changed, 63 insertions(+), 31 deletions(-) diff --git a/dtlstransport.go b/dtlstransport.go index e0b575a1188..04367244141 100644 --- a/dtlstransport.go +++ b/dtlstransport.go @@ -530,36 +530,42 @@ func (t *DTLSTransport) storeSimulcastStream( func (t *DTLSTransport) streamsForSSRC( ssrc SSRC, streamInfo interceptor.StreamInfo, -) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) { +) (*srtp.ReadStreamSRTP, interceptor.RTPReader, interceptor.RTPProcessor, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) { srtpSession, err := t.getSRTPSession() if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } rtpReadStream, err := srtpSession.OpenReadStream(uint32(ssrc)) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } - rtpInterceptor := t.api.interceptor.BindRemoteStream( + rtpProcessor := t.api.interceptor.BindRemoteStream( &streamInfo, - interceptor.RTPReaderFunc( - func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) { - n, err = rtpReadStream.Read(in) - - return n, a, err + interceptor.RTPProcessorFunc( + func(s int, in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) { + return s, a, nil }, ), ) + rtpReader := interceptor.RTPReaderFunc( + func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) { + n, err = rtpReadStream.Read(in) + + return n, a, err + }, + ) + srtcpSession, err := t.getSRTCPSession() if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(ssrc)) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } rtcpInterceptor := t.api.interceptor.BindRTCPReader(interceptor.RTCPReaderFunc( @@ -570,5 +576,5 @@ func (t *DTLSTransport) streamsForSSRC( }), ) - return rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, nil + return rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, nil } diff --git a/go.mod b/go.mod index 4d3e15aad0d..b883a92097a 100644 --- a/go.mod +++ b/go.mod @@ -34,3 +34,5 @@ require ( golang.org/x/sys v0.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/pion/interceptor v0.1.37 => github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1 diff --git a/go.sum b/go.sum index 8e583d4f148..5a6d5e1d874 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/peerconnection.go b/peerconnection.go index 07b188b71bb..2a9cebdec7c 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1732,7 +1732,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err params.Codecs[0].RTPCodecCapability, params.HeaderExtensions, ) - readStream, interceptor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo) + readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo) if err != nil { return err } @@ -1746,7 +1746,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err readCount-- } - i, _, err := interceptor.Read(b, nil) + i, _, err := rtpReader.Read(b, nil) if err != nil { return err } @@ -1775,7 +1775,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err receiver.mu.Lock() defer receiver.mu.Unlock() - return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor) + return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor) } track, err := receiver.receiveForRid( @@ -1783,7 +1783,8 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err params, streamInfo, readStream, - interceptor, + rtpReader, + rtpProcessor, rtcpReadStream, rtcpInterceptor, ) diff --git a/rtpreceiver.go b/rtpreceiver.go index 109f88a4a21..0bb766929db 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -26,14 +26,16 @@ type trackStreams struct { streamInfo, repairStreamInfo *interceptor.StreamInfo - rtpReadStream *srtp.ReadStreamSRTP - rtpInterceptor interceptor.RTPReader + rtpReadStream *srtp.ReadStreamSRTP + rtpReader interceptor.RTPReader + rtpProcessor interceptor.RTPProcessor rtcpReadStream *srtp.ReadStreamSRTCP rtcpInterceptor interceptor.RTCPReader repairReadStream *srtp.ReadStreamSRTP - repairInterceptor interceptor.RTPReader + repairReader interceptor.RTPReader + repairProcessor interceptor.RTPProcessor repairStreamChannel chan rtxPacketWithAttributes repairRtcpReadStream *srtp.ReadStreamSRTCP @@ -228,13 +230,13 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no var err error //nolint:lll // # TODO refactor - if streams.rtpReadStream, streams.rtpInterceptor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil { + if streams.rtpReadStream, streams.rtpReader, streams.rtpProcessor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil { return err } if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 { streamInfo := createStreamInfo("", rtxSsrc, 0, 0, 0, 0, 0, codec, globalParams.HeaderExtensions) - rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC( + rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC( rtxSsrc, *streamInfo, ) @@ -247,7 +249,8 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no "", streamInfo, rtpReadStream, - rtpInterceptor, + rtpReader, + rtpProcessor, rtcpReadStream, rtcpInterceptor, ); err != nil { @@ -412,7 +415,11 @@ func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a intercept } if t := r.streamsForTrack(reader); t != nil { - return t.rtpInterceptor.Read(b, a) + i, attr, err := t.rtpReader.Read(b, a) + if err != nil { + return 0, nil, err + } + return t.rtpProcessor.Process(i, b, attr) } return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC()) @@ -425,7 +432,8 @@ func (r *RTPReceiver) receiveForRid( params RTPParameters, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, - rtpInterceptor interceptor.RTPReader, + rtpReader interceptor.RTPReader, + rtpProcessor interceptor.RTPProcessor, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader, ) (*TrackRemote, error) { @@ -443,7 +451,8 @@ func (r *RTPReceiver) receiveForRid( r.tracks[i].streamInfo = streamInfo r.tracks[i].rtpReadStream = rtpReadStream - r.tracks[i].rtpInterceptor = rtpInterceptor + r.tracks[i].rtpReader = rtpReader + r.tracks[i].rtpProcessor = rtpProcessor r.tracks[i].rtcpReadStream = rtcpReadStream r.tracks[i].rtcpInterceptor = rtcpInterceptor @@ -462,7 +471,8 @@ func (r *RTPReceiver) receiveForRtx( rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, - rtpInterceptor interceptor.RTPReader, + rtpReader interceptor.RTPReader, + rtpProcessor interceptor.RTPProcessor, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader, ) error { @@ -488,7 +498,8 @@ func (r *RTPReceiver) receiveForRtx( track.repairStreamInfo = streamInfo track.repairReadStream = rtpReadStream - track.repairInterceptor = rtpInterceptor + track.repairReader = rtpReader + track.repairProcessor = rtpProcessor track.repairRtcpReadStream = rtcpReadStream track.repairRtcpInterceptor = rtcpInterceptor track.repairStreamChannel = make(chan rtxPacketWithAttributes, 50) @@ -496,7 +507,12 @@ func (r *RTPReceiver) receiveForRtx( go func() { for { b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert - i, attributes, err := track.repairInterceptor.Read(b, nil) + i, attributes, err := track.repairReader.Read(b, nil) + if err != nil { + r.rtxPool.Put(b) // nolint:staticcheck + return + } + i, attributes, err = track.repairProcessor.Process(i, b, attributes) if err != nil { r.rtxPool.Put(b) // nolint:staticcheck @@ -590,7 +606,7 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote } // readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil. -func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes { +func (r *RTPReceiver) readRTX(b []byte, reader *TrackRemote) *rtxPacketWithAttributes { if !reader.HasRTX() { return nil } @@ -604,7 +620,13 @@ func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes { if t := r.streamsForTrack(reader); t != nil { select { case rtxPacketReceived := <-t.repairStreamChannel: - return &rtxPacketReceived + { + n := copy(b, rtxPacketReceived.pkt) + _, _, err := t.rtpProcessor.Process(n, b, nil) + if err == nil { + return &rtxPacketReceived + } + } default: } } diff --git a/track_remote.go b/track_remote.go index 1b037e71626..126ecccf797 100644 --- a/track_remote.go +++ b/track_remote.go @@ -135,7 +135,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, } // If there's a separate RTX track and an RTX packet is available, return that - if rtxPacketReceived := receiver.readRTX(t); rtxPacketReceived != nil { + if rtxPacketReceived := receiver.readRTX(b, t); rtxPacketReceived != nil { n = copy(b, rtxPacketReceived.pkt) attributes = rtxPacketReceived.attributes rtxPacketReceived.release() From d132d03386528f8d4397a1a49cbde53bfb055053 Mon Sep 17 00:00:00 2001 From: ARJUN SHAJI Date: Thu, 13 Mar 2025 17:33:23 +0530 Subject: [PATCH 2/3] Fix test cases Ensure ICEConnection/ConnectionState is established before proceeding --- go.sum | 1 + interceptor_test.go | 26 ++++++++++++++++++++++---- peerconnection_media_test.go | 10 ++++++++-- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/go.sum b/go.sum index 5a6d5e1d874..2348a1656c3 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1 h1:5p3Tm/VZUdN8aqLJp1noK/fAqggXJBHSsWXQJbksmw0= github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/interceptor_test.go b/interceptor_test.go index a3540edc082..83ee2a2ef34 100644 --- a/interceptor_test.go +++ b/interceptor_test.go @@ -51,15 +51,15 @@ func TestPeerConnection_Interceptor(t *testing.T) { }, ) }, - BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { - return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor { + return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { if a == nil { a = interceptor.Attributes{} } a.Set("attribute", "value") - return reader.Read(b, a) + return reader.Process(i, b, a) }) }, }, nil @@ -146,7 +146,7 @@ func Test_Interceptor_BindUnbind(t *testing.T) { //nolint:cyclop UnbindLocalStreamFn: func(*interceptor.StreamInfo) { atomic.AddUint32(&cntUnbindLocalStream, 1) }, - BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor { atomic.AddUint32(&cntBindRemoteStream, 1) return reader @@ -413,6 +413,24 @@ func testInterceptorNack(t *testing.T, requestNack bool) { //nolint:cyclop close(done) }) + pcOfferConnected := make(chan struct{}) + pcAnswerConnected := make(chan struct{}) + + pc1.OnConnectionStateChange(func(state PeerConnectionState) { + if state == PeerConnectionStateConnected { + close(pcOfferConnected) + } + }) + + pc2.OnConnectionStateChange(func(state PeerConnectionState) { + if state == PeerConnectionStateConnected { + close(pcAnswerConnected) + } + }) + + <-pcOfferConnected + <-pcAnswerConnected + go func() { for i := 0; i < numPackets; i++ { time.Sleep(20 * time.Millisecond) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 30c9307a38c..881399d9b46 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -150,10 +150,11 @@ func TestPeerConnection_Media_Sample(t *testing.T) { go func() { for { - time.Sleep(time.Millisecond * 100) if pcOffer.ICEConnectionState() != ICEConnectionStateConnected { + time.Sleep(time.Millisecond * 100) continue } + if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil { fmt.Println(routineErr) } @@ -169,6 +170,12 @@ func TestPeerConnection_Media_Sample(t *testing.T) { }() go func() { + for { + if pcOffer.ICEConnectionState() == ICEConnectionStateConnected { + break + } + time.Sleep(time.Millisecond * 100) + } parameters := sender.GetParameters() for { @@ -190,7 +197,6 @@ func TestPeerConnection_Media_Sample(t *testing.T) { } } }() - go func() { if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil { close(awaitRTCPSenderRecv) From 4c398486cda25b7f5b2160d030780249e4a22e33 Mon Sep 17 00:00:00 2001 From: ARJUN SHAJI Date: Thu, 13 Mar 2025 18:52:22 +0530 Subject: [PATCH 3/3] Update go.mod and go.sum --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index b883a92097a..a61796e1541 100644 --- a/go.mod +++ b/go.mod @@ -35,4 +35,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/pion/interceptor v0.1.37 => github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1 +replace github.com/pion/interceptor v0.1.37 => github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb diff --git a/go.sum b/go.sum index 2348a1656c3..327139a64fb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1 h1:5p3Tm/VZUdN8aqLJp1noK/fAqggXJBHSsWXQJbksmw0= github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY= +github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb h1:qu70eQhcmCvNkrzYeVTDXS1RGmt14Qu5vo+sQH+q16w= +github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=