Skip to content

Commit ad24d42

Browse files
authored
Handle disconnect of non-primary and log first response time. (#710)
1 parent 2f7fee3 commit ad24d42

File tree

3 files changed

+37
-24
lines changed

3 files changed

+37
-24
lines changed

engine.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,10 @@ func (e *RTCEngine) SetLogger(l protoLogger.Logger) {
231231
e.signalHandler.SetLogger(l)
232232
e.signalTransport.SetLogger(l)
233233
if e.publisher != nil {
234-
e.publisher.SetLogger(l)
234+
e.publisher.SetLogger(l.WithValues("transport", livekit.SignalTarget_PUBLISHER))
235235
}
236236
if e.subscriber != nil {
237-
e.subscriber.SetLogger(l)
237+
e.subscriber.SetLogger(l.WithValues("transport", livekit.SignalTarget_SUBSCRIBER))
238238
}
239239
}
240240

@@ -254,7 +254,7 @@ func (e *RTCEngine) JoinContext(
254254
)
255255
if e.signallingVersion == signalling.SignallingVersionV2 {
256256
e.pclock.Lock()
257-
e.createPublisherPCLocked(webrtc.Configuration{}, false)
257+
e.createPublisherPCLocked(webrtc.Configuration{})
258258

259259
publisherOffer, err = e.publisher.GetOffer()
260260
if err != nil {
@@ -369,23 +369,23 @@ func (e *RTCEngine) configure(
369369
if e.publisher != nil {
370370
setConfiguration(e.publisher, configuration)
371371
} else {
372-
if err := e.createPublisherPCLocked(configuration, !e.subscriberPrimary); err != nil {
372+
if err := e.createPublisherPCLocked(configuration); err != nil {
373373
return err
374374
}
375375
}
376376

377377
if e.subscriber != nil {
378378
setConfiguration(e.subscriber, configuration)
379379
} else {
380-
if err := e.createSubscriberPCLocked(configuration, e.subscriberPrimary); err != nil {
380+
if err := e.createSubscriberPCLocked(configuration); err != nil {
381381
return err
382382
}
383383
}
384384

385385
return nil
386386
}
387387

388-
func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration, isPrimary bool) error {
388+
func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration) error {
389389
var err error
390390
if e.publisher, err = NewPCTransport(PCTransportParams{
391391
Configuration: configuration,
@@ -397,7 +397,7 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
397397
}); err != nil {
398398
return err
399399
}
400-
e.publisher.SetLogger(e.log)
400+
e.publisher.SetLogger(e.log.WithValues("transport", livekit.SignalTarget_PUBLISHER))
401401

402402
e.publisher.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
403403
if candidate == nil {
@@ -407,20 +407,23 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
407407
init := candidate.ToJSON()
408408
e.log.Debugw(
409409
"local ICE candidate",
410-
"target", livekit.SignalTarget_PUBLISHER,
410+
"transport", livekit.SignalTarget_PUBLISHER,
411411
"candidate", init.Candidate,
412412
)
413413
if err := e.signalTransport.SendMessage(
414414
e.signalling.SignalICECandidate(
415415
protosignalling.ToProtoTrickle(init, livekit.SignalTarget_PUBLISHER, false),
416416
),
417417
); err != nil {
418-
e.log.Errorw("could not send ICE candidates for publisher", err)
418+
e.log.Errorw(
419+
"could not send ICE candidate", err,
420+
"transport", livekit.SignalTarget_PUBLISHER,
421+
)
419422
}
420423
})
421424

422425
e.publisher.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
423-
e.handleICEConnectionStateChange(e.publisher, livekit.SignalTarget_PUBLISHER, isPrimary, state)
426+
e.handleICEConnectionStateChange(e.publisher, livekit.SignalTarget_PUBLISHER, state)
424427
})
425428

426429
e.publisher.OnOffer = func(offer webrtc.SessionDescription) {
@@ -459,7 +462,6 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
459462

460463
// SIGNALLING-V2-TODO: may need a separate peer connection
461464
// SIGNALLING-V2-TODO: instantiating this should rely on signal transport strategy rather than signalling version
462-
// SIGNALLING-V2-TODO: for signalling v2 instantiate publisher PC before connect and then do just SetConfiguration in OnConnectResponse
463465
if e.signallingVersion == signalling.SignallingVersionV2 {
464466
e.signallingDC, err = e.publisher.pc.CreateDataChannel(signallingDataChannelName, &webrtc.DataChannelInit{
465467
Ordered: &trueVal,
@@ -475,6 +477,7 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
475477
SignalHandler: e.signalHandler,
476478
})
477479
e.signalTransport.SetAsyncTransport(signallingTransportDataChannel)
480+
e.signalTransport.Start()
478481
})
479482
e.signallingDC.OnClose(func() {
480483
// SIGNALLING-V2-TODO: should call SignalTransportHandler.OnClose
@@ -486,15 +489,15 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
486489
return nil
487490
}
488491

489-
func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration, isPrimary bool) error {
492+
func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration) error {
490493
var err error
491494
if e.subscriber, err = NewPCTransport(PCTransportParams{
492495
Configuration: configuration,
493496
RetransmitBufferSize: e.connParams.RetransmitBufferSize,
494497
}); err != nil {
495498
return err
496499
}
497-
e.subscriber.SetLogger(e.log)
500+
e.subscriber.SetLogger(e.log.WithValues("transport", livekit.SignalTarget_SUBSCRIBER))
498501

499502
e.subscriber.OnRemoteDescriptionSettled(e.createSubscriberPCAnswerAndSend)
500503

@@ -506,20 +509,23 @@ func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration,
506509
init := candidate.ToJSON()
507510
e.log.Debugw(
508511
"local ICE candidate",
509-
"target", livekit.SignalTarget_SUBSCRIBER,
512+
"transport", livekit.SignalTarget_SUBSCRIBER,
510513
"candidate", init.Candidate,
511514
)
512515
if err := e.signalTransport.SendMessage(
513516
e.signalling.SignalICECandidate(
514517
protosignalling.ToProtoTrickle(init, livekit.SignalTarget_SUBSCRIBER, false),
515518
),
516519
); err != nil {
517-
e.log.Errorw("could not send ICE candidates for subscriber", err)
520+
e.log.Errorw(
521+
"could not send ICE candidate", err,
522+
"transport", livekit.SignalTarget_SUBSCRIBER,
523+
)
518524
}
519525
})
520526

521527
e.subscriber.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
522-
e.handleICEConnectionStateChange(e.subscriber, livekit.SignalTarget_SUBSCRIBER, isPrimary, state)
528+
e.handleICEConnectionStateChange(e.subscriber, livekit.SignalTarget_SUBSCRIBER, state)
523529
})
524530

