Skip to content

Commit 5d1d063

Browse files
authored
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
2 parents af6a103 + 23a87a2 commit 5d1d063

File tree

2 files changed

+81
-44
lines changed

2 files changed

+81
-44
lines changed

extra/redisotel/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type config struct {
2828
meter metric.Meter
2929

3030
poolName string
31+
32+
closeChan chan struct{}
3133
}
3234

3335
type baseOption interface {
@@ -145,3 +147,9 @@ func WithMeterProvider(mp metric.MeterProvider) MetricsOption {
145147
conf.mp = mp
146148
})
147149
}
150+
151+
func WithCloseChan(closeChan chan struct{}) MetricsOption {
152+
return metricsOption(func(conf *config) {
153+
conf.closeChan = closeChan
154+
})
155+
}

extra/redisotel/metrics.go

Lines changed: 73 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net"
7+
"sync"
78
"time"
89

910
"go.opentelemetry.io/otel"
@@ -13,6 +14,12 @@ import (
1314
"github.com/redis/go-redis/v9"
1415
)
1516

17+
type metricsState struct {
18+
registrations []metric.Registration
19+
closed bool
20+
mutex sync.Mutex
21+
}
22+
1623
// InstrumentMetrics starts reporting OpenTelemetry Metrics.
1724
//
1825
// Based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/database-metrics.md
@@ -30,49 +37,42 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
3037
)
3138
}
3239

33-
switch rdb := rdb.(type) {
34-
case *redis.Client:
35-
if conf.poolName == "" {
36-
opt := rdb.Options()
37-
conf.poolName = opt.Addr
40+
var state *metricsState
41+
if conf.closeChan != nil {
42+
state = &metricsState{
43+
registrations: make([]metric.Registration, 0),
44+
closed: false,
45+
mutex: sync.Mutex{},
3846
}
39-
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
4047

41-
if err := reportPoolStats(rdb, conf); err != nil {
42-
return err
43-
}
44-
if err := addMetricsHook(rdb, conf); err != nil {
45-
return err
46-
}
47-
return nil
48-
case *redis.ClusterClient:
49-
rdb.OnNewNode(func(rdb *redis.Client) {
50-
if conf.poolName == "" {
51-
opt := rdb.Options()
52-
conf.poolName = opt.Addr
53-
}
54-
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
48+
go func() {
49+
<-conf.closeChan
5550

56-
if err := reportPoolStats(rdb, conf); err != nil {
57-
otel.Handle(err)
51+
state.mutex.Lock()
52+
state.closed = true
53+
54+
for _, registration := range state.registrations {
55+
if err := registration.Unregister(); err != nil {
56+
otel.Handle(err)
57+
}
5858
}
59-
if err := addMetricsHook(rdb, conf); err != nil {
59+
state.mutex.Unlock()
60+
}()
61+
}
62+
63+
switch rdb := rdb.(type) {
64+
case *redis.Client:
65+
return registerClient(rdb, conf, state)
66+
case *redis.ClusterClient:
67+
rdb.OnNewNode(func(rdb *redis.Client) {
68+
if err := registerClient(rdb, conf, state); err != nil {
6069
otel.Handle(err)
6170
}
6271
})
6372
return nil
6473
case *redis.Ring:
6574
rdb.OnNewNode(func(rdb *redis.Client) {
66-
if conf.poolName == "" {
67-
opt := rdb.Options()
68-
conf.poolName = opt.Addr
69-
}
70-
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
71-
72-
if err := reportPoolStats(rdb, conf); err != nil {
73-
otel.Handle(err)
74-
}
75-
if err := addMetricsHook(rdb, conf); err != nil {
75+
if err := registerClient(rdb, conf, state); err != nil {
7676
otel.Handle(err)
7777
}
7878
})
@@ -82,7 +82,38 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
8282
}
8383
}
8484

85-
func reportPoolStats(rdb *redis.Client, conf *config) error {
85+
func registerClient(rdb *redis.Client, conf *config, state *metricsState) error {
86+
if state != nil {
87+
state.mutex.Lock()
88+
defer state.mutex.Unlock()
89+
90+
if state.closed {
91+
return nil
92+
}
93+
}
94+
95+
if conf.poolName == "" {
96+
opt := rdb.Options()
97+
conf.poolName = opt.Addr
98+
}
99+
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
100+
101+
registration, err := reportPoolStats(rdb, conf)
102+
if err != nil {
103+
return err
104+
}
105+
106+
if state != nil {
107+
state.registrations = append(state.registrations, registration)
108+
}
109+
110+
if err := addMetricsHook(rdb, conf); err != nil {
111+
return err
112+
}
113+
return nil
114+
}
115+
116+
func reportPoolStats(rdb *redis.Client, conf *config) (metric.Registration, error) {
86117
labels := conf.attrs
87118
idleAttrs := append(labels, attribute.String("state", "idle"))
88119
usedAttrs := append(labels, attribute.String("state", "used"))
@@ -92,59 +123,59 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
92123
metric.WithDescription("The maximum number of idle open connections allowed"),
93124
)
94125
if err != nil {
95-
return err
126+
return nil, err
96127
}
97128

98129
idleMin, err := conf.meter.Int64ObservableUpDownCounter(
99130
"db.client.connections.idle.min",
100131
metric.WithDescription("The minimum number of idle open connections allowed"),
101132
)
102133
if err != nil {
103-
return err
134+
return nil, err
104135
}
105136

106137
connsMax, err := conf.meter.Int64ObservableUpDownCounter(
107138
"db.client.connections.max",
108139
metric.WithDescription("The maximum number of open connections allowed"),
109140
)
110141
if err != nil {
111-
return err
142+
return nil, err
112143
}
113144

114145
usage, err := conf.meter.Int64ObservableUpDownCounter(
115146
"db.client.connections.usage",
116147
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
117148
)
118149
if err != nil {
119-
return err
150+
return nil, err
120151
}
121152

122153
timeouts, err := conf.meter.Int64ObservableUpDownCounter(
123154
"db.client.connections.timeouts",
124155
metric.WithDescription("The number of connection timeouts that have occurred trying to obtain a connection from the pool"),
125156
)
126157
if err != nil {
127-
return err
158+
return nil, err
128159
}
129160

130161
hits, err := conf.meter.Int64ObservableUpDownCounter(
131162
"db.client.connections.hits",
132163
metric.WithDescription("The number of times free connection was found in the pool"),
133164
)
134165
if err != nil {
135-
return err
166+
return nil, err
136167
}
137168

138169
misses, err := conf.meter.Int64ObservableUpDownCounter(
139170
"db.client.connections.misses",
140171
metric.WithDescription("The number of times free connection was not found in the pool"),
141172
)
142173
if err != nil {
143-
return err
174+
return nil, err
144175
}
145176

146177
redisConf := rdb.Options()
147-
_, err = conf.meter.RegisterCallback(
178+
return conf.meter.RegisterCallback(
148179
func(ctx context.Context, o metric.Observer) error {
149180
stats := rdb.PoolStats()
150181

@@ -168,8 +199,6 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
168199
hits,
169200
misses,
170201
)
171-
172-
return err
173202
}
174203

175204
func addMetricsHook(rdb *redis.Client, conf *config) error {

0 commit comments

Comments
 (0)