Skip to content

Commit 0db3d5b

Browse files
Merge pull request #662 from austinvazquez/close-remote-snapshotter
fix: evict remote snapshotter from cache on connection failure
2 parents cb7f52c + 1b163a6 commit 0db3d5b

File tree

17 files changed

+627
-262
lines changed

17 files changed

+627
-262
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/sirupsen/logrus v1.8.1
3434
github.com/stretchr/testify v1.7.1
3535
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
36+
go.uber.org/goleak v1.1.12
3637
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect
3738
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
3839
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c

go.sum

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1095,6 +1095,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
10951095
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
10961096
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
10971097
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
1098+
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI=
10981099
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
10991100
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
11001101
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@@ -1393,6 +1394,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
13931394
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
13941395
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
13951396
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
1397+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
13961398
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
13971399
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
13981400
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

snapshotter/app/service.go

Lines changed: 66 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,7 @@ import (
2727
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
2828
"github.com/containerd/containerd/contrib/snapshotservice"
2929
"github.com/containerd/containerd/log"
30-
"github.com/containerd/containerd/snapshots"
3130
"github.com/firecracker-microvm/firecracker-go-sdk/vsock"
32-
"github.com/sirupsen/logrus"
3331
"golang.org/x/sync/errgroup"
3432
"google.golang.org/grpc"
3533

@@ -55,52 +53,46 @@ func Run(config config.Config) error {
5553

5654
group, ctx := errgroup.WithContext(ctx)
5755

58-
cache := cache.NewSnapshotterCache()
59-
6056
var (
6157
monitor *metrics.Monitor
6258
serviceDiscovery *discovery.ServiceDiscovery
6359
)
60+
6461
if config.Snapshotter.Metrics.Enable {
65-
sdHost := config.Snapshotter.Metrics.Host
66-
sdPort := config.Snapshotter.Metrics.ServiceDiscoveryPort
67-
serviceDiscovery = discovery.NewServiceDiscovery(sdHost, sdPort, cache)
6862
var err error
6963
monitor, err = initMetricsProxyMonitor(config.Snapshotter.Metrics.PortRange)
7064
if err != nil {
71-
log.G(ctx).WithError(err).Fatal("failed creating metrics proxy monitor")
72-
return err
65+
return fmt.Errorf("failed creating metrics proxy monitor: %w", err)
7366
}
74-
group.Go(func() error {
75-
return serviceDiscovery.Serve()
76-
})
7767
group.Go(func() error {
7868
return monitor.Start()
7969
})
8070
}
8171

82-
snapshotter, err := initSnapshotter(ctx, config, cache, monitor)
72+
cache, err := initCache(config, monitor)
8373
if err != nil {
84-
log.G(ctx).WithFields(
85-
logrus.Fields{"resolver": config.Snapshotter.Proxy.Address.Resolver.Type},
86-
).WithError(err).Fatal("failed creating socket resolver")
87-
return err
74+
return fmt.Errorf("failed initializing cache: %w", err)
75+
}
76+
77+
if config.Snapshotter.Metrics.Enable {
78+
sdHost := config.Snapshotter.Metrics.Host
79+
sdPort := config.Snapshotter.Metrics.ServiceDiscoveryPort
80+
serviceDiscovery = discovery.NewServiceDiscovery(sdHost, sdPort, cache)
81+
group.Go(func() error {
82+
return serviceDiscovery.Serve()
83+
})
8884
}
8985

86+
snapshotter := demux.NewSnapshotter(cache)
87+
9088
grpcServer := grpc.NewServer()
9189
service := snapshotservice.FromSnapshotter(snapshotter)
9290
snapshotsapi.RegisterSnapshotsServer(grpcServer, service)
9391

9492
listenerConfig := config.Snapshotter.Listener
9593
listener, err := net.Listen(listenerConfig.Network, listenerConfig.Address)
9694
if err != nil {
97-
log.G(ctx).WithFields(
98-
logrus.Fields{
99-
"network": listenerConfig.Network,
100-
"address": listenerConfig.Address,
101-
},
102-
).WithError(err).Fatal("failed creating listener")
103-
return err
95+
return fmt.Errorf("failed creating service listener{network: %s, address: %s}: %w", listenerConfig.Network, listenerConfig.Address, err)
10496
}
10597

10698
group.Go(func() error {
@@ -138,8 +130,7 @@ func Run(config config.Config) error {
138130
})
139131

140132
if err := group.Wait(); err != nil {
141-
log.G(ctx).WithError(err).Error("demux snapshotter error")
142-
return err
133+
return fmt.Errorf("demux snapshotter error: %w", err)
143134
}
144135

145136
log.G(ctx).Info("done")
@@ -159,13 +150,46 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
159150
const base10 = 10
160151
const bits32 = 32
161152

162-
func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cache, monitor *metrics.Monitor) (snapshots.Snapshotter, error) {
153+
func initCache(config config.Config, monitor *metrics.Monitor) (*cache.RemoteSnapshotterCache, error) {
163154
resolver, err := initResolver(config)
164155
if err != nil {
165156
return nil, err
166157
}
167158

168-
newRemoteSnapshotterFunc := func(ctx context.Context, namespace string) (*proxy.RemoteSnapshotter, error) {
159+
dialTimeout, err := time.ParseDuration(config.Snapshotter.Dialer.Timeout)
160+
if err != nil {
161+
return nil, fmt.Errorf("Error parsing dialer retry interval from config: %w", err)
162+
}
163+
retryInterval, err := time.ParseDuration(config.Snapshotter.Dialer.RetryInterval)
164+
if err != nil {
165+
return nil, fmt.Errorf("Error parsing dialer retry interval from config: %w", err)
166+
}
167+
168+
vsockDial := func(ctx context.Context, host string, port uint64) (net.Conn, error) {
169+
return vsock.DialContext(ctx, host, uint32(port), vsock.WithLogger(log.G(ctx)),
170+
vsock.WithAckMsgTimeout(2*time.Second),
171+
vsock.WithRetryInterval(retryInterval),
172+
)
173+
}
174+
175+
dial := func(ctx context.Context, namespace string) (net.Conn, error) {
176+
r := resolver
177+
response, err := r.Get(namespace)
178+
179+
if err != nil {
180+
return nil, err
181+
}
182+
host := response.Address
183+
port, err := strconv.ParseUint(response.SnapshotterPort, base10, bits32)
184+
if err != nil {
185+
return nil, err
186+
}
187+
188+
return vsockDial(ctx, host, port)
189+
}
190+
dialer := proxy.Dialer{Dial: dial, Timeout: dialTimeout}
191+
192+
fetch := func(ctx context.Context, namespace string) (*proxy.RemoteSnapshotter, error) {
169193
r := resolver
170194
response, err := r.Get(namespace)
171195
if err != nil {
@@ -177,12 +201,8 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
177201
return nil, err
178202
}
179203

180-
// TODO: https://github.com/firecracker-microvm/firecracker-containerd/issues/689
181-
snapshotterDialer := func(ctx context.Context, namespace string) (net.Conn, error) {
182-
return vsock.DialContext(ctx, host, uint32(port), vsock.WithLogger(log.G(ctx)),
183-
vsock.WithAckMsgTimeout(2*time.Second),
184-
vsock.WithRetryInterval(200*time.Millisecond),
185-
)
204+
dial := func(ctx context.Context, namespace string) (net.Conn, error) {
205+
return vsockDial(ctx, host, port)
186206
}
187207

188208
var metricsProxy *metrics.Proxy
@@ -193,10 +213,20 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
193213
}
194214
}
195215

196-
return proxy.NewRemoteSnapshotter(ctx, host, snapshotterDialer, metricsProxy)
216+
return proxy.NewRemoteSnapshotter(ctx, host, dial, metricsProxy)
217+
}
218+
219+
opts := make([]cache.SnapshotterCacheOption, 0)
220+
221+
if config.Snapshotter.Cache.EvictOnConnectionFailure {
222+
cachePollFrequency, err := time.ParseDuration(config.Snapshotter.Cache.PollConnectionFrequency)
223+
if err != nil {
224+
return nil, fmt.Errorf("invalid cache evict poll connection frequency: %w", err)
225+
}
226+
opts = append(opts, cache.EvictOnConnectionFailure(dialer, cachePollFrequency))
197227
}
198228

199-
return demux.NewSnapshotter(cache, newRemoteSnapshotterFunc), nil
229+
return cache.NewRemoteSnapshotterCache(fetch, opts...), nil
200230
}
201231

202232
func initMetricsProxyMonitor(portRange string) (*metrics.Monitor, error) {

snapshotter/config/config.go

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,9 +33,16 @@ type Config struct {
3333
type snapshotter struct {
3434
Listener listener `toml:"listener"`
3535
Proxy proxy `toml:"proxy"`
36+
Dialer dialer `toml:"dialer"`
37+
Cache cache `toml:"cache"`
3638
Metrics metrics `toml:"metrics"`
3739
}
3840

41+
type dialer struct {
42+
Timeout string `toml:"timeout" default:"5s"`
43+
RetryInterval string `toml:"retry_interval" default:"100ms"`
44+
}
45+
3946
type listener struct {
4047
Network string `toml:"network" default:"unix"`
4148
Address string `toml:"address" default:"/var/lib/demux-snapshotter/snapshotter.sock"`
@@ -54,6 +61,11 @@ type resolver struct {
5461
Address string `toml:"address"`
5562
}
5663

64+
type cache struct {
65+
EvictOnConnectionFailure bool `toml:"evict_on_connection_failure" default:"true"`
66+
PollConnectionFrequency string `toml:"poll_connection_frequency" default:"60s"`
67+
}
68+
5769
type debug struct {
5870
LogLevel string `toml:"logLevel" default:"info"`
5971
}

snapshotter/config/config.toml.example

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -6,12 +6,21 @@
66
type = "http"
77
address = "127.0.0.1:10001"
88

9+
[snapshotter.dialer]
10+
timeout = "5s"
11+
retry_interval = "500ms"
12+
13+
[snapshotter.cache]
14+
evict_on_connection_failure = true
15+
poll_connection_frequency = "60s"
16+
917
[snapshotter.metrics]
1018
enable = true
1119
port_range = "9000-9999"
12-
[snapshotter.metrics.service_discovery]
13-
enable = true
14-
port = 8080
20+
21+
[snapshotter.metrics.service_discovery]
22+
enable = true
23+
port = 8080
1524

1625
[debug]
1726
logLevel = "info"

snapshotter/config/config_test.go

Lines changed: 36 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,14 @@ func defaultConfig() error {
6262
Network: "unix",
6363
Address: "/var/lib/demux-snapshotter/snapshotter.sock",
6464
},
65+
Dialer: dialer{
66+
Timeout: "5s",
67+
RetryInterval: "100ms",
68+
},
69+
Cache: cache{
70+
EvictOnConnectionFailure: true,
71+
PollConnectionFrequency: "60s",
72+
},
6573
Metrics: metrics{
6674
Enable: false,
6775
},
@@ -75,21 +83,27 @@ func defaultConfig() error {
7583

7684
func parseExampleConfig() error {
7785
fileContents := []byte(`
78-
[snapshotter]
79-
[snapshotter.listener]
80-
network = "unix"
81-
address = "/var/lib/demux-snapshotter/non-default-snapshotter.vsock"
82-
[snapshotter.proxy.address.resolver]
83-
type = "http"
84-
address = "localhost:10001"
85-
[snapshotter.metrics]
86+
[snapshotter]
87+
[snapshotter.listener]
88+
network = "unix"
89+
address = "/var/lib/demux-snapshotter/non-default-snapshotter.vsock"
90+
[snapshotter.dialer]
91+
timeout = "5s"
92+
retry_interval = "100ms"
93+
[snapshotter.proxy.address.resolver]
94+
type = "http"
95+
address = "localhost:10001"
96+
[snapshotter.cache]
97+
evict_on_connection_failure = false
98+
poll_connection_frequency = "120s"
99+
[snapshotter.metrics]
86100
enable = true
87-
port_range = "9000-9999"
88-
host = "0.0.0.0"
89-
service_discovery_port = 8080
90-
[debug]
91-
logLevel = "debug"
92-
`)
101+
port_range = "9000-9999"
102+
host = "0.0.0.0"
103+
service_discovery_port = 8080
104+
[debug]
105+
logLevel = "debug"
106+
`)
93107
expected := Config{
94108
Snapshotter: snapshotter{
95109
Listener: listener{
@@ -104,6 +118,14 @@ func parseExampleConfig() error {
104118
},
105119
},
106120
},
121+
Dialer: dialer{
122+
Timeout: "5s",
123+
RetryInterval: "100ms",
124+
},
125+
Cache: cache{
126+
EvictOnConnectionFailure: false,
127+
PollConnectionFrequency: "120s",
128+
},
107129
Metrics: metrics{
108130
Enable: true,
109131
PortRange: "9000-9999",

0 commit comments

Comments
 (0)