Skip to content

Commit cda84db

Browse files
committed
add buffer wrapper
1 parent ea10ecb commit cda84db

File tree

10 files changed

+224
-201
lines changed

10 files changed

+224
-201
lines changed

client.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,9 @@ type Client struct {
269269
sdkVersion string
270270
// Transport is read-only. Replacing the transport of an existing client is
271271
// not supported, create a new client instead.
272-
Transport Transport
273-
batchLogger *BatchLogger
274-
telemetryBuffers map[ratelimit.Category]telemetry.BufferInterface[protocol.EnvelopeItemConvertible]
275-
telemetryScheduler *telemetry.Scheduler
272+
Transport Transport
273+
batchLogger *BatchLogger
274+
telemetryBuffer *telemetry.Buffer
276275
}
277276

278277
// NewClient creates and returns an instance of Client configured using
@@ -422,20 +421,19 @@ func (client *Client) setupTelemetryBuffer() {
422421
})
423422
client.Transport = &internalAsyncTransportAdapter{transport: transport}
424423

425-
client.telemetryBuffers = map[ratelimit.Category]telemetry.BufferInterface[protocol.EnvelopeItemConvertible]{
426-
ratelimit.CategoryError: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
427-
ratelimit.CategoryTransaction: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0),
428-
ratelimit.CategoryLog: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second),
429-
ratelimit.CategoryMonitor: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
424+
storage := map[ratelimit.Category]telemetry.Storage[protocol.EnvelopeItemConvertible]{
425+
ratelimit.CategoryError: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
426+
ratelimit.CategoryTransaction: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0),
427+
ratelimit.CategoryLog: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second),
428+
ratelimit.CategoryMonitor: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
430429
}
431430

432431
sdkInfo := &protocol.SdkInfo{
433432
Name: client.sdkIdentifier,
434433
Version: client.sdkVersion,
435434
}
436435

437-
client.telemetryScheduler = telemetry.NewScheduler(client.telemetryBuffers, transport, &client.dsn.Dsn, sdkInfo)
438-
client.telemetryScheduler.Start()
436+
client.telemetryBuffer = telemetry.NewBuffer(storage, transport, &client.dsn.Dsn, sdkInfo)
439437
}
440438

