Skip to content

Commit 52633ee

Browse files
authored
Move signalling utils to protocol so that both server and Go SDK can use (#1138)
it.
1 parent 93319bf commit 52633ee

File tree

8 files changed

+757
-3
lines changed

8 files changed

+757
-3
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/dennwc/iters v1.1.0
99
github.com/frostbyte73/core v0.1.1
1010
github.com/fsnotify/fsnotify v1.9.0
11-
github.com/gammazero/deque v1.0.0
11+
github.com/gammazero/deque v1.1.0
1212
github.com/go-jose/go-jose/v3 v3.0.4
1313
github.com/go-logr/logr v1.4.3
1414
github.com/hashicorp/go-retryablehttp v0.7.7

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ github.com/frostbyte73/core v0.1.1 h1:ChhJOR7bAKOCPbA+lqDLE2cGKlCG5JXsDvvQr4YaJI
5151
github.com/frostbyte73/core v0.1.1/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo=
5252
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
5353
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
54-
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
55-
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
54+
github.com/gammazero/deque v1.1.0 h1:OyiyReBbnEG2PP0Bnv1AASLIYvyKqIFN5xfl1t8oGLo=
55+
github.com/gammazero/deque v1.1.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg=
5656
github.com/go-jose/go-jose/v3 v3.0.4 h1:Wp5HA7bLQcKnf6YYao/4kpRpVMp/yf6+pJKV8WFSaNY=
5757
github.com/go-jose/go-jose/v3 v3.0.4/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
5858
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=

signalling/signalfragment_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2023 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalling
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/require"
21+
"google.golang.org/protobuf/proto"
22+
23+
"github.com/livekit/protocol/livekit"
24+
)
25+
26+
func TestSignalFragment(t *testing.T) {
27+
inputMessage := &livekit.Envelope{
28+
ServerMessages: []*livekit.Signalv2ServerMessage{
29+
{
30+
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
31+
ConnectResponse: &livekit.ConnectResponse{
32+
SifTrailer: []byte("abcdefghijklmnopqrstuvwxyz0123456789"),
33+
},
34+
},
35+
},
36+
{
37+
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
38+
ConnectResponse: &livekit.ConnectResponse{
39+
SifTrailer: []byte("0123456789abcdefghijklmnopqrstuvwxyz0123456789"),
40+
},
41+
},
42+
},
43+
{
44+
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
45+
ConnectResponse: &livekit.ConnectResponse{
46+
SifTrailer: []byte("ABCDEFGHIJKLMNOPQRSTabcdefghijklmnopqrstuvwxyz0123456789"),
47+
},
48+
},
49+
},
50+
},
51+
}
52+
53+
t.Run("no segmentation needed", func(t *testing.T) {
54+
sr := NewSignalSegmenter(SignalSegmenterParams{
55+
MaxFragmentSize: 5_000_000,
56+
})
57+
58+
marshalled, err := proto.Marshal(inputMessage)
59+
require.NoError(t, err)
60+
require.Nil(t, sr.Segment(marshalled))
61+
})
62+
63+
t.Run("segmentation + reassembly", func(t *testing.T) {
64+
maxFragmentSize := 5
65+
sr := NewSignalSegmenter(SignalSegmenterParams{
66+
MaxFragmentSize: maxFragmentSize,
67+
})
68+
69+
marshalled, err := proto.Marshal(inputMessage)
70+
require.NoError(t, err)
71+
72+
expectedNumFragments := (len(marshalled) + maxFragmentSize - 1) / maxFragmentSize
73+
74+
fragments := sr.Segment(marshalled)
75+
require.NotZero(t, len(fragments))
76+
require.Equal(t, uint32(len(marshalled)), fragments[0].TotalSize)
77+
78+
rr := NewSignalReassembler(SignalReassemblerParams{})
79+
var reassembled []byte
80+
for idx, fragment := range fragments {
81+
require.Equal(t, uint32(idx+1), fragment.FragmentNumber)
82+
require.NotZero(t, fragment.FragmentSize)
83+
require.Equal(t, uint32(expectedNumFragments), fragment.NumFragments)
84+
require.Equal(t, fragment.FragmentSize, uint32(len(fragment.Data)))
85+
86+
reassembled = rr.Reassemble(fragment)
87+
}
88+
require.Equal(t, marshalled, reassembled)
89+
})
90+
91+
t.Run("runt", func(t *testing.T) {
92+
maxFragmentSize := 5
93+
sr := NewSignalSegmenter(SignalSegmenterParams{
94+
MaxFragmentSize: maxFragmentSize,
95+
})
96+
97+
marshalled, err := proto.Marshal(inputMessage)
98+
require.NoError(t, err)
99+
100+
fragments := sr.Segment(marshalled)
101+
102+
rr := NewSignalReassembler(SignalReassemblerParams{})
103+
var reassembled []byte
104+
for idx, fragment := range fragments {
105+
// do not send one packet into re-assembly initially, re-assembly should not succeed
106+
if idx == 0 {
107+
continue
108+
}
109+
110+
reassembled = rr.Reassemble(fragment)
111+
}
112+
require.Zero(t, len(reassembled))
113+
114+
// submit 1st fragment and ensure reassembly completes
115+
reassembled = rr.Reassemble(fragments[0])
116+
require.Equal(t, marshalled, reassembled)
117+
})
118+
119+
t.Run("corrupted", func(t *testing.T) {
120+
maxFragmentSize := 5
121+
sr := NewSignalSegmenter(SignalSegmenterParams{
122+
MaxFragmentSize: maxFragmentSize,
123+
})
124+
125+
marshalled, err := proto.Marshal(inputMessage)
126+
require.NoError(t, err)
127+
128+
fragments := sr.Segment(marshalled)
129+
130+
rr := NewSignalReassembler(SignalReassemblerParams{})
131+
var reassembled []byte
132+
for idx, fragment := range fragments {
133+
// corrupt a fragment, re-assembly should fail
134+
if idx == 0 {
135+
fragment.FragmentSize += 1
136+
}
137+
138+
reassembled = rr.Reassemble(fragment)
139+
}
140+
require.Zero(t, len(reassembled))
141+
})
142+
}

