Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions iac/provider-gcp/nomad/clean-nfs-cache.tf
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ resource "nomad_job" "clean_nfs_cache" {
dry_run = var.filestore_cache_cleanup_dry_run
deletions_per_loop = var.filestore_cache_cleanup_deletions_per_loop
files_per_loop = var.filestore_cache_cleanup_files_per_loop
otel_collector_endpoint = data.google_secret_manager_secret_version.grafana_logs_url.secret_data
})
}
3 changes: 3 additions & 0 deletions iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ job "filestore-cleanup" {
"--disk-usage-target-percent=${max_disk_usage_target}",
"--files-per-loop=${files_per_loop}",
"--deletions-per-loop=${deletions_per_loop}",
%{ if otel_collector_endpoint != "" }
"--otel-collector-endpoint=${otel_collector_endpoint}",
%{ endif }
"${nfs_cache_mount_path}",
]
}
Expand Down
89 changes: 51 additions & 38 deletions packages/orchestrator/cmd/clean-nfs-cache/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,22 @@ import (
"sort"
"time"

"github.com/google/uuid"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/e2b-dev/infra/packages/orchestrator/cmd/clean-nfs-cache/pkg"
"github.com/e2b-dev/infra/packages/shared/pkg/env"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
)

const serviceName = "clean-nfs-cache"
// Identity values for this job, used when configuring logging and telemetry.
const (
// serviceName is given to the logger config, the telemetry resource, and the OTEL core.
serviceName = "clean-nfs-cache"
// commitSHA is passed to the telemetry resource; it is an empty string here.
commitSHA = ""
// serviceVersion is passed to the telemetry resource.
serviceVersion = "0.1.0"
)

