Skip to content

Commit 8a2c1ae

Browse files
djeebus authored and Code42Cate committed
Expose some counters to nbd and network pools (#1353)
1 parent 1447626 commit 8a2c1ae

File tree

7 files changed

+89
-72
lines changed

7 files changed

+89
-72
lines changed

packages/orchestrator/benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) {
128128
sbxlogger.SetSandboxLoggerInternal(logger)
129129
// sbxlogger.SetSandboxLoggerExternal(logger)
130130

131-
networkPool, err := network.NewPool(noop.MeterProvider{}, 8, 8, clientID, networkConfig)
131+
networkPool, err := network.NewPool(8, 8, clientID, networkConfig)
132132
require.NoError(b, err)
133133
go func() {
134134
networkPool.Populate(b.Context())
@@ -139,7 +139,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) {
139139
assert.NoError(b, err)
140140
}()
141141

142-
devicePool, err := nbd.NewDevicePool(noop.MeterProvider{})
142+
devicePool, err := nbd.NewDevicePool()
143143
require.NoError(b, err, "do you have the nbd kernel module installed?")
144144
go func() {
145145
devicePool.Populate(b.Context())

packages/orchestrator/cmd/build-template/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func buildTemplate(
116116
return fmt.Errorf("could not create storage provider: %w", err)
117117
}
118118

119-
devicePool, err := nbd.NewDevicePool(noop.MeterProvider{})
119+
devicePool, err := nbd.NewDevicePool()
120120
if err != nil {
121121
return fmt.Errorf("could not create device pool: %w", err)
122122
}
@@ -130,7 +130,7 @@ func buildTemplate(
130130
}
131131
}()
132132

133-
networkPool, err := network.NewPool(noop.MeterProvider{}, 8, 8, clientID, networkConfig)
133+
networkPool, err := network.NewPool(8, 8, clientID, networkConfig)
134134
if err != nil {
135135
return fmt.Errorf("could not create network pool: %w", err)
136136
}

packages/orchestrator/cmd/mock-nbd/mock.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/google/uuid"
1212
"github.com/pojntfx/go-nbd/pkg/backend"
13-
"go.opentelemetry.io/otel/metric/noop"
1413

1514
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
1615
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd"
@@ -87,7 +86,7 @@ func main() {
8786

8887
done := make(chan os.Signal, 1)
8988
signal.Notify(done, os.Interrupt)
90-
devicePool, err := nbd.NewDevicePool(noop.MeterProvider{})
89+
devicePool, err := nbd.NewDevicePool()
9190
if err != nil {
9291
fmt.Fprintf(os.Stderr, "failed to create device pool: %v\n", err)
9392
return

packages/orchestrator/internal/sandbox/nbd/pool.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
"time"
1313

1414
"github.com/bits-and-blooms/bitset"
15+
"go.opentelemetry.io/otel"
1516
"go.opentelemetry.io/otel/metric"
1617
"go.uber.org/zap"
1718

18-
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
19+
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
1920
)
2021

2122
// maxSlotsReady is the number of slots that are ready to be used.
@@ -25,6 +26,22 @@ const (
2526
devicePoolCloseReleaseTimeout = 10 * time.Minute
2627
)
2728

29+
// Package-level OpenTelemetry instruments for the nbd device pool.
// They are created from the global meter provider via otel.Meter, so callers
// of NewDevicePool no longer pass a metric.MeterProvider.
// NOTE(review): utils.Must presumably panics if instrument creation fails — confirm.
var (
30+
// meter is scoped to this package's import path.
meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd")
31+
// slotCounter tracks slots currently waiting in the pool: incremented in
// Populate, decremented in GetDevice.
slotCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.nbd.slots_pool.ready",
32+
metric.WithDescription("Number of nbd slots ready to be used."),
33+
metric.WithUnit("{slot}"),
34+
))
35+
// acquired counts slots handed out by GetDevice.
acquired = utils.Must(meter.Int64Counter("orchestrator.nbd.slots_pool.acquired",
36+
metric.WithDescription("Number of nbd slots acquired."),
37+
metric.WithUnit("{slot}"),
38+
))
39+
// released counts slots successfully freed by release.
released = utils.Must(meter.Int64Counter("orchestrator.nbd.slots_pool.released",
40+
metric.WithDescription("Number of nbd slots released."),
41+
metric.WithUnit("{slot}"),
42+
))
43+
)
44+
2845
// NoFreeSlotsError is returned when there are no free slots.
2946
// You can retry the request after some time.
3047
type NoFreeSlotsError struct{}
@@ -62,11 +79,9 @@ type DevicePool struct {
6279
mu sync.Mutex
6380

6481
slots chan DeviceSlot
65-
66-
slotCounter metric.Int64UpDownCounter
6782
}
6883