signalling/signalreassembler.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright 2023 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalling
16+
17+
import (
18+
"sync"
19+
"time"
20+
21+
"github.com/livekit/protocol/livekit"
22+
"github.com/livekit/protocol/logger"
23+
"github.com/livekit/protocol/utils"
24+
"go.uber.org/zap/zapcore"
25+
)
26+
27+
const (
28+
reassemblerTimeout = time.Minute
29+
)
30+
31+
type reassembly struct {
32+
packetId uint32
33+
startedAt time.Time
34+
fragments []*livekit.Fragment
35+
isCorrupted bool
36+
tqi *utils.TimeoutQueueItem[*reassembly]
37+
}
38+
39+
func (r *reassembly) MarshalLogObject(e zapcore.ObjectEncoder) error {
40+
if r == nil {
41+
return nil
42+
}
43+
44+
e.AddUint32("packetId", r.packetId)
45+
e.AddTime("startAt", r.startedAt)
46+
e.AddDuration("age", time.Since(r.startedAt))
47+
48+
expectedNumberOfFragments := len(r.fragments)
49+
expectedTotalSize := uint32(0)
50+
availableSize := uint32(0)
51+
var availableFragments []uint32
52+
for _, fragment := range r.fragments {
53+
if fragment == nil {
54+
continue
55+
}
56+
57+
expectedTotalSize = fragment.TotalSize
58+
availableSize += fragment.FragmentSize
59+
availableFragments = append(availableFragments, fragment.FragmentNumber)
60+
}
61+
e.AddInt("expectedNumberOfFragments", expectedNumberOfFragments)
62+
e.AddUint32("expectedTotalSize", expectedTotalSize)
63+
e.AddUint32("availableSize", availableSize)
64+
e.AddArray("availableFragments", logger.Uint32Slice(availableFragments))
65+
66+
e.AddBool("isCorrupted", r.isCorrupted)
67+
return nil
68+
}
69+
70+
// ------------------------------------------------
71+
72+
type SignalReassemblerParams struct {
73+
Logger logger.Logger
74+
}
75+
76+
type SignalReassembler struct {
77+
params SignalReassemblerParams
78+
79+
lock sync.Mutex
80+
reassemblies map[uint32]*reassembly
81+
82+
timeoutQueue utils.TimeoutQueue[*reassembly]
83+
}
84+
85+
func NewSignalReassembler(params SignalReassemblerParams) *SignalReassembler {
86+
return &SignalReassembler{
87+
params: params,
88+
reassemblies: make(map[uint32]*reassembly),
89+
}
90+
}
91+
92+
func (s *SignalReassembler) Reassemble(fragment *livekit.Fragment) []byte {
93+
s.lock.Lock()
94+
defer s.lock.Unlock()
95+
96+
re, ok := s.reassemblies[fragment.PacketId]
97+
if !ok {
98+
re = &reassembly{
99+
packetId: fragment.PacketId,
100+
startedAt: time.Now(),
101+
fragments: make([]*livekit.Fragment, fragment.NumFragments),
102+
}
103+
re.tqi = &utils.TimeoutQueueItem[*reassembly]{Value: re}
104+
105+
s.reassemblies[fragment.PacketId] = re
106+
}
107+
if int(fragment.FragmentNumber) <= len(re.fragments) {
108+
if int(fragment.FragmentSize) != len(fragment.Data) {
109+
re.isCorrupted = true // runt packet, data size of blob does not match fragment size
110+
} else {
111+
re.fragments[fragment.FragmentNumber-1] = fragment
112+
}
113+
} else {
114+
re.isCorrupted = true
115+
}
116+
117+
if re.isCorrupted {
118+
return nil
119+
}
120+
121+
// try to reassemble
122+
expectedTotalSize := uint32(0)
123+
totalSize := 0
124+
for _, fr := range re.fragments {
125+
if fr == nil {
126+
return nil // not received all fragments of packet yet
127+
}
128+
129+
expectedTotalSize = fr.TotalSize // can read this from any fragment of packet
130+
totalSize += len(fr.Data)
131+
}
132+
if expectedTotalSize != 0 && uint32(totalSize) != expectedTotalSize {
133+
re.isCorrupted = true
134+
return nil
135+
}
136+
137+
data := make([]byte, 0, expectedTotalSize)
138+
for _, fr := range re.fragments {
139+
data = append(data, fr.Data...)
140+
}
141+
delete(s.reassemblies, re.packetId) // fully re-assembled, can be deleted from cache
142+
return data
143+
}
144+
145+
func (s *SignalReassembler) Prune() {
146+
for it := s.timeoutQueue.IterateRemoveAfter(reassemblerTimeout); it.Next(); {
147+
re := it.Item().Value
148+
s.params.Logger.Infow("pruning stale reassembly packet", "reassembly", re)
149+
150+
s.lock.Lock()
151+
delete(s.reassemblies, re.packetId)
152+
s.lock.Unlock()
153+
}
154+
}

