Skip to content

Commit 5ee01df

Browse files
Disable max buffer linit on URL input queue. Log SRT stats every minute (#335)
1 parent 20938bf commit 5ee01df

File tree

1 file changed

+47
-6
lines changed

1 file changed

+47
-6
lines changed

pkg/media/urlpull/source.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@ package urlpull
1717
import (
1818
"context"
1919
"strings"
20+
"time"
2021

22+
"github.com/frostbyte73/core"
2123
"github.com/go-gst/go-gst/gst"
2224

2325
"github.com/livekit/ingress/pkg/errors"
2426
"github.com/livekit/ingress/pkg/params"
27+
"github.com/livekit/protocol/logger"
2528
)
2629

2730
var (
@@ -39,12 +42,16 @@ var (
3942
)
4043

4144
type URLSource struct {
42-
params *params.Params
43-
src *gst.Element
44-
pad *gst.Pad
45+
params *params.Params
46+
src *gst.Element
47+
pad *gst.Pad
48+
printStats func()
49+
50+
done core.Fuse
4551
}
4652

4753
func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
54+
var printStats func()
4855
bin := gst.NewBin("input")
4956

5057
var elem *gst.Element
@@ -69,6 +76,15 @@ func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
6976
if err != nil {
7077
return nil, err
7178
}
79+
80+
printStats = func() {
81+
str, _ := elem.GetProperty("stats")
82+
if str != nil {
83+
if v, ok := str.(*gst.Structure); ok {
84+
logger.Infow("SRT input stats", "stats", v.String())
85+
}
86+
}
87+
}
7288
} else {
7389
return nil, errors.ErrUnsupportedURLFormat
7490
}
@@ -78,6 +94,11 @@ func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
7894
return nil, err
7995
}
8096

97+
// Disable buffer count limit and rely on bytes and time limits
98+
if err := queue.SetProperty("max-size-buffers", uint(0)); err != nil {
99+
return nil, err
100+
}
101+
81102
if strings.HasPrefix(p.Url, "http://") || strings.HasPrefix(p.Url, "https://") {
82103
err = queue.SetProperty("use-buffering", true)
83104
if err != nil {
@@ -106,9 +127,10 @@ func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
106127
}
107128

108129
return &URLSource{
109-
params: p,
110-
src: bin.Element,
111-
pad: pad,
130+
params: p,
131+
src: bin.Element,
132+
pad: pad,
133+
printStats: printStats,
112134
}, nil
113135
}
114136

@@ -138,11 +160,30 @@ func (s *URLSource) ValidateCaps(caps *gst.Caps) error {
138160
}
139161

140162
func (u *URLSource) Start(ctx context.Context) error {
163+
if u.printStats == nil {
164+
return nil
165+
}
166+
167+
go func() {
168+
ticker := time.NewTicker(time.Minute)
169+
for {
170+
select {
171+
case <-u.done.Watch():
172+
ticker.Stop()
173+
return
174+
case <-ticker.C:
175+
u.printStats()
176+
}
177+
}
178+
}()
179+
141180
return nil
142181
}
143182

144183
func (u *URLSource) Close() error {
145184
// TODO find a way to send a EOS event without hanging
146185

186+
u.done.Break()
187+
147188
return nil
148189
}

0 commit comments

Comments
 (0)