Skip to content

Commit 1b163a6

Browse files
committed
add: timeout to cache eviction dial
Signed-off-by: Austin Vazquez <[email protected]>
1 parent f7846bb commit 1b163a6

File tree

9 files changed

+79
-38
lines changed

9 files changed

+79
-38
lines changed

snapshotter/app/service.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,19 @@ func initCache(config config.Config, monitor *metrics.Monitor) (*cache.RemoteSna
156156
return nil, err
157157
}
158158

159-
// TODO: https://github.com/firecracker-microvm/firecracker-containerd/issues/689
160-
snapshotterDialer := func(ctx context.Context, host string, port uint64) (net.Conn, error) {
159+
dialTimeout, err := time.ParseDuration(config.Snapshotter.Dialer.Timeout)
160+
if err != nil {
161+
return nil, fmt.Errorf("Error parsing dialer timeout from config: %w", err)
162+
}
163+
retryInterval, err := time.ParseDuration(config.Snapshotter.Dialer.RetryInterval)
164+
if err != nil {
165+
return nil, fmt.Errorf("Error parsing dialer retry interval from config: %w", err)
166+
}
167+
168+
vsockDial := func(ctx context.Context, host string, port uint64) (net.Conn, error) {
161169
return vsock.DialContext(ctx, host, uint32(port), vsock.WithLogger(log.G(ctx)),
162170
vsock.WithAckMsgTimeout(2*time.Second),
163-
vsock.WithRetryInterval(500*time.Millisecond),
171+
vsock.WithRetryInterval(retryInterval),
164172
)
165173
}
166174

@@ -177,8 +185,9 @@ func initCache(config config.Config, monitor *metrics.Monitor) (*cache.RemoteSna
177185
return nil, err
178186
}
179187

180-
return snapshotterDialer(ctx, host, port)
188+
return vsockDial(ctx, host, port)
181189
}
190+
dialer := proxy.Dialer{Dial: dial, Timeout: dialTimeout}
182191

