11package processor
22
33import (
4+ "context"
45 "crypto/ecdsa"
56 "encoding/hex"
67
78 "github.com/golang/protobuf/proto"
89 "github.com/pkg/errors"
10+ otelattribute "go.opentelemetry.io/otel/attribute"
11+ oteltrace "go.opentelemetry.io/otel/trace"
912 "go.uber.org/zap"
1013
1114 "github.com/status-im/status-go/crypto"
1215 cryptotypes "github.com/status-im/status-go/crypto/types"
1316 ethtypes "github.com/status-im/status-go/eth-node/types"
17+ "github.com/status-im/status-go/internal/instrumentation/trace"
1418 "github.com/status-im/status-go/messaging/adapters"
1519 "github.com/status-im/status-go/messaging/common"
20+ "github.com/status-im/status-go/messaging/controller/utils"
1621 "github.com/status-im/status-go/messaging/layers/encryption"
1722 "github.com/status-im/status-go/messaging/layers/encryption/sharedsecret"
1823 "github.com/status-im/status-go/messaging/layers/segmentation"
@@ -37,6 +42,8 @@ type Processor struct {
3742
3843 publisher * pubsub.Publisher
3944 logger * zap.Logger
45+
46+ tracer trace.Tracer
4047}
4148
4249func NewProcessor (
@@ -45,6 +52,7 @@ func NewProcessor(
4552 messageConfirmationStorage common.MessageConfirmationPersistence ,
4653 hashRatchetStorage common.HashRatchetPersistence ,
4754 logger * zap.Logger ,
55+ tracer trace.Tracer ,
4856) * Processor {
4957 return & Processor {
5058 identity : identity ,
@@ -54,6 +62,7 @@ func NewProcessor(
5462 ephemeralKeysManager : NewEphemeralKeysManager (maxNumOfEphemeralKeys ),
5563 publisher : pubsub .NewPublisher (),
5664 logger : logger .Named ("processor" ),
65+ tracer : tracer ,
5766 }
5867}
5968
@@ -122,29 +131,48 @@ func (r *Processor) processMessage(m *types.ReceivedMessage) (*processMessageRes
122131 return nil , err
123132 }
124133
134+ hashes := [][]byte {m .Hash }
125135 if responseMessage .SegmentationLayer .Segmented {
126136 // Segments not completed yet, stop processing
127137 if ! responseMessage .SegmentationLayer .Completed {
128138 return nil , nil
129139 }
140+ hashes = responseMessage .SegmentationLayer .Hashes
130141 }
131142
132- err = r .processEncryptionLayer (responseMessage , logger )
133- if err != nil {
134- logger .Debug ("failed to process encryption layer" , zap .Error (err ))
143+ ctx , span := r .tracer .Start (trace .DeriveRemoteContext (utils .MergeByteSlices (hashes )), "Processor.processMessage" ,
144+ oteltrace .WithAttributes (
145+ otelattribute .StringSlice ("transportHashes" , cryptotypes .EncodeHexes (hashes )),
146+ ),
147+ )
148+ defer span .End ()
135149
150+ err = r .processEncryptionLayer (ctx , responseMessage , logger )
151+ if err == nil {
152+ span .AddEvent ("encryption layer processed" )
153+ } else {
136154 // Hash ratchet with a group id not found yet, save the message for future processing
137155 if err == encryption .ErrHashRatchetGroupIDNotFound && len (responseMessage .EncryptionLayer .HashRatchetInfo ) == 1 {
138156 info := responseMessage .EncryptionLayer .HashRatchetInfo [0 ]
157+ span .AddEvent ("hash ratchet with group id not found yet" , oteltrace .WithAttributes (
158+ otelattribute .String ("groupID" , cryptotypes .ToHex (info .GroupID )),
159+ ))
139160 return nil , r .hashRatchetStorage .SaveMessage (info .GroupID , info .KeyID , m )
161+ } else {
162+ span .AddEvent ("encryption layer not processed" , oteltrace .WithAttributes (
163+ otelattribute .String ("error" , err .Error ()),
164+ ))
165+ logger .Debug ("failed to process encryption layer" , zap .Error (err ))
140166 }
141167 }
142168
143169 messages , ackedMessageIDs , err := r .processReliabilityLayer (responseMessage , logger )
144170 if err == nil {
171+ span .AddEvent ("reliability layer processed" )
145172 response .messages = messages
146173 response .ackedMessageIDs = ackedMessageIDs
147174 } else {
175+ span .AddEvent ("reliability layer not processed" )
148176 logger .Debug ("failed to process reliability layer" , zap .Error (err ))
149177 }
150178
@@ -211,38 +239,45 @@ func processTransportLayer(m *types.Message, receivedMessage *types.ReceivedMess
211239 return nil
212240}
213241
214- func (r * Processor ) processSegmentationLayer (m * types.Message ) (segmented , completed bool , err error ) {
215- var reconstructedPayload []byte
216- reconstructedPayload , err = r .stack .Segmentation .Reconstruct (m .TransportLayer .Payload , m .TransportLayer .SigPubKey )
242+ func (r * Processor ) processSegmentationLayer (m * types.Message ) error {
243+ reconstructedPayload , transportIDs , err := r .stack .Segmentation .Reconstruct (
244+ m .TransportLayer .Payload ,
245+ m .TransportLayer .SigPubKey ,
246+ m .TransportLayer .Hash )
217247
218248 switch err {
219249 case nil :
220250 m .TransportLayer .Payload = reconstructedPayload
221- segmented = true
222- completed = true
251+ m .SegmentationLayer .Segmented = true
252+ m .SegmentationLayer .Completed = true
253+ m .SegmentationLayer .Hashes = transportIDs
223254 case segmentation .ErrIncomplete :
224- segmented = true
225- completed = false
255+ m . SegmentationLayer . Segmented = true
256+ m . SegmentationLayer . Completed = false
226257 err = nil
227258 case segmentation .ErrInvalidPayload :
228- segmented = false
229- completed = false
259+ m . SegmentationLayer . Segmented = false
260+ m . SegmentationLayer . Completed = false
230261 err = nil
231262 }
232263
233- return
264+ return err
234265}
235266
236- func (r * Processor ) processEncryptionLayer (m * types.Message , logger * zap.Logger ) error {
267+ func (r * Processor ) processEncryptionLayer (ctx context. Context , m * types.Message , logger * zap.Logger ) error {
237268 logger = logger .Named ("processEncryptionLayer" )
238269
270+ ctx , span := r .tracer .Start (ctx , "Processor.processEncryptionLayer" )
271+ defer span .End ()
272+
239273 // As we handle non-encrypted messages, we make sure that DecryptPayload
240274 // is set regardless of whether this step is successful
241275 m .EncryptionLayer .Payload = m .TransportLayer .Payload
242276
243277 // if it's an ephemeral key, we don't negotiate a topic
244278 ephemeralKey := r .ephemeralKeysManager .GetPrivateKeyFor (m .TransportLayer .Dst )
245279 if ephemeralKey != nil {
280+ span .AddEvent ("targeted ephemeral key" )
246281 return nil
247282 }
248283
@@ -253,6 +288,7 @@ func (r *Processor) processEncryptionLayer(m *types.Message, logger *zap.Logger)
253288 }
254289
255290 response , err := r .stack .Encryption .HandleMessage (
291+ ctx ,
256292 r .identity ,
257293 m .SigPubKey (),
258294 & protocolMessage ,
0 commit comments