Skip to content

Commit ea6e6a3

Browse files
committed
Add playout delay header interceptor
This interceptor adds the playout delay header extension following http://www.webrtc.org/experiments/rtp-hdrext/playout-delay
1 parent a82b843 commit ea6e6a3

File tree

4 files changed

+180
-0
lines changed

4 files changed

+180
-0
lines changed

AUTHORS.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ boks1971 <[email protected]>
1515
David Zhao <[email protected]>
1616
Jonathan Müller <[email protected]>
1717
Kevin Caffrey <[email protected]>
18+
Kevin Wang <[email protected]>
1819
Mathis Engelbart <[email protected]>
1920
Sean DuBois <[email protected]>
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package playoutdelay
2+
3+
import (
4+
"time"
5+
6+
"github.com/pion/interceptor"
7+
"github.com/pion/rtp"
8+
)
9+
10+
// HeaderExtensionInterceptorFactory is a interceptor.Factory for a HeaderExtensionInterceptor
11+
type HeaderExtensionInterceptorFactory struct{}
12+
13+
// NewInterceptor constructs a new HeaderExtensionInterceptor
14+
func (h *HeaderExtensionInterceptorFactory) NewInterceptor(id string, minDelay, maxDelay time.Duration) (interceptor.Interceptor, error) {
15+
if minDelay.Milliseconds() < 0 || minDelay.Milliseconds() > 40950 || maxDelay.Milliseconds() < 0 || maxDelay.Milliseconds() > 40950 {
16+
return nil, errPlayoutDelayInvalidValue
17+
}
18+
return &HeaderExtensionInterceptor{minDelay: uint16(minDelay.Milliseconds() / 10), maxDelay: uint16(maxDelay.Milliseconds() / 10)}, nil
19+
}
20+
21+
// NewHeaderExtensionInterceptor returns a HeaderExtensionInterceptorFactory
22+
func NewHeaderExtensionInterceptor() (*HeaderExtensionInterceptorFactory, error) {
23+
return &HeaderExtensionInterceptorFactory{}, nil
24+
}
25+
26+
// HeaderExtensionInterceptor adds transport wide sequence numbers as header extension to each RTP packet
27+
type HeaderExtensionInterceptor struct {
28+
interceptor.NoOp
29+
minDelay, maxDelay uint16
30+
}
31+
32+
const playoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay"
33+
34+
// BindLocalStream returns a writer that adds a rtp.TransportCCExtension
35+
// header with increasing sequence numbers to each outgoing packet.
36+
func (h *HeaderExtensionInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
37+
var hdrExtID uint8
38+
for _, e := range info.RTPHeaderExtensions {
39+
if e.URI == playoutDelayURI {
40+
hdrExtID = uint8(e.ID)
41+
break
42+
}
43+
}
44+
if hdrExtID == 0 { // Don't add header extension if ID is 0, because 0 is an invalid extension ID
45+
return writer
46+
}
47+
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
48+
ext, err := (&Extension{
49+
minDelay: h.minDelay,
50+
maxDelay: h.maxDelay,
51+
}).Marshal()
52+
if err != nil {
53+
return 0, err
54+
}
55+
err = header.SetExtension(hdrExtID, ext)
56+
if err != nil {
57+
return 0, err
58+
}
59+
return writer.Write(header, payload, attributes)
60+
})
61+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package playoutdelay
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/pion/interceptor"
9+
"github.com/pion/interceptor/internal/test"
10+
"github.com/pion/rtp"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestHeaderExtensionInterceptor(t *testing.T) {
15+
t.Run("add playout delay to each packet", func(t *testing.T) {
16+
factory, err := NewHeaderExtensionInterceptor()
17+
assert.NoError(t, err)
18+
19+
inter, err := factory.NewInterceptor("", 10*time.Millisecond, 20*time.Millisecond)
20+
assert.NoError(t, err)
21+
22+
pChan := make(chan *rtp.Packet, 10*5)
23+
go func() {
24+
// start some parallel streams using the same interceptor to test for race conditions
25+
var wg sync.WaitGroup
26+
num := 10
27+
wg.Add(num)
28+
for i := 0; i < num; i++ {
29+
go func(ch chan *rtp.Packet, id uint16) {
30+
stream := test.NewMockStream(&interceptor.StreamInfo{RTPHeaderExtensions: []interceptor.RTPHeaderExtension{
31+
{
32+
URI: playoutDelayURI,
33+
ID: 1,
34+
},
35+
}}, inter)
36+
defer func() {
37+
wg.Done()
38+
assert.NoError(t, stream.Close())
39+
}()
40+
41+
for _, seqNum := range []uint16{id * 1, id * 2, id * 3, id * 4, id * 5} {
42+
assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
43+
select {
44+
case p := <-stream.WrittenRTP():
45+
assert.Equal(t, seqNum, p.SequenceNumber)
46+
ch <- p
47+
case <-time.After(10 * time.Millisecond):
48+
panic("written rtp packet not found")
49+
}
50+
}
51+
}(pChan, uint16(i+1))
52+
}
53+
wg.Wait()
54+
close(pChan)
55+
}()
56+
57+
for p := range pChan {
58+
extensionHeader := p.GetExtension(1)
59+
ext := &Extension{}
60+
err = ext.Unmarshal(extensionHeader)
61+
assert.NoError(t, err)
62+
assert.Equal(t, uint16(1), ext.minDelay)
63+
assert.Equal(t, uint16(2), ext.maxDelay)
64+
}
65+
})
66+
}

pkg/playoutdelay/playout_delay.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Package playoutdelay implements the playout delay header extension.
2+
package playoutdelay
3+
4+
import (
5+
"encoding/binary"
6+
"errors"
7+
)
8+
9+
const (
10+
// transport-wide sequence
11+
playoutDelayExtensionSize = 3
12+
playoutDelayMaxValue = (1 << 12) - 1
13+
)
14+
15+
var (
16+
errPlayoutDelayInvalidValue = errors.New("invalid playout delay value")
17+
errTooSmall = errors.New("playout delay header extension too short")
18+
)
19+
20+
// Extension is a extension payload format in
21+
// https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
22+
// 0 1 2 3
23+
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
24+
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
25+
// | ID | len=2 | MIN delay | MAX delay |
26+
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
27+
type Extension struct {
28+
minDelay, maxDelay uint16
29+
}
30+
31+
// Marshal serializes the members to buffer
32+
func (p Extension) Marshal() ([]byte, error) {
33+
if p.minDelay >= playoutDelayMaxValue || p.maxDelay >= playoutDelayMaxValue {
34+
return nil, errPlayoutDelayInvalidValue
35+
}
36+
37+
return []byte{
38+
byte(p.minDelay >> 4),
39+
byte(p.minDelay<<4) | byte(p.maxDelay>>8),
40+
byte(p.maxDelay),
41+
}, nil
42+
}
43+
44+
// Unmarshal parses the passed byte slice and stores the result in the members
45+
func (p *Extension) Unmarshal(rawData []byte) error {
46+
if len(rawData) < playoutDelayExtensionSize {
47+
return errTooSmall
48+
}
49+
p.minDelay = binary.BigEndian.Uint16(rawData[0:2]) >> 4
50+
p.maxDelay = binary.BigEndian.Uint16(rawData[1:3]) & 0x0FFF
51+
return nil
52+
}

0 commit comments

Comments
 (0)