11package processor
22
33import (
4+ "context"
45 "crypto/ecdsa"
56 "encoding/hex"
67
78 "github.com/golang/protobuf/proto"
89 "github.com/pkg/errors"
10+ otelattribute "go.opentelemetry.io/otel/attribute"
11+ oteltrace "go.opentelemetry.io/otel/trace"
912 "go.uber.org/zap"
1013
1114 "github.com/status-im/status-go/crypto"
1215 cryptotypes "github.com/status-im/status-go/crypto/types"
1316 ethtypes "github.com/status-im/status-go/eth-node/types"
17+ "github.com/status-im/status-go/internal/instrumentation/trace"
1418 "github.com/status-im/status-go/messaging/adapters"
1519 "github.com/status-im/status-go/messaging/common"
20+ "github.com/status-im/status-go/messaging/controller/utils"
1621 "github.com/status-im/status-go/messaging/layers/encryption"
1722 "github.com/status-im/status-go/messaging/layers/encryption/sharedsecret"
1823 "github.com/status-im/status-go/messaging/layers/segmentation"
@@ -37,6 +42,8 @@ type Processor struct {
3742
3843 publisher * pubsub.Publisher
3944 logger * zap.Logger
45+
46+ tracer trace.Tracer
4047}
4148
4249func NewProcessor (
@@ -45,6 +52,7 @@ func NewProcessor(
4552 messageConfirmationStorage common.MessageConfirmationPersistence ,
4653 hashRatchetStorage common.HashRatchetPersistence ,
4754 logger * zap.Logger ,
55+ tracer trace.Tracer ,
4856) * Processor {
4957 return & Processor {
5058 identity : identity ,
@@ -54,6 +62,7 @@ func NewProcessor(
5462 ephemeralKeysManager : NewEphemeralKeysManager (maxNumOfEphemeralKeys ),
5563 publisher : pubsub .NewPublisher (),
5664 logger : logger .Named ("processor" ),
65+ tracer : tracer ,
5766 }
5867}
5968
@@ -122,29 +131,49 @@ func (r *Processor) processMessage(m *types.ReceivedMessage) (*processMessageRes
122131 return nil , err
123132 }
124133
134+ hashes := [][]byte {m .Hash }
125135 if responseMessage .SegmentationLayer .Segmented {
126136 // Segments not completed yet, stop processing
127137 if ! responseMessage .SegmentationLayer .Completed {
128138 return nil , nil
129139 }
140+ hashes = responseMessage .SegmentationLayer .Hashes
130141 }
131142
132- err = r .processEncryptionLayer (responseMessage , logger )
133- if err != nil {
134- logger .Debug ("failed to process encryption layer" , zap .Error (err ))
143+ ctx , span := r .tracer .Start (trace .DeriveRemoteContext (utils .MergeByteSlices (hashes )), "Processor.processMessage" ,
144+ oteltrace .WithAttributes (
145+ otelattribute .String ("hash" , cryptotypes .EncodeHex (m .Hash )),
146+ otelattribute .StringSlice ("hashes" , cryptotypes .EncodeHexes (hashes )),
147+ ),
148+ )
149+ defer span .End ()
135150
151+ err = r .processEncryptionLayer (ctx , responseMessage , logger )
152+ if err == nil {
153+ span .AddEvent ("encryption layer processed" )
154+ } else {
136155 // Hash ratchet with a group id not found yet, save the message for future processing
137156 if err == encryption .ErrHashRatchetGroupIDNotFound && len (responseMessage .EncryptionLayer .HashRatchetInfo ) == 1 {
138157 info := responseMessage .EncryptionLayer .HashRatchetInfo [0 ]
158+ span .AddEvent ("hash ratchet with group id not found yet" , oteltrace .WithAttributes (
159+ otelattribute .String ("groupID" , cryptotypes .ToHex (info .GroupID )),
160+ ))
139161 return nil , r .hashRatchetStorage .SaveMessage (info .GroupID , info .KeyID , m )
162+ } else {
163+ span .AddEvent ("encryption layer not processed" , oteltrace .WithAttributes (
164+ otelattribute .String ("error" , err .Error ()),
165+ ))
166+ logger .Debug ("failed to process encryption layer" , zap .Error (err ))
140167 }
141168 }
142169
143170 messages , ackedMessageIDs , err := r .processReliabilityLayer (responseMessage , logger )
144171 if err == nil {
172+ span .AddEvent ("reliability layer processed" )
145173 response .messages = messages
146174 response .ackedMessageIDs = ackedMessageIDs
147175 } else {
176+ span .AddEvent ("reliability layer not processed" )
148177 logger .Debug ("failed to process reliability layer" , zap .Error (err ))
149178 }
150179
@@ -211,38 +240,45 @@ func processTransportLayer(m *types.Message, receivedMessage *types.ReceivedMess
211240 return nil
212241}
213242
214- func (r * Processor ) processSegmentationLayer (m * types.Message ) (segmented , completed bool , err error ) {
215- var reconstructedPayload []byte
216- reconstructedPayload , err = r .stack .Segmentation .Reconstruct (m .TransportLayer .Payload , m .TransportLayer .SigPubKey )
243+ func (r * Processor ) processSegmentationLayer (m * types.Message ) error {
244+ reconstructedPayload , transportIDs , err := r .stack .Segmentation .Reconstruct (
245+ m .TransportLayer .Payload ,
246+ m .TransportLayer .SigPubKey ,
247+ m .TransportLayer .Hash )
217248
218249 switch err {
219250 case nil :
220251 m .TransportLayer .Payload = reconstructedPayload
221- segmented = true
222- completed = true
252+ m .SegmentationLayer .Segmented = true
253+ m .SegmentationLayer .Completed = true
254+ m .SegmentationLayer .Hashes = transportIDs
223255 case segmentation .ErrIncomplete :
224- segmented = true
225- completed = false
256+ m . SegmentationLayer . Segmented = true
257+ m . SegmentationLayer . Completed = false
226258 err = nil
227259 case segmentation .ErrInvalidPayload :
228- segmented = false
229- completed = false
260+ m . SegmentationLayer . Segmented = false
261+ m . SegmentationLayer . Completed = false
230262 err = nil
231263 }
232264
233- return
265+ return err
234266}
235267
236- func (r * Processor ) processEncryptionLayer (m * types.Message , logger * zap.Logger ) error {
268+ func (r * Processor ) processEncryptionLayer (ctx context. Context , m * types.Message , logger * zap.Logger ) error {
237269 logger = logger .Named ("processEncryptionLayer" )
238270
271+ ctx , span := r .tracer .Start (ctx , "Processor.processEncryptionLayer" )
272+ defer span .End ()
273+
239274 // As we handle non-encrypted messages, we make sure that DecryptPayload
240275 // is set regardless of whether this step is successful
241276 m .EncryptionLayer .Payload = m .TransportLayer .Payload
242277
243278 // if it's an ephemeral key, we don't negotiate a topic
244279 ephemeralKey := r .ephemeralKeysManager .GetPrivateKeyFor (m .TransportLayer .Dst )
245280 if ephemeralKey != nil {
281+ span .AddEvent ("targeted ephemeral key" )
246282 return nil
247283 }
248284
@@ -253,6 +289,7 @@ func (r *Processor) processEncryptionLayer(m *types.Message, logger *zap.Logger)
253289 }
254290
255291 response , err := r .stack .Encryption .HandleMessage (
292+ ctx ,
256293 r .identity ,
257294 m .SigPubKey (),
258295 & protocolMessage ,
0 commit comments