77 "time"
88
99 "github.com/pion/interceptor"
10+ "github.com/pion/interceptor/internal/ntp"
1011 "github.com/pion/rtcp"
1112 "github.com/pion/rtp"
1213)
@@ -16,9 +17,8 @@ import (
1617const TwccExtensionAttributesKey = iota
1718
1819var (
19- errMissingTWCCExtensionID = errors .New ("missing transport layer cc header extension id" )
20- errMissingTWCCExtension = errors .New ("missing transport layer cc header extension" )
21- errInvalidFeedback = errors .New ("invalid feedback" )
20+ errMissingTWCCExtension = errors .New ("missing transport layer cc header extension" )
21+ errInvalidFeedback = errors .New ("invalid feedback" )
2222)
2323
2424// FeedbackAdapter converts incoming RTCP Packets (TWCC and RFC8888) into Acknowledgments.
@@ -33,15 +33,24 @@ func NewFeedbackAdapter() *FeedbackAdapter {
3333 return & FeedbackAdapter {history : newFeedbackHistory (250 )}
3434}
3535
36- // OnSent records that and when an outgoing packet was sent for later mapping to
37- // acknowledgments
38- func (f * FeedbackAdapter ) OnSent (ts time.Time , header * rtp.Header , size int , attributes interceptor.Attributes ) error {
39- hdrExtensionID := attributes .Get (TwccExtensionAttributesKey )
40- id , ok := hdrExtensionID .(uint8 )
41- if ! ok || hdrExtensionID == 0 {
42- return errMissingTWCCExtensionID
43- }
44- sequenceNumber := header .GetExtension (id )
36+ func (f * FeedbackAdapter ) onSentRFC8888 (ts time.Time , header * rtp.Header , size int ) error {
37+ f .lock .Lock ()
38+ defer f .lock .Unlock ()
39+
40+ f .history .add (Acknowledgment {
41+ SequenceNumber : header .SequenceNumber ,
42+ SSRC : header .SSRC ,
43+ Size : size ,
44+ Departure : ts ,
45+ Arrival : time.Time {},
46+ RTT : 0 ,
47+ ECN : 0 ,
48+ })
49+ return nil
50+ }
51+
52+ func (f * FeedbackAdapter ) onSentTWCC (ts time.Time , extID uint8 , header * rtp.Header , size int ) error {
53+ sequenceNumber := header .GetExtension (extID )
4554 var tccExt rtp.TransportCCExtension
4655 err := tccExt .Unmarshal (sequenceNumber )
4756 if err != nil {
@@ -51,23 +60,41 @@ func (f *FeedbackAdapter) OnSent(ts time.Time, header *rtp.Header, size int, att
5160 f .lock .Lock ()
5261 defer f .lock .Unlock ()
5362 f .history .add (Acknowledgment {
54- TLCC : tccExt .TransportSequence ,
55- Size : header .MarshalSize () + size ,
56- Departure : ts ,
57- Arrival : time.Time {},
58- RTT : 0 ,
63+ SequenceNumber : tccExt .TransportSequence ,
64+ SSRC : 0 ,
65+ Size : header .MarshalSize () + size ,
66+ Departure : ts ,
67+ Arrival : time.Time {},
68+ RTT : 0 ,
69+ ECN : 0 ,
5970 })
6071 return nil
6172}
6273
74+ // OnSent records that and when an outgoing packet was sent for later mapping to
75+ // acknowledgments
76+ func (f * FeedbackAdapter ) OnSent (ts time.Time , header * rtp.Header , size int , attributes interceptor.Attributes ) error {
77+ hdrExtensionID := attributes .Get (TwccExtensionAttributesKey )
78+ id , ok := hdrExtensionID .(uint8 )
79+ if ok && hdrExtensionID != 0 {
80+ return f .onSentTWCC (ts , id , header , size )
81+ }
82+
83+ return f .onSentRFC8888 (ts , header , size )
84+ }
85+
6386func (f * FeedbackAdapter ) unpackRunLengthChunk (ts time.Time , start uint16 , refTime time.Time , chunk * rtcp.RunLengthChunk , deltas []* rtcp.RecvDelta ) (consumedDeltas int , nextRef time.Time , acks []Acknowledgment , err error ) {
6487 result := make ([]Acknowledgment , chunk .RunLength )
6588 deltaIndex := 0
6689
6790 end := start + chunk .RunLength
6891 resultIndex := 0
6992 for i := start ; i != end ; i ++ {
70- if ack , ok := f .history .get (i ); ok {
93+ key := feedbackHistoryKey {
94+ ssrc : 0 ,
95+ sequenceNumber : i ,
96+ }
97+ if ack , ok := f .history .get (key ); ok {
7198 if chunk .PacketStatusSymbol != rtcp .TypeTCCPacketNotReceived {
7299 if len (deltas )- 1 < deltaIndex {
73100 return deltaIndex , refTime , result , errInvalidFeedback
@@ -89,7 +116,11 @@ func (f *FeedbackAdapter) unpackStatusVectorChunk(ts time.Time, start uint16, re
89116 deltaIndex := 0
90117 resultIndex := 0
91118 for i , symbol := range chunk .SymbolList {
92- if ack , ok := f .history .get (start + uint16 (i )); ok {
119+ key := feedbackHistoryKey {
120+ ssrc : 0 ,
121+ sequenceNumber : start + uint16 (i ),
122+ }
123+ if ack , ok := f .history .get (key ); ok {
93124 if symbol != rtcp .TypeTCCPacketNotReceived {
94125 if len (deltas )- 1 < deltaIndex {
95126 return deltaIndex , refTime , result , errInvalidFeedback
@@ -146,21 +177,55 @@ func (f *FeedbackAdapter) OnTransportCCFeedback(ts time.Time, feedback *rtcp.Tra
146177 return result , nil
147178}
148179
180+ // OnRFC8888Feedback converts incoming Congestion Control Feedback RTCP packet
181+ // to Acknowledgments.
182+ func (f * FeedbackAdapter ) OnRFC8888Feedback (ts time.Time , feedback * rtcp.CCFeedbackReport ) []Acknowledgment {
183+ f .lock .Lock ()
184+ defer f .lock .Unlock ()
185+
186+ result := []Acknowledgment {}
187+ referenceTime := ntp .ToTime (uint64 (feedback .ReportTimestamp ) << 16 )
188+ for _ , rb := range feedback .ReportBlocks {
189+ for i , mb := range rb .MetricBlocks {
190+ sequenceNumber := rb .BeginSequence + uint16 (i )
191+ key := feedbackHistoryKey {
192+ ssrc : rb .MediaSSRC ,
193+ sequenceNumber : sequenceNumber ,
194+ }
195+ if ack , ok := f .history .get (key ); ok {
196+ if mb .Received {
197+ delta := time .Duration ((float64 (mb .ArrivalTimeOffset ) / 1024.0 ) * float64 (time .Second ))
198+ ack .Arrival = referenceTime .Add (- delta )
199+ ack .RTT = ts .Sub (ack .Departure )
200+ ack .ECN = mb .ECN
201+ }
202+ result = append (result , ack )
203+ }
204+ }
205+ }
206+ return result
207+ }
208+
209+ type feedbackHistoryKey struct {
210+ ssrc uint32
211+ sequenceNumber uint16
212+ }
213+
149214type feedbackHistory struct {
150215 size int
151216 evictList * list.List
152- items map [uint16 ]* list.Element
217+ items map [feedbackHistoryKey ]* list.Element
153218}
154219
155220func newFeedbackHistory (size int ) * feedbackHistory {
156221 return & feedbackHistory {
157222 size : size ,
158223 evictList : list .New (),
159- items : make (map [uint16 ]* list.Element ),
224+ items : make (map [feedbackHistoryKey ]* list.Element ),
160225 }
161226}
162227
163- func (f * feedbackHistory ) get (key uint16 ) (Acknowledgment , bool ) {
228+ func (f * feedbackHistory ) get (key feedbackHistoryKey ) (Acknowledgment , bool ) {
164229 ent , ok := f .items [key ]
165230 if ok {
166231 if ack , ok := ent .Value .(Acknowledgment ); ok {
@@ -171,15 +236,19 @@ func (f *feedbackHistory) get(key uint16) (Acknowledgment, bool) {
171236}
172237
173238func (f * feedbackHistory ) add (ack Acknowledgment ) {
239+ key := feedbackHistoryKey {
240+ ssrc : ack .SSRC ,
241+ sequenceNumber : ack .SequenceNumber ,
242+ }
174243 // Check for existing
175- if ent , ok := f .items [ack . TLCC ]; ok {
244+ if ent , ok := f .items [key ]; ok {
176245 f .evictList .MoveToFront (ent )
177246 ent .Value = ack
178247 return
179248 }
180249 // Add new
181250 ent := f .evictList .PushFront (ack )
182- f .items [ack . TLCC ] = ent
251+ f .items [key ] = ent
183252 // Evict if necessary
184253 if f .evictList .Len () > f .size {
185254 f .removeOldest ()
@@ -190,7 +259,11 @@ func (f *feedbackHistory) removeOldest() {
190259 if ent := f .evictList .Back (); ent != nil {
191260 f .evictList .Remove (ent )
192261 if ack , ok := ent .Value .(Acknowledgment ); ok {
193- delete (f .items , ack .TLCC )
262+ key := feedbackHistoryKey {
263+ ssrc : ack .SSRC ,
264+ sequenceNumber : ack .SequenceNumber ,
265+ }
266+ delete (f .items , key )
194267 }
195268 }
196269}
0 commit comments