func main() {
ctx := context.Background()
Expand All @@ -29,11 +37,25 @@ func main() {
}

func cleanNFSCache(ctx context.Context) error {
path, opts, err := parseArgs()
if err != nil {
return fmt.Errorf("invalid arguments: %w", err)
}

var cores []zapcore.Core
if opts.otelCollectorEndpoint != "" {
otelCore, err := newOtelCore(ctx, opts)
if err != nil {
return fmt.Errorf("failed to create otel logger: %w", err)
}
cores = append(cores, otelCore)
}

globalLogger := zap.Must(logger.NewLogger(ctx, logger.LoggerConfig{
ServiceName: serviceName,
IsInternal: true,
IsDebug: env.IsDebug(),
Cores: nil,
Cores: cores,
EnableConsole: true,
}))
defer func(l *zap.Logger) {
Expand All @@ -44,11 +66,6 @@ func cleanNFSCache(ctx context.Context) error {
}(globalLogger)
zap.ReplaceGlobals(globalLogger)

path, opts, err := parseArgs()
if err != nil {
return fmt.Errorf("invalid arguments: %w", err)
}

// get free space information for path
zap.L().Info("starting",
zap.Bool("dry_run", opts.dryRun),
Expand All @@ -64,10 +81,6 @@ func cleanNFSCache(ctx context.Context) error {
}
targetDiskUsage := int64(float64(opts.targetDiskUsagePercent) / 100 * float64(diskInfo.Total))
areWeDone := func() bool {
currentUsedPercentage := (float64(diskInfo.Used) / float64(diskInfo.Total)) * 100
zap.L().Info("current usage",
zap.Float64("percent", currentUsedPercentage),
zap.String("size", formatBytes(diskInfo.Used)))
return diskInfo.Used < targetDiskUsage
}

Expand Down Expand Up @@ -102,7 +115,7 @@ func cleanNFSCache(ctx context.Context) error {
zap.Int64("count", results.deletedFiles),
zap.Int64("bytes", results.deletedBytes))
})
allResults = allResults.union(results)
allResults = allResults.sum(results)
if err != nil {
return fmt.Errorf("failed to delete files: %w", err)
}
Expand All @@ -111,6 +124,27 @@ func cleanNFSCache(ctx context.Context) error {
return nil
}

// newOtelCore assembles a zap core backed by an OTEL log provider.
//
// The provider is built from a telemetry resource describing this process
// (node ID from the environment, the service constants, and a freshly
// generated instance UUID) and a gRPC log exporter pointed at
// opts.otelCollectorEndpoint. An error is returned if either the resource
// or the exporter cannot be created.
//
// NOTE(review): the log provider itself is not returned, so the caller has
// no handle to shut it down explicitly — confirm that buffered records are
// flushed some other way before the process exits.
func newOtelCore(ctx context.Context, opts opts) (zapcore.Core, error) {
	res, err := telemetry.GetResource(
		ctx,
		env.GetNodeID(),
		serviceName,
		commitSHA,
		serviceVersion,
		uuid.NewString(), // unique per process start
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create resource: %w", err)
	}

	endpoint := otlploggrpc.WithEndpoint(opts.otelCollectorEndpoint)
	exporter, err := telemetry.NewLogExporter(ctx, endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to create logs exporter: %w", err)
	}

	provider := telemetry.NewLogProvider(ctx, exporter, res)
	return logger.GetOTELCore(provider, serviceName), nil
}

func printSummary(r results, opts opts) {
if r.deletedFiles == 0 {
zap.L().Info("no files deleted")
Expand Down Expand Up @@ -185,7 +219,7 @@ type results struct {
createdDurations []time.Duration
}

func (r results) union(other results) results {
func (r results) sum(other results) results {
return results{
deletedFiles: r.deletedFiles + other.deletedFiles,
deletedBytes: r.deletedBytes + other.deletedBytes,
Expand All @@ -197,17 +231,8 @@ func (r results) union(other results) results {
func deleteOldestFiles(cache *pkg.ListingCache, files []pkg.File, opts opts, diskInfo *pkg.DiskInfo, areWeDone func() bool, deleteCount int64) (results, error) {
now := time.Now()
var results results
for index, file := range files {
if opts.dryRun {
zap.L().Debug("would delete",
zap.String("path", file.Path),
zap.Int64("bytes", file.Size),
zap.Duration("last_access", time.Since(file.ATime).Round(time.Minute)))
} else {
zap.L().Debug("deleting",
zap.Int("index", index+1),
zap.String("path", file.Path),
zap.Int64("bytes", file.Size))
for _, file := range files {
if !opts.dryRun {
if err := os.Remove(file.Path); err != nil {
zap.L().Error("failed to delete",
zap.String("path", file.Path),
Expand Down Expand Up @@ -291,6 +316,7 @@ type opts struct {
dryRun bool
filesPerLoop int
filesToDeletePerLoop int64
otelCollectorEndpoint string
}

var (
Expand All @@ -306,6 +332,7 @@ func parseArgs() (string, opts, error) {
flags.BoolVar(&opts.dryRun, "dry-run", true, "dry run")
flags.IntVar(&opts.filesPerLoop, "files-per-loop", 10000, "number of files to gather metadata for per loop")
flags.Int64Var(&opts.filesToDeletePerLoop, "deletions-per-loop", 100, "maximum number of files to delete per loop")
flags.StringVar(&opts.otelCollectorEndpoint, "otel-collector-endpoint", "", "endpoint of the otel collector")

args := os.Args[1:] // skip the command name
if err := flags.Parse(args); err != nil {
Expand All @@ -327,17 +354,3 @@ func timeit(message string, fn func()) {

zap.L().Debug(message, zap.Duration("duration", done))
}

// formatBytes renders a byte count as a human-readable string using
// 1024-based units. Values below 1024 (including negatives) are printed as
// a plain integer followed by " B"; larger values are scaled to the largest
// fitting unit and printed with one decimal place, e.g. "1.5 KiB".
func formatBytes(b int64) string {
	const (
		unit     = 1024
		prefixes = "KMGTPE"
	)

	if b < unit {
		return fmt.Sprintf("%d B", b)
	}

	// Walk up the unit ladder until the remaining quotient fits below one unit.
	divisor := int64(unit)
	idx := 0
	remaining := b / unit
	for remaining >= unit {
		divisor *= unit
		idx++
		remaining /= unit
	}

	scaled := float64(b) / float64(divisor)
	return fmt.Sprintf("%.1f %ciB", scaled, prefixes[idx])
}
2 changes: 1 addition & 1 deletion packages/orchestrator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/vishvananda/netns v0.0.5
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0
go.opentelemetry.io/otel/metric v1.38.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
Expand Down Expand Up @@ -232,7 +233,6 @@ require (
go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.38.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 // indirect
go.opentelemetry.io/otel/log v0.14.0 // indirect
Expand Down