Skip to content

Commit 579d845

Browse files
authored
Add sequence for reliable data message (#692)
1 parent a734a2c commit 579d845

File tree

1 file changed

+29
-12
lines changed

1 file changed

+29
-12
lines changed

engine.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ type RTCEngine struct {
4646
subscriber *PCTransport
4747
client *SignalClient
4848

49-
dclock sync.RWMutex
50-
reliableDC *webrtc.DataChannel
51-
lossyDC *webrtc.DataChannel
52-
reliableDCSub *webrtc.DataChannel
53-
lossyDCSub *webrtc.DataChannel
49+
dclock sync.RWMutex
50+
reliableDC *webrtc.DataChannel
51+
lossyDC *webrtc.DataChannel
52+
reliableDCSub *webrtc.DataChannel
53+
lossyDCSub *webrtc.DataChannel
54+
reliableMsgLock sync.Mutex
55+
reliableMsgSeq uint32
5456

5557
trackPublishedListenersLock sync.Mutex
5658
trackPublishedListeners map[string]chan *livekit.TrackPublishedResponse
@@ -107,6 +109,7 @@ func NewRTCEngine() *RTCEngine {
107109
client: NewSignalClient(),
108110
trackPublishedListeners: make(map[string]chan *livekit.TrackPublishedResponse),
109111
JoinTimeout: 15 * time.Second,
112+
reliableMsgSeq: 1,
110113
}
111114

112115
e.client.OnParticipantUpdate = func(info []*livekit.ParticipantInfo) {
@@ -284,6 +287,12 @@ func (e *RTCEngine) configure(
284287
subscriberPrimary *bool) error {
285288

286289
configuration := e.makeRTCConfiguration(iceServers, clientConfig)
290+
291+
// reset reliable message sequence
292+
e.reliableMsgLock.Lock()
293+
e.reliableMsgSeq = 1
294+
e.reliableMsgLock.Unlock()
295+
287296
e.pclock.Lock()
288297
defer e.pclock.Unlock()
289298

@@ -835,13 +844,7 @@ func (e *RTCEngine) makeRTCConfiguration(iceServers []*livekit.ICEServer, client
835844
}
836845

837846
func (e *RTCEngine) publishDataPacket(pck *livekit.DataPacket, kind livekit.DataPacket_Kind) error {
838-
data, err := proto.Marshal(pck)
839-
if err != nil {
840-
e.log.Errorw("could not marshal data packet", err)
841-
return err
842-
}
843-
844-
err = e.ensurePublisherConnected(true)
847+
err := e.ensurePublisherConnected(true)
845848
if err != nil {
846849
e.log.Errorw("could not ensure publisher connected", err)
847850
return err
@@ -853,6 +856,20 @@ func (e *RTCEngine) publishDataPacket(pck *livekit.DataPacket, kind livekit.Data
853856
return errors.New("datachannel not found")
854857
}
855858

859+
if kind == livekit.DataPacket_RELIABLE {
860+
e.reliableMsgLock.Lock()
861+
defer e.reliableMsgLock.Unlock()
862+
863+
pck.Sequence = e.reliableMsgSeq
864+
e.reliableMsgSeq++
865+
}
866+
867+
data, err := proto.Marshal(pck)
868+
if err != nil {
869+
e.log.Errorw("could not marshal data packet", err)
870+
return err
871+
}
872+
856873
dc.Send(data)
857874
return nil
858875
}

0 commit comments

Comments
 (0)