525531
e.subscriber.pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
@@ -545,7 +551,6 @@ func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration,
545551
func (e *RTCEngine) handleICEConnectionStateChange(
546552
transport *PCTransport,
547553
signalTarget livekit.SignalTarget,
548-
isPrimary bool,
549554
state webrtc.ICEConnectionState,
550555
) {
551556
switch state {
@@ -559,9 +564,7 @@ func (e *RTCEngine) handleICEConnectionStateChange(
559564
e.log.Debugw("ICE disconnected", "transport", signalTarget)
560565
case webrtc.ICEConnectionStateFailed:
561566
e.log.Debugw("ICE failed", "transport", signalTarget)
562-
if isPrimary {
563-
e.handleDisconnect(false)
564-
}
567+
e.handleDisconnect(false)
565568
}
566569
}
567570

signalling/signaltransport_http.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,19 @@ func (s *signalTransportHttp) connect(
174174
s.params.Signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(publisherOffer, 0))
175175
}
176176

177-
return s.sendHttpRequest(
178-
urlPrefix+s.params.Signalling.Path(),
177+
path := urlPrefix + s.params.Signalling.Path()
178+
wireMessage := s.params.Signalling.PendingMessages()
179+
180+
startedAt := time.Now()
181+
msg, err := s.sendHttpRequest(
182+
path,
179183
http.MethodPost,
180184
token,
181-
s.params.Signalling.PendingMessages(),
185+
wireMessage,
182186
)
187+
s.params.Logger.Debugw("connect response received", "alapsed", time.Since(startedAt))
188+
189+
return msg, err
183190
}
184191

185192
func (s *signalTransportHttp) sendHttpRequest(

signalling/signaltransport_websocket.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,10 @@ func (s *signalTransportWebSocket) connect(
208208
}
209209

210210
header := NewHTTPHeaderWithToken(token)
211+
path := u.String()
212+
211213
startedAt := time.Now()
212-
conn, hresp, err := websocket.DefaultDialer.DialContext(ctx, u.String(), header)
214+
conn, hresp, err := websocket.DefaultDialer.DialContext(ctx, path, header)
213215
if err != nil {
214216
fields := []interface{}{
215217
"duration", time.Since(startedAt),
@@ -236,6 +238,7 @@ func (s *signalTransportWebSocket) connect(
236238
if err != nil {
237239
return nil, err
238240
}
241+
s.params.Logger.Debugw("first message received", "elapsed", time.Since(startedAt))
239242

240243
return res, nil
241244
}

0 commit comments

Comments
 (0)