diff --git a/iac/provider-gcp/nomad/clean-nfs-cache.tf b/iac/provider-gcp/nomad/clean-nfs-cache.tf index defc00f00d..3816788609 100644 --- a/iac/provider-gcp/nomad/clean-nfs-cache.tf +++ b/iac/provider-gcp/nomad/clean-nfs-cache.tf @@ -24,5 +24,6 @@ resource "nomad_job" "clean_nfs_cache" { dry_run = var.filestore_cache_cleanup_dry_run deletions_per_loop = var.filestore_cache_cleanup_deletions_per_loop files_per_loop = var.filestore_cache_cleanup_files_per_loop + otel_collector_endpoint = data.google_secret_manager_secret_version.grafana_logs_url.secret_data }) } diff --git a/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl b/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl index 5f8f2bf216..cced858051 100644 --- a/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl +++ b/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl @@ -26,6 +26,9 @@ job "filestore-cleanup" { "--disk-usage-target-percent=${max_disk_usage_target}", "--files-per-loop=${files_per_loop}", "--deletions-per-loop=${deletions_per_loop}", + %{ if otel_collector_endpoint != "" } + "--otel-collector-endpoint=${otel_collector_endpoint}", + %{ endif } "${nfs_cache_mount_path}", ] } diff --git a/packages/orchestrator/cmd/clean-nfs-cache/main.go b/packages/orchestrator/cmd/clean-nfs-cache/main.go index 8dc1fc7330..0e110d2e8a 100644 --- a/packages/orchestrator/cmd/clean-nfs-cache/main.go +++ b/packages/orchestrator/cmd/clean-nfs-cache/main.go @@ -11,14 +11,22 @@ import ( "sort" "time" + "github.com/google/uuid" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/e2b-dev/infra/packages/orchestrator/cmd/clean-nfs-cache/pkg" "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/logger" + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) -const serviceName = "clean-nfs-cache" +const ( + serviceName = "clean-nfs-cache" + commitSHA = "" + serviceVersion = "0.1.0" +) func main() { ctx := context.Background() @@ -29,11 +37,25 @@ func main() { } func cleanNFSCache(ctx context.Context) error { + path, opts, err := parseArgs() + if err != nil { + return fmt.Errorf("invalid arguments: %w", err) + } + + var cores []zapcore.Core + if opts.otelCollectorEndpoint != "" { + otelCore, err := newOtelCore(ctx, opts) + if err != nil { + return fmt.Errorf("failed to create otel logger: %w", err) + } + cores = append(cores, otelCore) + } + globalLogger := zap.Must(logger.NewLogger(ctx, logger.LoggerConfig{ ServiceName: serviceName, IsInternal: true, IsDebug: env.IsDebug(), - Cores: nil, + Cores: cores, EnableConsole: true, })) defer func(l *zap.Logger) { @@ -44,11 +66,6 @@ func cleanNFSCache(ctx context.Context) error { }(globalLogger) zap.ReplaceGlobals(globalLogger) - path, opts, err := parseArgs() - if err != nil { - return fmt.Errorf("invalid arguments: %w", err) - } - // get free space information for path zap.L().Info("starting", zap.Bool("dry_run", opts.dryRun), @@ -64,10 +81,6 @@ func cleanNFSCache(ctx context.Context) error { } targetDiskUsage := int64(float64(opts.targetDiskUsagePercent) / 100 * float64(diskInfo.Total)) areWeDone := func() bool { - currentUsedPercentage := (float64(diskInfo.Used) / float64(diskInfo.Total)) * 100 - zap.L().Info("current usage", - zap.Float64("percent", currentUsedPercentage), - zap.String("size", formatBytes(diskInfo.Used))) return diskInfo.Used < targetDiskUsage } @@ -102,7 +115,7 @@ func cleanNFSCache(ctx context.Context) error { zap.Int64("count", results.deletedFiles), zap.Int64("bytes", results.deletedBytes)) }) - allResults = allResults.union(results) + allResults = allResults.sum(results) if err != nil { return fmt.Errorf("failed to delete files: %w", err) } @@ -111,6 +124,27 @@ func cleanNFSCache(ctx context.Context) error { return nil } +func newOtelCore(ctx context.Context, opts opts) (zapcore.Core, error) { + nodeID := env.GetNodeID() + serviceInstanceID := uuid.NewString() + + resource, err := telemetry.GetResource(ctx, nodeID, serviceName, commitSHA, serviceVersion, serviceInstanceID) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + logsExporter, err := telemetry.NewLogExporter(ctx, + otlploggrpc.WithEndpoint(opts.otelCollectorEndpoint), + ) + if err != nil { + return nil, fmt.Errorf("failed to create logs exporter: %w", err) + } + + loggerProvider := telemetry.NewLogProvider(ctx, logsExporter, resource) + otelCore := logger.GetOTELCore(loggerProvider, serviceName) + return otelCore, nil +} + func printSummary(r results, opts opts) { if r.deletedFiles == 0 { zap.L().Info("no files deleted") @@ -185,7 +219,7 @@ type results struct { createdDurations []time.Duration } -func (r results) union(other results) results { +func (r results) sum(other results) results { return results{ deletedFiles: r.deletedFiles + other.deletedFiles, deletedBytes: r.deletedBytes + other.deletedBytes, @@ -197,17 +231,8 @@ func (r results) union(other results) results { func deleteOldestFiles(cache *pkg.ListingCache, files []pkg.File, opts opts, diskInfo *pkg.DiskInfo, areWeDone func() bool, deleteCount int64) (results, error) { now := time.Now() var results results - for index, file := range files { - if opts.dryRun { - zap.L().Debug("would delete", - zap.String("path", file.Path), - zap.Int64("bytes", file.Size), - zap.Duration("last_access", time.Since(file.ATime).Round(time.Minute))) - } else { - zap.L().Debug("deleting", - zap.Int("index", index+1), - zap.String("path", file.Path), - zap.Int64("bytes", file.Size)) + for _, file := range files { + if !opts.dryRun { if err := os.Remove(file.Path); err != nil { zap.L().Error("failed to delete", zap.String("path", file.Path), @@ -291,6 +316,7 @@ type opts struct { dryRun bool filesPerLoop int filesToDeletePerLoop int64 + otelCollectorEndpoint string } var ( @@ -306,6 +332,7 @@ func parseArgs() (string, opts, error) { flags.BoolVar(&opts.dryRun, "dry-run", true, "dry run") flags.IntVar(&opts.filesPerLoop, "files-per-loop", 10000, "number of files to gather metadata for per loop") flags.Int64Var(&opts.filesToDeletePerLoop, "deletions-per-loop", 100, "maximum number of files to delete per loop") + flags.StringVar(&opts.otelCollectorEndpoint, "otel-collector-endpoint", "", "endpoint of the otel collector") args := os.Args[1:] // skip the command name if err := flags.Parse(args); err != nil { @@ -327,17 +354,3 @@ func timeit(message string, fn func()) { zap.L().Debug(message, zap.Duration("duration", done)) } - -func formatBytes(b int64) string { - const unit = 1024 - if b < unit { - return fmt.Sprintf("%d B", b) - } - div, exp := int64(unit), 0 - for n := b / unit; n >= unit; n /= unit { - div *= unit - exp++ - } - return fmt.Sprintf("%.1f %ciB", - float64(b)/float64(div), "KMGTPE"[exp]) -} diff --git a/packages/orchestrator/go.mod b/packages/orchestrator/go.mod index 29b5a8537f..9e1e3c4362 100644 --- a/packages/orchestrator/go.mod +++ b/packages/orchestrator/go.mod @@ -48,6 +48,7 @@ require ( github.com/vishvananda/netns v0.0.5 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 @@ -232,7 +233,6 @@ require ( go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.38.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 // indirect go.opentelemetry.io/otel/log v0.14.0 // indirect