69-
func NewDevicePool(meterProvider metric.MeterProvider) (*DevicePool, error) {
84+
func NewDevicePool() (*DevicePool, error) {
7085
maxDevices, err := getMaxDevices()
7186
if err != nil {
7287
return nil, fmt.Errorf("failed to get max devices: %w", err)
@@ -76,17 +91,10 @@ func NewDevicePool(meterProvider metric.MeterProvider) (*DevicePool, error) {
7691
return nil, errors.New("max devices is 0")
7792
}
7893

79-
meter := meterProvider.Meter("orchestrator.device.pool")
80-
counter, err := telemetry.GetUpDownCounter(meter, telemetry.NBDkSlotSReadyPoolCounterMeterName)
81-
if err != nil {
82-
return nil, fmt.Errorf("failed to get slot pool counter: %w", err)
83-
}
84-
8594
pool := &DevicePool{
86-
done: make(chan struct{}),
87-
usedSlots: bitset.New(maxDevices),
88-
slots: make(chan DeviceSlot, int(math.Min(maxSlotsReady, float64(maxDevices)))),
89-
slotCounter: counter,
95+
done: make(chan struct{}),
96+
usedSlots: bitset.New(maxDevices),
97+
slots: make(chan DeviceSlot, int(math.Min(maxSlotsReady, float64(maxDevices)))),
9098
}
9199

92100
return pool, nil
@@ -133,7 +141,7 @@ func (d *DevicePool) Populate(ctx context.Context) {
133141
}
134142
failedCount = 0
135143

136-
d.slotCounter.Add(ctx, 1)
144+
slotCounter.Add(ctx, 1)
137145

138146
// Use select to avoid panic if context is canceled before writing
139147
select {
@@ -249,12 +257,13 @@ func (d *DevicePool) GetDevice(ctx context.Context) (DeviceSlot, error) {
249257
case <-ctx.Done():
250258
return 0, ctx.Err()
251259
case slot := <-d.slots:
252-
d.slotCounter.Add(ctx, -1)
260+
acquired.Add(ctx, 1)
261+
slotCounter.Add(ctx, -1)
253262
return slot, nil
254263
}
255264
}
256265

257-
func (d *DevicePool) release(idx DeviceSlot) error {
266+
func (d *DevicePool) release(ctx context.Context, idx DeviceSlot) error {
258267
free, err := d.isDeviceFree(idx)
259268
if err != nil {
260269
return fmt.Errorf("failed to check if device is free: %w", err)
@@ -268,6 +277,7 @@ func (d *DevicePool) release(idx DeviceSlot) error {
268277
d.usedSlots.Clear(uint(idx))
269278
d.mu.Unlock()
270279

280+
released.Add(ctx, 1)
271281
return nil
272282
}
273283

@@ -294,7 +304,7 @@ func (d *DevicePool) ReleaseDevice(ctx context.Context, idx DeviceSlot, opts ...
294304

295305
attempt++
296306

297-
err := d.release(idx)
307+
err := d.release(ctx, idx)
298308
if err == nil {
299309
return nil
300310
}

packages/orchestrator/internal/sandbox/network/pool.go

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,45 @@ import (
77
"sync"
88

99
"github.com/caarlos0/env/v11"
10+
"go.opentelemetry.io/otel"
11+
"go.opentelemetry.io/otel/attribute"
1012
"go.opentelemetry.io/otel/metric"
1113
"go.uber.org/zap"
1214

1315
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
16+
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
1417
)
1518

1619
const (
1720
NewSlotsPoolSize = 32
1821
ReusedSlotsPoolSize = 100
1922
)
2023

24+
// Package-level OpenTelemetry instruments for the network slot pool.
// They are created from the global meter provider via otel.Meter, so callers
// of NewPool no longer pass a metric.MeterProvider.
// NOTE(review): utils.Must presumably panics if instrument creation fails — confirm.
var (
25+
// meter is scoped to this package's import path.
meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network")
26+
27+
// newSlotsAvailableCounter tracks freshly created slots waiting in the pool:
// incremented in Populate, decremented in Get.
newSlotsAvailableCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.network.slots_pool.new",
28+
metric.WithDescription("Number of new network slots ready to be used."),
29+
// The unit annotation must be a balanced "{slot}", matching the other instruments.
metric.WithUnit("{slot}"),
30+
))
31+
// reusableSlotsAvailableCounter tracks returned slots waiting for reuse:
// incremented in Return, decremented in Get.
reusableSlotsAvailableCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.network.slots_pool.reused",
32+
metric.WithDescription("Number of reused network slots ready to be used."),
33+
metric.WithUnit("{slot}"),
34+
))
35+
// acquiredSlots counts slots handed out by Get; the "pool" attribute is
// "new" or "reused" depending on which channel served the slot.
acquiredSlots = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.acquired",
36+
metric.WithDescription("Number of network slots acquired."),
37+
metric.WithUnit("{slot}"),
38+
))
39+
// returnedSlotCounter counts slots put back into the reused pool by Return.
returnedSlotCounter = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.returned",
40+
metric.WithDescription("Number of network slots returned."),
41+
metric.WithUnit("{slot}"),
42+
))
43+
// releasedSlotCounter counts slots passed through cleanup.
releasedSlotCounter = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.released",
44+
metric.WithDescription("Number of network slots released."),
45+
metric.WithUnit("{slot}"),
46+
))
47+
)
48+
2149
type Config struct {
2250
// Using a reserved IPv4 range that is used for experiments and documentation
2351
// https://en.wikipedia.org/wiki/Reserved_IP_addresses
@@ -36,45 +64,29 @@ type Pool struct {
3664
done chan struct{}
3765
doneOnce sync.Once
3866

39-
newSlots chan *Slot
40-
reusedSlots chan *Slot
41-
newSlotCounter metric.Int64UpDownCounter
42-
reusedSlotCounter metric.Int64UpDownCounter
67+
newSlots chan *Slot
68+
reusedSlots chan *Slot
4369

4470
slotStorage Storage
4571
}
4672

4773
var ErrClosed = errors.New("cannot read from a closed pool")
4874

49-
func NewPool(meterProvider metric.MeterProvider, newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string, config Config) (*Pool, error) {
75+
func NewPool(newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string, config Config) (*Pool, error) {
5076
newSlots := make(chan *Slot, newSlotsPoolSize-1)
5177
reusedSlots := make(chan *Slot, reusedSlotsPoolSize)
5278

53-
meter := meterProvider.Meter("orchestrator.network.pool")
54-
55-
newSlotCounter, err := telemetry.GetUpDownCounter(meter, telemetry.NewNetworkSlotSPoolCounterMeterName)
56-
if err != nil {
57-
return nil, fmt.Errorf("failed to create new slot counter: %w", err)
58-
}
59-
60-
reusedSlotsCounter, err := telemetry.GetUpDownCounter(meter, telemetry.ReusedNetworkSlotSPoolCounterMeterName)
61-
if err != nil {
62-
return nil, fmt.Errorf("failed to create reused slot counter: %w", err)
63-
}
64-
6579
slotStorage, err := NewStorage(vrtSlotsSize, nodeID, config)
6680
if err != nil {
6781
return nil, fmt.Errorf("failed to create slot storage: %w", err)
6882
}
6983

7084
pool := &Pool{
71-
config: config,
72-
done: make(chan struct{}),
73-
newSlots: newSlots,
74-
reusedSlots: reusedSlots,
75-
newSlotCounter: newSlotCounter,
76-
reusedSlotCounter: reusedSlotsCounter,
77-
slotStorage: slotStorage,
85+
config: config,
86+
done: make(chan struct{}),
87+
newSlots: newSlots,
88+
reusedSlots: reusedSlots,
89+
slotStorage: slotStorage,
7890
}
7991

8092
return pool, nil
@@ -114,7 +126,7 @@ func (p *Pool) Populate(ctx context.Context) {
114126
continue
115127
}
116128

117-
p.newSlotCounter.Add(ctx, 1)
129+
newSlotsAvailableCounter.Add(ctx, 1)
118130
p.newSlots <- slot
119131
}
120132
}
@@ -127,7 +139,8 @@ func (p *Pool) Get(ctx context.Context, allowInternet bool) (*Slot, error) {
127139
case <-p.done:
128140
return nil, ErrClosed
129141
case s := <-p.reusedSlots:
130-
p.reusedSlotCounter.Add(ctx, -1)
142+
reusableSlotsAvailableCounter.Add(ctx, -1)
143+
acquiredSlots.Add(ctx, 1, metric.WithAttributes(attribute.String("pool", "reused")))
131144
telemetry.ReportEvent(ctx, "reused network slot")
132145

133146
slot = s
@@ -138,7 +151,8 @@ func (p *Pool) Get(ctx context.Context, allowInternet bool) (*Slot, error) {
138151
case <-ctx.Done():
139152
return nil, ctx.Err()
140153
case s := <-p.newSlots:
141-
p.newSlotCounter.Add(ctx, -1)
154+
newSlotsAvailableCounter.Add(ctx, -1)
155+
acquiredSlots.Add(ctx, 1, metric.WithAttributes(attribute.String("pool", "new")))
142156
telemetry.ReportEvent(ctx, "new network slot")
143157

144158
slot = s
@@ -165,7 +179,7 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error {
165179
err := slot.ResetInternet(ctx)
166180
if err != nil {
167181
// Cleanup the slot if resetting internet fails
168-
if cerr := p.cleanup(slot); cerr != nil {
182+
if cerr := p.cleanup(ctx, slot); cerr != nil {
169183
return fmt.Errorf("reset internet: %w; cleanup: %w", err, cerr)
170184
}
171185

@@ -178,9 +192,10 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error {
178192
case <-p.done:
179193
return ErrClosed
180194
case p.reusedSlots <- slot:
181-
p.reusedSlotCounter.Add(ctx, 1)
195+
returnedSlotCounter.Add(ctx, 1)
196+
reusableSlotsAvailableCounter.Add(ctx, 1)
182197
default:
183-
err := p.cleanup(slot)
198+
err := p.cleanup(ctx, slot)
184199
if err != nil {
185200
return fmt.Errorf("failed to return slot '%d': %w", slot.Idx, err)
186201
}
@@ -189,7 +204,7 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error {
189204
return nil
190205
}
191206

192-
func (p *Pool) cleanup(slot *Slot) error {
207+
func (p *Pool) cleanup(ctx context.Context, slot *Slot) error {
193208
var errs []error
194209

195210
err := slot.RemoveNetwork()
@@ -202,10 +217,12 @@ func (p *Pool) cleanup(slot *Slot) error {
202217
errs = append(errs, fmt.Errorf("failed to release slot '%d': %w", slot.Idx, err))
203218
}
204219

220+
releasedSlotCounter.Add(ctx, 1)
221+
205222
return errors.Join(errs...)
206223
}
207224

208-
func (p *Pool) Close(_ context.Context) error {
225+
func (p *Pool) Close(ctx context.Context) error {
209226
zap.L().Info("Closing network pool")
210227

211228
p.doneOnce.Do(func() {
@@ -215,7 +232,7 @@ func (p *Pool) Close(_ context.Context) error {
215232
var errs []error
216233

217234
for slot := range p.newSlots {
218-
err := p.cleanup(slot)
235+
err := p.cleanup(ctx, slot)
219236
if err != nil {
220237
errs = append(errs, fmt.Errorf("failed to cleanup slot '%d': %w", slot.Idx, err))
221238
}
@@ -224,7 +241,7 @@ func (p *Pool) Close(_ context.Context) error {
224241
close(p.reusedSlots)
225242

226243
for slot := range p.reusedSlots {
227-
err := p.cleanup(slot)
244+
err := p.cleanup(ctx, slot)
228245
if err != nil {
229246
errs = append(errs, fmt.Errorf("failed to cleanup slot '%d': %w", slot.Idx, err))
230247
}

packages/orchestrator/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func run(config cfg.Config) (success bool) {
326326
closers = append(closers, closer{"sandbox proxy", sandboxProxy.Close})
327327

328328
// device pool
329-
devicePool, err := nbd.NewDevicePool(tel.MeterProvider)
329+
devicePool, err := nbd.NewDevicePool()
330330
if err != nil {
331331
zap.L().Fatal("failed to create device pool", zap.Error(err))
332332
}
@@ -337,7 +337,7 @@ func run(config cfg.Config) (success bool) {
337337
closers = append(closers, closer{"device pool", devicePool.Close})
338338

339339
// network pool
340-
networkPool, err := network.NewPool(tel.MeterProvider, network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID, config.NetworkConfig)
340+
networkPool, err := network.NewPool(network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID, config.NetworkConfig)
341341
if err != nil {
342342
zap.L().Fatal("failed to create network pool", zap.Error(err))
343343
}

0 commit comments

Comments
 (0)