183192
fetch := func(ctx context.Context, namespace string) (*proxy.RemoteSnapshotter, error) {
184193
r := resolver
@@ -193,7 +202,7 @@ func initCache(config config.Config, monitor *metrics.Monitor) (*cache.RemoteSna
193202
}
194203

195204
dial := func(ctx context.Context, namespace string) (net.Conn, error) {
196-
return snapshotterDialer(ctx, host, port)
205+
return vsockDial(ctx, host, port)
197206
}
198207

199208
var metricsProxy *metrics.Proxy
@@ -214,7 +223,7 @@ func initCache(config config.Config, monitor *metrics.Monitor) (*cache.RemoteSna
214223
if err != nil {
215224
return nil, fmt.Errorf("invalid cache evict poll connection frequency: %w", err)
216225
}
217-
opts = append(opts, cache.EvictOnConnectionFailure(dial, cachePollFrequency, nil))
226+
opts = append(opts, cache.EvictOnConnectionFailure(dialer, cachePollFrequency))
218227
}
219228

220229
return cache.NewRemoteSnapshotterCache(fetch, opts...), nil

snapshotter/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,16 @@ type Config struct {
3333
type snapshotter struct {
3434
Listener listener `toml:"listener"`
3535
Proxy proxy `toml:"proxy"`
36+
Dialer dialer `toml:"dialer"`
3637
Cache cache `toml:"cache"`
3738
Metrics metrics `toml:"metrics"`
3839
}
3940

41+
type dialer struct {
42+
Timeout string `toml:"timeout" default:"5s"`
43+
RetryInterval string `toml:"retry_interval" default:"100ms"`
44+
}
45+
4046
type listener struct {
4147
Network string `toml:"network" default:"unix"`
4248
Address string `toml:"address" default:"/var/lib/demux-snapshotter/snapshotter.sock"`

snapshotter/config/config.toml.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
type = "http"
77
address = "127.0.0.1:10001"
88

9+
[snapshotter.dialer]
10+
timeout = "5s"
11+
retry_interval = "500ms"
12+
913
[snapshotter.cache]
1014
evict_on_connection_failure = true
1115
poll_connection_frequency = "60s"

snapshotter/config/config_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func defaultConfig() error {
6262
Network: "unix",
6363
Address: "/var/lib/demux-snapshotter/snapshotter.sock",
6464
},
65+
Dialer: dialer{
66+
Timeout: "5s",
67+
RetryInterval: "100ms",
68+
},
6569
Cache: cache{
6670
EvictOnConnectionFailure: true,
6771
PollConnectionFrequency: "60s",
@@ -83,6 +87,9 @@ func parseExampleConfig() error {
8387
[snapshotter.listener]
8488
network = "unix"
8589
address = "/var/lib/demux-snapshotter/non-default-snapshotter.vsock"
90+
[snapshotter.dialer]
91+
timeout = "5s"
92+
retry_interval = "100ms"
8693
[snapshotter.proxy.address.resolver]
8794
type = "http"
8895
address = "localhost:10001"
@@ -111,6 +118,10 @@ func parseExampleConfig() error {
111118
},
112119
},
113120
},
121+
Dialer: dialer{
122+
Timeout: "5s",
123+
RetryInterval: "100ms",
124+
},
114125
Cache: cache{
115126
EvictOnConnectionFailure: false,
116127
PollConnectionFrequency: "120s",

snapshotter/demux/cache/cache.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@ package cache
1616
import (
1717
"context"
1818
"fmt"
19-
"net"
2019
"sync"
2120
"time"
2221

2322
"github.com/containerd/containerd/snapshots"
2423
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
2524
"github.com/hashicorp/go-multierror"
26-
"github.com/sirupsen/logrus"
25+
log "github.com/sirupsen/logrus"
2726
)
2827

2928
// SnapshotterProvider defines a snapshotter fetch function.
@@ -64,28 +63,28 @@ func NewRemoteSnapshotterCache(fetch SnapshotterProvider, opts ...SnapshotterCac
6463
}
6564

6665
// EvictOnConnectionFailure is a caching option for evicting entries from the cache after a failed connection attempt.
67-
func EvictOnConnectionFailure(dial func(context.Context, string) (net.Conn, error), frequency time.Duration, log *logrus.Logger) func(*RemoteSnapshotterCache) {
66+
func EvictOnConnectionFailure(dialer proxy.Dialer, frequency time.Duration) func(*RemoteSnapshotterCache) {
6867
return func(c *RemoteSnapshotterCache) {
6968
c.evict = make(chan string)
70-
c.lease = NewEvictOnConnectionFailurePolicy(c.evict, c.stop, dial, frequency)
71-
c.startBackgroundReaper(log)
69+
c.lease = NewEvictOnConnectionFailurePolicy(c.evict, dialer, frequency, c.stop)
70+
c.startBackgroundReaper()
7271
}
7372
}
7473

75-
func (c *RemoteSnapshotterCache) startBackgroundReaper(log *logrus.Logger) {
76-
reap := func(log *logrus.Entry) {
74+
func (c *RemoteSnapshotterCache) startBackgroundReaper() {
75+
reap := func() {
7776
for {
7877
s, ok := <-c.evict
7978
if !ok {
8079
break
8180
}
8281
if err := c.Evict(s); err != nil {
83-
log.Error(err)
82+
log.WithField("context", "cache reaper").Error(err)
8483
}
8584
}
8685
}
8786
c.reaper.Do(func() {
88-
go reap(log.WithField("context", "cache reaper"))
87+
go reap()
8988
})
9089
}
9190

snapshotter/demux/cache/evict.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ package cache
1515

1616
import (
1717
"context"
18-
"net"
1918
"time"
19+
20+
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
2021
)
2122

2223
// EvictionPolicy defines the interface for enforcing a cache eviction policy.
@@ -35,40 +36,41 @@ type evictionPolicy struct {
3536
type EvictOnConnectionFailurePolicy struct {
3637
evictionPolicy
3738

38-
dial func(context.Context, string) (net.Conn, error)
39+
dialer proxy.Dialer
40+
3941
frequency time.Duration
4042
}
4143

4244
// NewEvictOnConnectionFailurePolicy creates a new policy to evict on remote snapshotter
4345
// connection failure on a specified frequency duration.
44-
func NewEvictOnConnectionFailurePolicy(evictChan chan string, stopCondition chan struct{}, dial func(context.Context, string) (net.Conn, error), frequency time.Duration) EvictionPolicy {
45-
return &EvictOnConnectionFailurePolicy{evictionPolicy: evictionPolicy{evict: evictChan, stop: stopCondition}, dial: dial, frequency: frequency}
46+
func NewEvictOnConnectionFailurePolicy(evictChan chan string, dialer proxy.Dialer, frequency time.Duration, stopCondition chan struct{}) EvictionPolicy {
47+
return &EvictOnConnectionFailurePolicy{evictionPolicy: evictionPolicy{evict: evictChan, stop: stopCondition}, dialer: dialer, frequency: frequency}
4648
}
4749

4850
// Enforce launches a go routine which periodically attempts to dial the cached entry
4951
// using the provided dialer, bounding each attempt by the dialer's timeout.
5052
//
5153
// On connection failure, the entry will be evicted from cache.
52-
func (p *EvictOnConnectionFailurePolicy) Enforce(key string) {
53-
go func(dial func(context.Context, string) (net.Conn, error)) {
54+
func (p EvictOnConnectionFailurePolicy) Enforce(key string) {
55+
go func() {
5456
ticker := time.NewTicker(p.frequency)
5557
defer ticker.Stop()
5658

57-
ctx, cancel := context.WithCancel(context.Background())
58-
defer cancel()
59-
6059
for {
6160
select {
6261
case <-ticker.C:
63-
conn, err := dial(ctx, key)
62+
ctx, cancel := context.WithTimeout(context.Background(), p.dialer.Timeout)
63+
conn, err := p.dialer.Dial(ctx, key)
6464
if err != nil {
6565
p.evict <- key
66+
cancel()
6667
return
6768
}
69+
cancel()
6870
conn.Close()
6971
case <-p.stop:
7072
return
7173
}
7274
}
73-
}(p.dial)
75+
}()
7476
}

snapshotter/demux/cache/evict_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/internal"
2424
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
25-
"github.com/sirupsen/logrus"
2625
"github.com/stretchr/testify/require"
2726
"go.uber.org/goleak"
2827
)
@@ -49,8 +48,8 @@ func TestCacheEvictionOnConnectionFailure(t *testing.T) {
4948
return nil, errors.New("mock dial error")
5049
}
5150
frequency := 2 * time.Millisecond
52-
log := logrus.New()
53-
cache := NewRemoteSnapshotterCache(getSnapshotter, EvictOnConnectionFailure(dial, frequency, log))
51+
dialer := proxy.Dialer{Dial: dial, Timeout: 1 * time.Second}
52+
cache := NewRemoteSnapshotterCache(getSnapshotter, EvictOnConnectionFailure(dialer, frequency))
5453
defer cache.Close()
5554

5655
_, err := cache.Get(context.Background(), "test")
@@ -85,8 +84,8 @@ func TestCacheNotEvictedIfConnectionIsHealthy(t *testing.T) {
8584
return &net.UnixConn{}, nil
8685
}
8786
frequency := 1 * time.Millisecond
88-
log := logrus.New()
89-
cache := NewRemoteSnapshotterCache(getSnapshotter, EvictOnConnectionFailure(dial, frequency, log))
87+
dialer := proxy.Dialer{Dial: dial, Timeout: 1 * time.Second}
88+
cache := NewRemoteSnapshotterCache(getSnapshotter, EvictOnConnectionFailure(dialer, frequency))
9089
defer cache.Close()
9190

9291
_, err := cache.Get(context.Background(), "test")
@@ -119,8 +118,8 @@ func TestBackgroundEnforcersCanBeStopped(t *testing.T) {
119118
return &net.UnixConn{}, nil
120119
}
121120
frequency := 1 * time.Millisecond
122-
log := logrus.New()
123-
cache := NewRemoteSnapshotterCache(getSnapshotter, EvictOnConnectionFailure(dial, frequency, log))
121+
dialer := proxy.Dialer{Dial: dial, Timeout: 1 * time.Second}
122+
cache := NewRemoteSnapshotterCache(getSnapshotter, EvictOnConnectionFailure(dialer, frequency))
124123

125124
_, err := cache.Get(context.Background(), "test")
126125
require.NoError(t, err, "Snapshotter not added to cache correctly")
@@ -137,8 +136,8 @@ func TestLogErrorOnEvictionFailure(t *testing.T) {
137136
return nil, errors.New("mock dial error")
138137
}
139138
frequency := 1 * time.Millisecond
140-
log := logrus.New()
141-
cache := NewRemoteSnapshotterCache(getErrorSnapshotter, EvictOnConnectionFailure(dial, frequency, log))
139+
dialer := proxy.Dialer{Dial: dial, Timeout: 1 * time.Second}
140+
cache := NewRemoteSnapshotterCache(getErrorSnapshotter, EvictOnConnectionFailure(dialer, frequency))
142141
defer cache.Close()
143142

144143
_, err := cache.Get(context.Background(), "test")
@@ -156,6 +155,7 @@ func TestLogErrorOnEvictionFailure(t *testing.T) {
156155
if cache.length() == 1 {
157156
continue
158157
}
158+
159159
return
160160
case <-ctx.Done():
161161
require.Len(t, cache.snapshotters, 1, "Cache entry was never evicted")

snapshotter/demux/proxy/snapshotter.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,25 @@ package proxy
1616
import (
1717
"context"
1818
"net"
19+
"time"
1920

2021
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
2122
"github.com/containerd/containerd/snapshots"
2223
"github.com/containerd/containerd/snapshots/proxy"
2324
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/metrics"
2425
"github.com/hashicorp/go-multierror"
25-
"github.com/sirupsen/logrus"
2626
"google.golang.org/grpc"
2727
"google.golang.org/grpc/credentials/insecure"
2828
)
2929

30-
// SnapshotterDialer defines an interface for establishing a network connection.
31-
type SnapshotterDialer = func(context.Context, *logrus.Entry, string, uint32)
30+
// Dialer captures commonly grouped dial functionality and configuration.
31+
type Dialer struct {
32+
// Dial is a function used to establish a network connection to a remote snapshotter.
33+
Dial func(context.Context, string) (net.Conn, error)
34+
35+
// Timeout is the maximum time allowed to establish a connection to a remote snapshotter.
36+
Timeout time.Duration
37+
}
3238

3339
// RemoteSnapshotter embeds a snapshots.Snapshotter and its metrics proxy.
3440
type RemoteSnapshotter struct {

tools/docker/entrypoint.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ EOF
4242

4343
mkdir -p /etc/demux-snapshotter /var/lib/demux-snapshotter
4444
cat > /etc/demux-snapshotter/config.toml <<EOF
45+
[snapshotter.dialer]
46+
timeout = "5s"
47+
retry_interval = "500ms"
48+
4549
[snapshotter.proxy.address.resolver]
4650
type = "http"
4751
address = "http://127.0.0.1:10001"

0 commit comments

Comments
 (0)