441439
func (client *Client) setupIntegrations() {
@@ -578,7 +576,7 @@ func (client *Client) RecoverWithContext(
578576
// the network synchronously, configure it to use the HTTPSyncTransport in the
579577
// call to Init.
580578
func (client *Client) Flush(timeout time.Duration) bool {
581-
if client.batchLogger != nil || client.telemetryScheduler != nil {
579+
if client.batchLogger != nil || client.telemetryBuffer != nil {
582580
ctx, cancel := context.WithTimeout(context.Background(), timeout)
583581
defer cancel()
584582
return client.FlushWithContext(ctx)
@@ -602,8 +600,8 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
602600
if client.batchLogger != nil {
603601
client.batchLogger.Flush(ctx.Done())
604602
}
605-
if client.telemetryScheduler != nil {
606-
return client.telemetryScheduler.FlushWithContext(ctx)
603+
if client.telemetryBuffer != nil {
604+
return client.telemetryBuffer.FlushWithContext(ctx)
607605
}
608606
return client.Transport.FlushWithContext(ctx)
609607
}
@@ -613,8 +611,8 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
613611
// Close should be called after Flush and before terminating the program
614612
// otherwise some events may be lost.
615613
func (client *Client) Close() {
616-
if client.telemetryScheduler != nil {
617-
client.telemetryScheduler.Stop(5 * time.Second)
614+
if client.telemetryBuffer != nil {
615+
client.telemetryBuffer.Close(5 * time.Second)
618616
}
619617
client.Transport.Close()
620618
}
@@ -736,7 +734,13 @@ func (client *Client) processEvent(event *Event, hint *EventHint, scope EventMod
736734
}
737735
}
738736

739-
client.Transport.SendEvent(event)
737+
if client.telemetryBuffer != nil {
738+
if !client.telemetryBuffer.Add(event) {
739+
debuglog.Println("Event dropped: telemetry buffer full or unavailable")
740+
}
741+
} else {
742+
client.Transport.SendEvent(event)
743+
}
740744

741745
return &event.EventID
742746
}

interfaces.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -731,34 +731,47 @@ type Log struct {
731731
Attributes map[string]Attribute `json:"attributes,omitempty"`
732732
}
733733

734-
// ToEnvelopeItem converts the Log to a Sentry envelope item.
734+
// ToEnvelopeItem converts the Log to a Sentry envelope item for batching.
735735
func (l *Log) ToEnvelopeItem() (*protocol.EnvelopeItem, error) {
736-
logData, err := json.Marshal(l)
737-
if err != nil {
738-
return nil, err
736+
type logJSON struct {
737+
Timestamp *float64 `json:"timestamp,omitempty"`
738+
TraceID string `json:"trace_id,omitempty"`
739+
Level string `json:"level"`
740+
Severity int `json:"severity_number,omitempty"`
741+
Body string `json:"body,omitempty"`
742+
Attributes map[string]protocol.LogAttribute `json:"attributes,omitempty"`
743+
}
744+
745+
// Convert time.Time to seconds float if set
746+
var ts *float64
747+
if !l.Timestamp.IsZero() {
748+
sec := float64(l.Timestamp.UnixNano()) / 1e9
749+
ts = &sec
739750
}
740-
return &protocol.EnvelopeItem{
741-
Header: &protocol.EnvelopeItemHeader{
742-
Type: protocol.EnvelopeItemTypeLog,
743-
},
744-
Payload: logData,
745-
}, nil
746-
}
747751

748-
// ToLogPayload converts the Log to a protocol.LogItem for batching.
749-
func (l *Log) ToLogPayload() protocol.LogItem {
750752
attrs := make(map[string]protocol.LogAttribute, len(l.Attributes))
751753
for k, v := range l.Attributes {
752754
attrs[k] = protocol.LogAttribute{Value: v.Value, Type: string(v.Type)}
753755
}
754-
return protocol.LogItem{
755-
Timestamp: l.Timestamp,
756+
757+
logData, err := json.Marshal(logJSON{
758+
Timestamp: ts,
756759
TraceID: l.TraceID.String(),
757760
Level: string(l.Level),
758761
Severity: l.Severity,
759762
Body: l.Body,
760763
Attributes: attrs,
764+
})
765+
if err != nil {
766+
return nil, err
761767
}
768+
769+
return &protocol.EnvelopeItem{
770+
Header: &protocol.EnvelopeItemHeader{
771+
Type: protocol.EnvelopeItemTypeLog,
772+
},
773+
Payload: logData,
774+
}, nil
762775
}
763776

764777
// GetCategory returns the rate limit category for logs.

internal/protocol/log_batch.go

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package protocol
22

33
import (
44
"encoding/json"
5-
"time"
65

76
"github.com/getsentry/sentry-go/internal/ratelimit"
87
)
@@ -13,57 +12,28 @@ type LogAttribute struct {
1312
Type string `json:"type"`
1413
}
1514

16-
// LogItem represents the serialized shape of a single log record inside a batched
17-
// log envelope item. Keep in sync with sentry.Log fields that are meant to be emitted.
18-
type LogItem struct {
19-
Timestamp time.Time `json:"timestamp,omitempty"`
20-
TraceID string `json:"trace_id,omitempty"`
21-
Level string `json:"level"`
22-
Severity int `json:"severity_number,omitempty"`
23-
Body string `json:"body,omitempty"`
24-
Attributes map[string]LogAttribute `json:"attributes,omitempty"`
25-
}
26-
27-
// LogPayloader is implemented by items that can convert to a LogItem for batching.
28-
type LogPayloader interface {
29-
ToLogPayload() LogItem
30-
}
15+
// Logs is a container for multiple log items which knows how to convert
16+
// itself into a single batched log envelope item.
17+
type Logs []EnvelopeItemConvertible
3118

32-
// MarshalJSON encodes timestamp as seconds since epoch per Sentry logs spec.
33-
func (lp LogItem) MarshalJSON() ([]byte, error) {
34-
// Convert time.Time to seconds float if set
35-
var ts *float64
36-
if !lp.Timestamp.IsZero() {
37-
sec := float64(lp.Timestamp.UnixNano()) / 1e9
38-
ts = &sec
19+
func (ls Logs) ToEnvelopeItem() (*EnvelopeItem, error) {
20+
// Convert each log to its JSON representation
21+
items := make([]json.RawMessage, 0, len(ls))
22+
for _, log := range ls {
23+
envItem, err := log.ToEnvelopeItem()
24+
if err != nil {
25+
continue
26+
}
27+
items = append(items, envItem.Payload)
3928
}
4029

41-
out := struct {
42-
Timestamp *float64 `json:"timestamp,omitempty"`
43-
TraceID string `json:"trace_id,omitempty"`
44-
Level string `json:"level"`
45-
Severity int `json:"severity_number,omitempty"`
46-
Body string `json:"body,omitempty"`
47-
Attributes map[string]LogAttribute `json:"attributes,omitempty"`
48-
}{
49-
Timestamp: ts,
50-
TraceID: lp.TraceID,
51-
Level: lp.Level,
52-
Severity: lp.Severity,
53-
Body: lp.Body,
54-
Attributes: lp.Attributes,
30+
if len(items) == 0 {
31+
return nil, nil
5532
}
56-
return json.Marshal(out)
57-
}
58-
59-
// Logs is a container for multiple LogItem items which knows how to convert
60-
// itself into a single batched log envelope item.
61-
type Logs []LogItem
6233

63-
func (ls Logs) ToEnvelopeItem() (*EnvelopeItem, error) {
6434
wrapper := struct {
65-
Items []LogItem `json:"items"`
66-
}{Items: ls}
35+
Items []json.RawMessage `json:"items"`
36+
}{Items: items}
6737

6838
payload, err := json.Marshal(wrapper)
6939
if err != nil {

0 commit comments

Comments
 (0)