signalling/signalsegmenter.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2023 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalling
16+
17+
import (
18+
"math/rand"
19+
20+
"github.com/livekit/protocol/livekit"
21+
"github.com/livekit/protocol/logger"
22+
"go.uber.org/atomic"
23+
)
24+
25+
const (
26+
defaultMaxFragmentSize = 8192
27+
)
28+
29+
type SignalSegmenterParams struct {
30+
Logger logger.Logger
31+
MaxFragmentSize int
32+
FirstPacketId uint32 // should be used for testing only
33+
}
34+
35+
type SignalSegmenter struct {
36+
params SignalSegmenterParams
37+
38+
packetId atomic.Uint32
39+
}
40+
41+
func NewSignalSegmenter(params SignalSegmenterParams) *SignalSegmenter {
42+
s := &SignalSegmenter{
43+
params: params,
44+
}
45+
if s.params.MaxFragmentSize == 0 {
46+
s.params.MaxFragmentSize = defaultMaxFragmentSize
47+
}
48+
s.packetId.Store(params.FirstPacketId)
49+
if s.packetId.Load() == 0 {
50+
s.packetId.Store(uint32(rand.Intn(1<<8) + 1))
51+
}
52+
return s
53+
}
54+
55+
func (s *SignalSegmenter) Segment(data []byte) []*livekit.Fragment {
56+
if len(data) <= s.params.MaxFragmentSize {
57+
return nil
58+
}
59+
60+
var fragments []*livekit.Fragment
61+
numFragments := uint32((len(data) + s.params.MaxFragmentSize - 1) / s.params.MaxFragmentSize)
62+
fragmentNumber := uint32(1)
63+
consumed := 0
64+
packetId := s.packetId.Inc()
65+
for len(data[consumed:]) != 0 {
66+
fragmentSize := min(len(data[consumed:]), s.params.MaxFragmentSize)
67+
fragment := &livekit.Fragment{
68+
PacketId: packetId,
69+
FragmentNumber: fragmentNumber,
70+
NumFragments: numFragments,
71+
FragmentSize: uint32(fragmentSize),
72+
TotalSize: uint32(len(data)),
73+
Data: data[consumed : consumed+fragmentSize],
74+
}
75+
fragments = append(fragments, fragment)
76+
fragmentNumber++
77+
consumed += fragmentSize
78+
}
79+
80+
return fragments
81+
}

0 commit comments

Comments
 (0)