Skip to content

Commit c71d466

Browse files
authored
ai/live: Remove segment probing from payments (#3797)
Since we now do time-based payments rather than resolution-based payments, segment probing is no longer needed. However, we still send and handle payments roughly once per segment to roughly synchronize the frequency of payment transmissions and checks between gateways and orchestrators. In theory, having the G / O handle payments at different rates should be OK as long as the G has some balance with the O, but for now we can try to keep the cadence similar to the current setup that is known to work well.
1 parent 0e4401f commit c71d466

File tree

3 files changed

+12
-83
lines changed

3 files changed

+12
-83
lines changed

server/ai_http.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
214214
clog.Warningf(ctx, "No price info found for model %v, Orchestrator will not charge for video processing", modelID)
215215
}
216216

217-
// Subscribe to the publishUrl for payments monitoring and payment processing
217+
// For every segment, check payments
218218
go func() {
219219
sub := trickle.NewLocalSubscriber(h.trickleSrv, mid)
220220
for {
@@ -227,11 +227,12 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
227227
closeSession()
228228
return
229229
}
230-
reader := segment.Reader
231230
if paymentProcessor != nil {
232-
reader = paymentProcessor.process(ctx, segment.Reader)
231+
paymentProcessor.process(ctx)
233232
}
234-
io.Copy(io.Discard, reader)
233+
// read the segment so we know when it is complete, otherwise sub.Read()
234+
// would rapidly request follow-on segments that do not yet exist
235+
io.Copy(io.Discard, segment.Reader)
235236
}
236237
}()
237238

server/ai_live_video.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
121121
defer slowOrchChecker.EndSegment()
122122
var r io.Reader = reader
123123
if paymentProcessor != nil {
124-
r = paymentProcessor.process(ctx, reader)
124+
paymentProcessor.process(ctx)
125125
}
126126

127127
clog.V(8).Infof(ctx, "trickle publish writing data seq=%d", seq)

server/live_payment_processor.go

Lines changed: 6 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package server
22

33
import (
44
"context"
5-
"io"
6-
"os"
75
"sync"
86
"time"
97

@@ -24,11 +22,6 @@ type LivePaymentProcessor struct {
2422
lastProcessedMu sync.RWMutex
2523
processCh chan time.Time
2624

27-
lastProbedAt time.Time
28-
lastProbedSegInfoMu sync.RWMutex
29-
lastProbedSegInfo *ffmpeg.MediaFormatInfo
30-
probeSegCh chan *segment
31-
3225
processSegmentFunc func(inPixels int64) error
3326
}
3427

@@ -44,10 +37,6 @@ func NewLivePaymentProcessor(ctx context.Context, processInterval time.Duration,
4437
processCh: make(chan time.Time, 1),
4538
processSegmentFunc: processSegmentFunc,
4639
lastProcessedAt: time.Now(),
47-
48-
lastProbedAt: time.Now(),
49-
lastProbedSegInfo: &defaultSegInfo,
50-
probeSegCh: make(chan *segment, 1),
5140
}
5241
pp.start(ctx)
5342
return pp
@@ -63,19 +52,6 @@ func (p *LivePaymentProcessor) start(ctx context.Context) {
6352
clog.Info(ctx, "Done processing payments for session")
6453
return
6554
}
66-
67-
}
68-
}()
69-
go func() {
70-
for {
71-
select {
72-
case seg := <-p.probeSegCh:
73-
p.probeOne(ctx, seg)
74-
case <-ctx.Done():
75-
clog.Info(ctx, "Done probing segments for session")
76-
return
77-
}
78-
7955
}
8056
}()
8157
}
@@ -85,14 +61,12 @@ func (p *LivePaymentProcessor) processOne(ctx context.Context, timestamp time.Ti
8561
return
8662
}
8763

88-
p.lastProbedSegInfoMu.RLock()
89-
info := p.lastProbedSegInfo
90-
p.lastProbedSegInfoMu.RUnlock()
64+
info := defaultSegInfo
9165

