Skip to content

Commit bdfda17

Browse files
authored
Fix handling of sender report offsets (#731)
* Decompose desired PTS offset into 2 parts One is fixed and determined on track start up, the other needs to be updated on SRs * Adding basic track syncrhonizer unit tests * increasing time cmp tolerance * Adding slack notifier action
1 parent b11bf0b commit bdfda17

File tree

6 files changed

+585
-1
lines changed

6 files changed

+585
-1
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
name: PR Slack Notifier
2+
3+
on:
4+
pull_request:
5+
types: [review_requested, reopened, closed]
6+
pull_request_review:
7+
types: [submitted]
8+
9+
permissions:
10+
contents: read
11+
pull-requests: write
12+
issues: write
13+
14+
concurrency:
15+
group: pr-slack-${{ github.event.pull_request.number }}-${{ github.workflow }}
16+
cancel-in-progress: false
17+
18+
jobs:
19+
notify-devs:
20+
runs-on: ubuntu-latest
21+
steps:
22+
- uses: livekit/slack-notifier-action@main
23+
with:
24+
config_json: ${{ secrets.SLACK_NOTIFY_CONFIG_JSON }}
25+
slack_token: ${{ secrets.SLACK_PR_NOTIFIER_TOKEN }}

go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ require (
2828

2929
require golang.org/x/mod v0.27.0
3030

31+
require (
32+
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 // indirect
33+
golang.org/x/tools v0.36.0 // indirect
34+
)
35+
3136
require (
3237
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect
3338
buf.build/go/protovalidate v0.14.0 // indirect
@@ -87,3 +92,5 @@ require (
8792
gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect
8893
gopkg.in/yaml.v3 v3.0.1 // indirect
8994
)
95+
96+
tool github.com/maxbrunsfeld/counterfeiter/v6

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4c
105105
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
106106
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
107107
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
108+
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 h1:Eaq36EIyJNp7b3qDhjV7jmDVq/yPeW2v4pTqzGbOGB4=
109+
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3/go.mod h1:6KKUoQBZBW6PDXJtNfqeEjPXMj/ITTk+cWK9t9uS5+E=
108110
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
109111
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
110112
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
@@ -267,6 +269,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
267269
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
268270
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
269271
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
272+
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
273+
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
270274
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
271275
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
272276
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
package synchronizer_test
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/pion/rtcp"
8+
"github.com/pion/rtp"
9+
"github.com/pion/webrtc/v4"
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/livekit/mediatransportutil"
13+
"github.com/livekit/server-sdk-go/v2/pkg/synchronizer"
14+
"github.com/livekit/server-sdk-go/v2/pkg/synchronizer/synchronizerfakes"
15+
)
16+
17+
const timeTolerance = time.Millisecond * 10
18+
19+
// ---------- helpers ----------
20+
21+
func near(t *testing.T, got, want, tol time.Duration) {
22+
t.Helper()
23+
d := got - want
24+
if d < 0 {
25+
d = -d
26+
}
27+
require.LessOrEqualf(t, d, tol, "got %v, want ~%v (±%v)", got, want, tol)
28+
}
29+
30+
func pkt(ts uint32) *rtp.Packet {
31+
return &rtp.Packet{Header: rtp.Header{Timestamp: ts}}
32+
}
33+
34+
func fakeAudio48k(ssrc uint32) *synchronizerfakes.FakeTrackRemote {
35+
f := &synchronizerfakes.FakeTrackRemote{}
36+
f.IDReturns("audio-1")
37+
f.KindReturns(webrtc.RTPCodecTypeAudio)
38+
f.SSRCReturns(webrtc.SSRC(ssrc))
39+
f.CodecReturns(webrtc.RTPCodecParameters{
40+
RTPCodecCapability: webrtc.RTPCodecCapability{
41+
MimeType: webrtc.MimeTypeOpus,
42+
ClockRate: 48000,
43+
},
44+
})
45+
return f
46+
}
47+
48+
func fakeVideo90k(ssrc uint32) *synchronizerfakes.FakeTrackRemote {
49+
f := &synchronizerfakes.FakeTrackRemote{}
50+
f.IDReturns("video-1")
51+
f.KindReturns(webrtc.RTPCodecTypeVideo)
52+
f.SSRCReturns(webrtc.SSRC(ssrc))
53+
f.CodecReturns(webrtc.RTPCodecParameters{
54+
RTPCodecCapability: webrtc.RTPCodecCapability{
55+
MimeType: webrtc.MimeTypeVP8, // codec value not important here
56+
ClockRate: 90000,
57+
},
58+
})
59+
return f
60+
}
61+
62+
// ---------- tests ----------
63+
64+
// Initialize + same timestamp returns previous adjusted value (near zero right after init)
65+
func TestInitialize_AndSameTimestamp(t *testing.T) {
66+
s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(50 * time.Millisecond))
67+
tr := fakeAudio48k(0xA0010001)
68+
69+
ts := uint32(1_000_000)
70+
tsync := s.AddTrack(tr, "p1")
71+
72+
tsync.Initialize(pkt(ts))
73+
74+
adj0, err := tsync.GetPTS(pkt(ts))
75+
require.NoError(t, err)
76+
near(t, adj0, 0, timeTolerance)
77+
78+
adj1, err := tsync.GetPTS(pkt(ts))
79+
require.NoError(t, err)
80+
require.Equal(t, adj0, adj1)
81+
}
82+
83+
// 20 ms RTP step at 48 kHz → ~20 ms adjusted PTS (with small tolerance)
84+
func TestMonotonicPTS_SmallRTPDelta(t *testing.T) {
85+
s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(50 * time.Millisecond))
86+
tr := fakeAudio48k(0xA0010002)
87+
88+
ts0 := uint32(500_000)
89+
delta20ms := uint32(48000 * 20 / 1000) // 960 ticks
90+
91+
tsync := s.AddTrack(tr, "p1")
92+
tsync.Initialize(pkt(ts0))
93+
94+
// establish startTime
95+
_, _ = tsync.GetPTS(pkt(ts0))
96+
97+
adj, err := tsync.GetPTS(pkt(ts0 + delta20ms))
98+
require.NoError(t, err)
99+
near(t, adj, 20*time.Millisecond, timeTolerance)
100+
}
101+
102+
// Large RTP jump with tight MaxTsDiff should reset to small estimatedPTS (not ~2s)
103+
func TestUnacceptableDrift_ResetsToEstimatedPTS(t *testing.T) {
104+
s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(10 * time.Millisecond))
105+
tr := fakeAudio48k(0xA0010003)
106+
107+
ts0 := uint32(777_000)
108+
delta2s := uint32(48000 * 2) // 96,000 ticks
109+
110+
tsync := s.AddTrack(tr, "p1")
111+
tsync.Initialize(pkt(ts0))
112+
113+
// establish startTime
114+
_, _ = tsync.GetPTS(pkt(ts0))
115+
116+
adj, err := tsync.GetPTS(pkt(ts0 + delta2s))
117+
require.NoError(t, err)
118+
require.LessOrEqual(t, adj, 150*time.Millisecond, "expected reset to small estimatedPTS, not ~2s")
119+
}
120+
121+
func TestOnSenderReport_SlewsTowardDesiredOffset(t *testing.T) {
122+
const (
123+
maxAdjustment = 5 * time.Millisecond
124+
ts0 = uint32(900_000)
125+
stepRTP = uint32(48000 * 20 / 1000) // 20 ms @ 48 kHz
126+
stepDur = 20 * time.Millisecond
127+
desired = 50 * time.Millisecond // target offset from SR
128+
)
129+
130+
s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(1 * time.Second))
131+
tr := fakeAudio48k(0xA0010004)
132+
133+
tsync := s.AddTrack(tr, "p1")
134+
tsync.Initialize(pkt(ts0))
135+
136+
// Anchor wall-clock just before startTime is set.
137+
t0 := time.Now()
138+
_, _ = tsync.GetPTS(pkt(ts0)) // sets startTime ≈ t0
139+
140+
cur := ts0 + stepRTP
141+
baseAdj, _ := tsync.GetPTS(pkt(cur)) // baseline adjusted PTS with 20ms content
142+
143+
// Craft SR so offset ≈ desired:
144+
// offset := NTP - (startTime + pts_at_SR)
145+
// pts_at_SR here is 20ms, startTime ≈ t0 → set NTP to t0 + 20ms + desired
146+
ntp := mediatransportutil.ToNtpTime(t0.Add(stepDur + desired))
147+
tsync.OnSenderReport(func(d time.Duration) {})
148+
s.OnRTCP(&rtcp.SenderReport{
149+
SSRC: uint32(tr.SSRC()),
150+
NTPTime: uint64(ntp),
151+
RTPTime: cur,
152+
})
153+
154+
// Converge in N = ceil(desired / 5ms) steps (5ms maxAdjustment)
155+
N := int((desired + 5*time.Millisecond - 1) / (5 * time.Millisecond))
156+
157+
for i := 0; i < N; i++ {
158+
cur += stepRTP
159+
_, err := tsync.GetPTS(pkt(cur))
160+
require.NoError(t, err)
161+
}
162+
163+
// After N steps, total adjusted delta over base should be:
164+
// content progression (N * 20ms) + desired (slew)
165+
finalAdj, err := tsync.GetPTS(pkt(cur)) // same TS → returns last adjusted
166+
require.NoError(t, err)
167+
168+
gotDelta := finalAdj - baseAdj
169+
wantDelta := time.Duration(N)*stepDur + desired
170+
171+
near(t, gotDelta, wantDelta, timeTolerance)
172+
}
173+
174+
// Regression: late video start (~2s) + tiny SR offset (~10ms) must NOT emit a big negative drift
175+
func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *testing.T) {
176+
const (
177+
lateStart = 2 * time.Second
178+
srOffset = 50 * time.Millisecond
179+
stepSlew = 5 * time.Millisecond // TrackSynchronizer's maxAdjustment
180+
)
181+
182+
s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(2 * time.Second))
183+
184+
// 1) Audio publishes immediately → establishes startedAt
185+
audio := fakeAudio48k(0xA0010005)
186+
tsA0 := uint32(1_000_000)
187+
aSync := s.AddTrack(audio, "p1")
188+
aSync.Initialize(pkt(tsA0))
189+
_, _ = aSync.GetPTS(pkt(tsA0))
190+
191+
// Simulate a real late video publish
192+
time.Sleep(lateStart)
193+
194+
// 2) Video publishes later
195+
video := fakeVideo90k(0x00BEEF01)
196+
tsV0 := uint32(2_000_000)
197+
vSync := s.AddTrack(video, "p1")
198+
vSync.Initialize(pkt(tsV0))
199+
_, _ = vSync.GetPTS(pkt(tsV0))
200+
201+
// 3) First SR: small positive drift
202+
ntp := mediatransportutil.ToNtpTime(time.Now().Add(srOffset))
203+
var observedDrift time.Duration
204+
vSync.OnSenderReport(func(d time.Duration) { observedDrift = d })
205+
206+
s.OnRTCP(&rtcp.SenderReport{
207+
SSRC: uint32(video.SSRC()),
208+
NTPTime: uint64(ntp),
209+
RTPTime: tsV0, // equals lastTS → SR uses lastPTS at tsV0
210+
})
211+
212+
near(t, observedDrift, srOffset, timeTolerance)
213+
214+
// baseline adjusted PTS at the SR moment (same TS => returns last adjusted)
215+
baseAdj, err := vSync.GetPTS(pkt(tsV0))
216+
require.NoError(t, err)
217+
218+
step33ms := uint32(90000 * 33 / 1000) // ~33 ms per 30fps frame at 90 kHz
219+
stepDur := 33 * time.Millisecond
220+
221+
// Converge in N = ceil(srOffset / stepSlew) steps (50ms / 5ms = 10)
222+
N := int((srOffset + stepSlew - 1) / stepSlew)
223+
224+
cur := tsV0
225+
var adj time.Duration
226+
227+
// Drive N frames to converge
228+
for i := 1; i <= N; i++ {
229+
cur += step33ms
230+
adj, err = vSync.GetPTS(pkt(cur))
231+
require.NoError(t, err)
232+
}
233+
234+
// After N steps, the extra beyond content cadence should be ~srOffset
235+
gotDelta := adj - baseAdj
236+
wantDelta := time.Duration(N)*stepDur + srOffset
237+
near(t, gotDelta, wantDelta, timeTolerance)
238+
239+
// Now push more frames and ensure we DON'T keep slewing (stays near srOffset)
240+
const extraFrames = 8
241+
for i := 1; i <= extraFrames; i++ {
242+
cur += step33ms
243+
adj, err = vSync.GetPTS(pkt(cur))
244+
require.NoError(t, err)
245+
246+
// Extra slew measured from the SR moment:
247+
totalContent := time.Duration(N+i) * stepDur
248+
extra := (adj - baseAdj) - totalContent
249+
250+
// Stay within a small band around srOffset (no steady growth)
251+
near(t, extra, srOffset, timeTolerance)
252+
}
253+
}

0 commit comments

Comments
 (0)