9266
pixelsPerSec := float64(info.Height) * float64(info.Width) * float64(info.FPS)
9367
secSinceLastProcessed := timestamp.Sub(p.lastProcessedAt).Seconds()
9468
pixelsSinceLastProcessed := pixelsPerSec * secSinceLastProcessed
95-
clog.V(6).Infof(ctx, "Processing live payment: secSinceLastProcessed=%v, pixelsSinceLastProcessed=%v, height=%d, width=%d, FPS=%v", secSinceLastProcessed, pixelsSinceLastProcessed, info.Height, info.Width, info.FPS)
69+
clog.Info(ctx, "Processing live payment", "secsSinceLastProcessed", secSinceLastProcessed, "pixelsSinceLastProcessed", pixelsSinceLastProcessed)
9670

9771
err := p.processSegmentFunc(int64(pixelsSinceLastProcessed))
9872
if err != nil {
@@ -106,44 +80,22 @@ func (p *LivePaymentProcessor) processOne(ctx context.Context, timestamp time.Ti
10680
p.lastProcessedAt = timestamp
10781
}
10882

109-
func (p *LivePaymentProcessor) process(ctx context.Context, reader io.Reader) io.Reader {
83+
func (p *LivePaymentProcessor) process(ctx context.Context) {
11084
timestamp := time.Now()
11185
if p.shouldSkip(timestamp) {
112-
// We don't process every segment, because it's too compute-expensive
113-
return reader
114-
}
115-
116-
pipeReader, pipeWriter, err := os.Pipe()
117-
if err != nil {
118-
clog.InfofErr(ctx, "Error creating pipe", err)
119-
return reader
86+
// Only need to process segments periodically
87+
return
12088
}
12189

122-
resReader := io.TeeReader(reader, pipeWriter)
12390
go func() {
12491
select {
12592
case p.processCh <- timestamp:
12693
default:
12794
// We process one segment at the time, no need to buffer them
12895
}
129-
130-
// read the segment into the buffer, because the direct use of the reader causes Broken pipe
131-
// it's probably related to different pace of reading by trickle and ffmpeg.GetCodecInfo()
132-
defer pipeReader.Close()
133-
segData, err := io.ReadAll(pipeReader)
134-
if err != nil {
135-
clog.InfofErr(ctx, "Error reading segment data", err)
136-
return
137-
}
138-
139-
select {
140-
case p.probeSegCh <- &segment{timestamp: timestamp, segData: segData}:
141-
default:
142-
// We process one segment at the time, no need to buffer them
143-
}
14496
}()
14597

146-
return resReader
98+
return
14799
}
148100

149101
func (p *LivePaymentProcessor) shouldSkip(timestamp time.Time) bool {
@@ -156,27 +108,3 @@ func (p *LivePaymentProcessor) shouldSkip(timestamp time.Time) bool {
156108
}
157109
return false
158110
}
159-
160-
func (p *LivePaymentProcessor) probeOne(ctx context.Context, seg *segment) {
161-
if p.lastProbedAt.Add(p.interval).After(seg.timestamp) {
162-
// We don't probe every segment, because it's too compute-expensive
163-
return
164-
}
165-
166-
info, err := probeSegment(ctx, seg)
167-
if err != nil {
168-
clog.Warningf(ctx, "Error probing segment, err=%v", err)
169-
return
170-
}
171-
clog.V(6).Infof(ctx, "Probed segment: height=%d, width=%d, FPS=%v", info.Height, info.Width, info.FPS)
172-
173-
p.lastProbedSegInfoMu.Lock()
174-
defer p.lastProbedSegInfoMu.Unlock()
175-
p.lastProbedSegInfo = &info
176-
p.lastProbedAt = seg.timestamp
177-
}
178-
179-
func probeSegment(ctx context.Context, seg *segment) (ffmpeg.MediaFormatInfo, error) {
180-
// Return a constant value to calculate payments based on time intervals rather than input segment pixel data
181-
return defaultSegInfo, nil
182-
}

0 commit comments

Comments
 (0)