From a847b66346e99f19c7cc29881eacc9762dd1e7df Mon Sep 17 00:00:00 2001 From: Akshay Chawla Date: Thu, 17 Jul 2025 16:16:31 +0100 Subject: [PATCH 01/13] adding changes to proto --- api/grpc/mpi/v1/command.pb.go | 16 +++++++++++++--- api/grpc/mpi/v1/command.pb.validate.go | 2 ++ api/grpc/mpi/v1/command.proto | 2 ++ docs/proto/protos.md | 1 + 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index b39457738..ed5f0b665 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -2262,7 +2262,9 @@ type APIDetails struct { // the API location directive Location string `protobuf:"bytes,1,opt,name=location,proto3" json:"location,omitempty"` // the API listen directive - Listen string `protobuf:"bytes,2,opt,name=listen,proto3" json:"listen,omitempty"` + Listen string `protobuf:"bytes,2,opt,name=listen,proto3" json:"listen,omitempty"` + // the API Ca directive + Ca string `protobuf:"bytes,3,opt,name=Ca,proto3" json:"Ca,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2311,6 +2313,13 @@ func (x *APIDetails) GetListen() string { return "" } +func (x *APIDetails) GetCa() string { + if x != nil { + return x.Ca + } + return "" +} + // A set of runtime NGINX App Protect settings type NGINXAppProtectRuntimeInfo struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2872,11 +2881,12 @@ const file_mpi_v1_command_proto_rawDesc = "" + "error_logs\x18\x03 \x03(\tR\terrorLogs\x12)\n" + "\x10loadable_modules\x18\x04 \x03(\tR\x0floadableModules\x12'\n" + "\x0fdynamic_modules\x18\x05 \x03(\tR\x0edynamicModules\x12-\n" + - "\bplus_api\x18\x06 \x01(\v2\x12.mpi.v1.APIDetailsR\aplusApi\"@\n" + + "\bplus_api\x18\x06 \x01(\v2\x12.mpi.v1.APIDetailsR\aplusApi\"P\n" + "\n" + "APIDetails\x12\x1a\n" + "\blocation\x18\x01 \x01(\tR\blocation\x12\x16\n" + - "\x06listen\x18\x02 \x01(\tR\x06listen\"\xe0\x01\n" + + "\x06listen\x18\x02 \x01(\tR\x06listen\x12\x0e\n" + + "\x02Ca\x18\x03 \x01(\tR\x02Ca\"\xe0\x01\n" + "\x1aNGINXAppProtectRuntimeInfo\x12\x18\n" + "\arelease\x18\x01 \x01(\tR\arelease\x128\n" + "\x18attack_signature_version\x18\x02 \x01(\tR\x16attackSignatureVersion\x126\n" + diff --git a/api/grpc/mpi/v1/command.pb.validate.go b/api/grpc/mpi/v1/command.pb.validate.go index 194284c7e..81f716548 100644 --- a/api/grpc/mpi/v1/command.pb.validate.go +++ b/api/grpc/mpi/v1/command.pb.validate.go @@ -4893,6 +4893,8 @@ func (m *APIDetails) validate(all bool) error { // no validation rules for Listen + // no validation rules for Ca + if len(errors) > 0 { return APIDetailsMultiError(errors) } diff --git a/api/grpc/mpi/v1/command.proto b/api/grpc/mpi/v1/command.proto index cdf3232da..9577b8de8 100644 --- a/api/grpc/mpi/v1/command.proto +++ b/api/grpc/mpi/v1/command.proto @@ -352,6 +352,8 @@ message APIDetails { string location = 1; // the API listen directive string listen = 2; + // the API Ca directive + string Ca = 3; } // A set of runtime NGINX App Protect settings diff --git a/docs/proto/protos.md b/docs/proto/protos.md index b0e567fc2..a8995742c 100644 --- a/docs/proto/protos.md +++ b/docs/proto/protos.md @@ -678,6 +678,7 @@ Perform an associated API action on an instance | ----- | ---- | ----- | ----------- | | location | [string](#string) | | the API location directive | | listen | [string](#string) | | the API listen directive | +| Ca | [string](#string) | | the API Ca directive | From b95dcb03df8a66a4355a3a7c8f102b3033575cb0 Mon Sep 17 00:00:00 2001 From: Akshay Chawla Date: Thu, 17 Jul 2025 16:34:24 +0100 Subject: [PATCH 02/13] code changes to support ssl --- .../internal/config/config.go | 2 + .../scraper/stubstatus/stub_status_scraper.go | 25 +++++ .../collector/nginxplusreceiver/config.go | 2 + .../collector/nginxplusreceiver/scraper.go | 23 +++++ internal/collector/otel_collector_plugin.go | 1 + internal/collector/otelcol.tmpl | 2 + internal/config/config.go | 7 ++ internal/config/defaults.go | 1 + internal/config/flags.go | 1 + internal/config/types.go | 2 + .../datasource/config/nginx_config_parser.go | 92 ++++++++++++++++++- internal/model/config.go | 1 + internal/resource/resource_service.go | 23 +++++ .../test-opentelemetry-collector-agent.yaml | 1 + 14 files changed, 181 insertions(+), 2 deletions(-) diff --git a/internal/collector/nginxossreceiver/internal/config/config.go b/internal/collector/nginxossreceiver/internal/config/config.go index c09112f45..55b2bd0ad 100644 --- a/internal/collector/nginxossreceiver/internal/config/config.go +++ b/internal/collector/nginxossreceiver/internal/config/config.go @@ -33,6 +33,7 @@ type APIDetails struct { URL string `mapstructure:"url"` Listen string `mapstructure:"listen"` Location string `mapstructure:"location"` + Ca string `mapstructure:"ca"` } type AccessLog struct { @@ -56,6 +57,7 @@ func CreateDefaultConfig() component.Config { URL: "http://localhost:80/status", Listen: "localhost:80", Location: "status", + Ca: "", }, } } diff --git a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go index f9173a1b8..06e7bfbd2 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go +++ b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go @@ -7,8 +7,11 @@ package stubstatus import ( "context" + "crypto/tls" + "crypto/x509" "net" "net/http" + "os" "strings" "sync" "time" @@ -63,6 +66,28 @@ func (s *NginxStubStatusScraper) ID() component.ID { func (s *NginxStubStatusScraper) Start(_ context.Context, _ component.Host) error { s.logger.Info("Starting NGINX stub status scraper") httpClient := http.DefaultClient + caCertLocation := s.cfg.APIDetails.Ca + if caCertLocation != "" { + s.settings.Logger.Debug("Reading from Location for Ca Cert : ", zap.Any(caCertLocation, caCertLocation)) + caCert, err := os.ReadFile(caCertLocation) + if err != nil { + s.settings.Logger.Error("Error starting NGINX stub scraper. "+ + "Failed to read CA certificate : ", zap.Error(err)) + + return nil + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS13, + }, + }, + } + } httpClient.Timeout = s.cfg.ClientConfig.Timeout if strings.HasPrefix(s.cfg.APIDetails.Listen, "unix:") { diff --git a/internal/collector/nginxplusreceiver/config.go b/internal/collector/nginxplusreceiver/config.go index a05dd6d6d..7689442c1 100644 --- a/internal/collector/nginxplusreceiver/config.go +++ b/internal/collector/nginxplusreceiver/config.go @@ -29,6 +29,7 @@ type APIDetails struct { URL string `mapstructure:"url"` Listen string `mapstructure:"listen"` Location string `mapstructure:"location"` + Ca string `mapstructure:"ca"` } // Validate checks if the receiver configuration is valid @@ -59,6 +60,7 @@ func createDefaultConfig() component.Config { URL: "http://localhost:80/api", Listen: "localhost:80", Location: "/api", + Ca: "", }, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), } diff --git a/internal/collector/nginxplusreceiver/scraper.go b/internal/collector/nginxplusreceiver/scraper.go index fc41b0f7f..cf0c577f2 100644 --- a/internal/collector/nginxplusreceiver/scraper.go +++ b/internal/collector/nginxplusreceiver/scraper.go @@ -6,9 +6,12 @@ package nginxplusreceiver import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "net" "net/http" + "os" "strconv" "strings" "sync" @@ -82,6 +85,26 @@ func (nps *NginxPlusScraper) ID() component.ID { func (nps *NginxPlusScraper) Start(_ context.Context, _ component.Host) error { endpoint := strings.TrimPrefix(nps.cfg.APIDetails.URL, "unix:") httpClient := http.DefaultClient + caCertLocation := nps.cfg.APIDetails.Ca + if caCertLocation != "" { + nps.logger.Debug("Reading from Location for Ca Cert : ", zap.Any(caCertLocation, caCertLocation)) + caCert, err := os.ReadFile(caCertLocation) + if err != nil { + nps.logger.Error("Unable to start NGINX Plus scraper. Failed to read CA certificate: %v", zap.Error(err)) + return err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS13, + }, + }, + } + } httpClient.Timeout = nps.cfg.ClientConfig.Timeout if strings.HasPrefix(nps.cfg.APIDetails.Listen, "unix:") { diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index 1a8cac572..ae13b1bd6 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -418,6 +418,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex URL: nginxConfigContext.PlusAPI.URL, Listen: nginxConfigContext.PlusAPI.Listen, Location: nginxConfigContext.PlusAPI.Location, + Ca: nginxConfigContext.PlusAPI.Ca, }, CollectionInterval: defaultCollectionInterval, }, diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index 6af00e9d2..b5d3af372 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -81,6 +81,7 @@ receivers: url: "{{- .StubStatus.URL -}}" listen: "{{- .StubStatus.Listen -}}" location: "{{- .StubStatus.Location -}}" + ca: "{{- .StubStatus.Ca -}}" {{- if .CollectionInterval }} collection_interval: {{ .CollectionInterval }} {{- end }} @@ -98,6 +99,7 @@ receivers: url: "{{- .PlusAPI.URL -}}" listen: "{{- .PlusAPI.Listen -}}" location: "{{- .PlusAPI.Location -}}" + ca: "{{- .StubStatus.Ca -}}" {{- if .CollectionInterval }} collection_interval: {{ .CollectionInterval }} {{- end }} diff --git a/internal/config/config.go b/internal/config/config.go index 4ebd75900..194206404 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -267,6 +267,12 @@ func registerFlags() { "Warning messages in the NGINX errors logs after a NGINX reload will be treated as an error.", ) + fs.String( + NginxApiTlsCa, + DefNginxApiTlsCa, + "The NGINX Plus CA certificate file location needed to call the NGINX Plus API if SSL is enabled.", + ) + fs.StringSlice( NginxExcludeLogsKey, []string{}, "A comma-separated list of one or more NGINX log paths that you want to exclude from metrics "+ @@ -786,6 +792,7 @@ func resolveDataPlaneConfig() *DataPlaneConfig { ReloadMonitoringPeriod: viperInstance.GetDuration(NginxReloadMonitoringPeriodKey), TreatWarningsAsErrors: viperInstance.GetBool(NginxTreatWarningsAsErrorsKey), ExcludeLogs: viperInstance.GetStringSlice(NginxExcludeLogsKey), + ApiTls: TLSConfig{Ca: viperInstance.GetString(NginxApiTlsCa)}, }, } } diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 6c4a1ab3d..b6aed8905 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -14,6 +14,7 @@ const ( DefGracefulShutdownPeriod = 5 * time.Second DefNginxReloadMonitoringPeriod = 10 * time.Second DefTreatErrorsAsWarnings = false + DefNginxApiTlsCa = "" DefCommandServerHostKey = "" DefCommandServerPortKey = 0 diff --git a/internal/config/flags.go b/internal/config/flags.go index 58a2d8454..24e6a340d 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -118,6 +118,7 @@ var ( NginxReloadMonitoringPeriodKey = pre(DataPlaneConfigRootKey, "nginx") + "reload_monitoring_period" NginxTreatWarningsAsErrorsKey = pre(DataPlaneConfigRootKey, "nginx") + "treat_warnings_as_errors" NginxExcludeLogsKey = pre(DataPlaneConfigRootKey, "nginx") + "exclude_logs" + NginxApiTlsCa = pre(DataPlaneConfigRootKey, "nginx") + "api_tls_ca" FileWatcherMonitoringFrequencyKey = pre(FileWatcherKey) + "monitoring_frequency" NginxExcludeFilesKey = pre(FileWatcherKey) + "exclude_files" diff --git a/internal/config/types.go b/internal/config/types.go index ae0558045..95755a4e1 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -61,6 +61,7 @@ type ( } NginxDataPlaneConfig struct { + ApiTls TLSConfig `yaml:"api_tls" mapstructure:"api_tls"` ExcludeLogs []string `yaml:"exclude_logs" mapstructure:"exclude_logs"` ReloadMonitoringPeriod time.Duration `yaml:"reload_monitoring_period" mapstructure:"reload_monitoring_period"` TreatWarningsAsErrors bool `yaml:"treat_warnings_as_errors" mapstructure:"treat_warnings_as_errors"` @@ -230,6 +231,7 @@ type ( URL string `yaml:"url" mapstructure:"url"` Listen string `yaml:"listen" mapstructure:"listen"` Location string `yaml:"location" mapstructure:"location"` + Ca string `yaml:"ca" mapstructure:"ca"` } AccessLog struct { diff --git a/internal/datasource/config/nginx_config_parser.go b/internal/datasource/config/nginx_config_parser.go index 536751948..c729fcfe8 100644 --- a/internal/datasource/config/nginx_config_parser.go +++ b/internal/datasource/config/nginx_config_parser.go @@ -7,6 +7,8 @@ package config import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "io" @@ -575,7 +577,11 @@ func (ncp *NginxConfigParser) apiCallback(ctx context.Context, parent, func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDetail *model.APIDetails, apiType string, ) bool { - httpClient := http.DefaultClient + httpClient, err := ncp.prepareHTTPClient(ctx) + if err != nil { + slog.ErrorContext(ctx, "Failed to prepare HTTP client", "error", err) + return false + } listen := statusAPIDetail.Listen statusAPI := statusAPIDetail.URL @@ -641,6 +647,13 @@ func (ncp *NginxConfigParser) urlsForLocationDirectiveAPIDetails( locationDirectiveName string, ) []*model.APIDetails { var urls []*model.APIDetails + // Check if SSL is enabled in the server block + isSSL := ncp.isSSLEnabled(parent) + caCertLocation := "" + // If SSl is enabled, check if CA cert is provided and the location is allowed + if isSSL { + caCertLocation = ncp.getCACertLocation() + } // process from the location block if current.Directive != locationDirective { return urls @@ -668,12 +681,15 @@ func (ncp *NginxConfigParser) urlsForLocationDirectiveAPIDetails( URL: fmt.Sprintf(format, path), Listen: address, Location: path, + Ca: caCertLocation, }) } else { urls = append(urls, &model.APIDetails{ - URL: fmt.Sprintf(apiFormat, address, path), + URL: fmt.Sprintf("%s://%s%s", map[bool]string{true: "https", false: "http"}[isSSL], + address, path), Listen: address, Location: path, + Ca: caCertLocation, }) } } @@ -773,6 +789,37 @@ func (ncp *NginxConfigParser) isPort(value string) bool { return err == nil && port >= 1 && port <= 65535 } +// checks if any of the arguments contain "ssl". +func (ncp *NginxConfigParser) hasSSLArgument(args []string) bool { + for i := 1; i < len(args); i++ { + if args[i] == "ssl" { + return true + } + } + + return false +} + +// checks if a directive is a listen directive with ssl enabled. +func (ncp *NginxConfigParser) isSSLListenDirective(dir *crossplane.Directive) bool { + return dir.Directive == "listen" && ncp.hasSSLArgument(dir.Args) +} + +// checks if SSL is enabled for a given server block. +func (ncp *NginxConfigParser) isSSLEnabled(serverBlock *crossplane.Directive) bool { + if serverBlock == nil { + return false + } + + for _, dir := range serverBlock.Block { + if ncp.isSSLListenDirective(dir) { + return true + } + } + + return false +} + func (ncp *NginxConfigParser) socketClient(socketPath string) *http.Client { return &http.Client{ Timeout: ncp.agentConfig.Client.Grpc.KeepAlive.Timeout, @@ -784,6 +831,47 @@ func (ncp *NginxConfigParser) socketClient(socketPath string) *http.Client { } } +// prepareHTTPClient handles TLS config +func (ncp *NginxConfigParser) prepareHTTPClient(ctx context.Context) (*http.Client, error) { + httpClient := http.DefaultClient + caCertLocation := ncp.agentConfig.DataPlaneConfig.Nginx.ApiTls.Ca + + if caCertLocation != "" && ncp.agentConfig.IsDirectoryAllowed(caCertLocation) { + slog.DebugContext(ctx, "Reading from Location for Ca Cert : ", "cacertlocation", caCertLocation) + caCert, err := os.ReadFile(caCertLocation) + if err != nil { + slog.ErrorContext(ctx, "Failed to read CA certificate", "error", err) + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS13, + }, + }, + } + } + + return httpClient, nil +} + +// Populate the CA cert location based ondirectory allowance. +func (ncp *NginxConfigParser) getCACertLocation() string { + caCertLocation := ncp.agentConfig.DataPlaneConfig.Nginx.ApiTls.Ca + + if caCertLocation != "" && !ncp.agentConfig.IsDirectoryAllowed(caCertLocation) { + // If SSL is enabled but CA cert is provided and not allowed, treat it as if no CA cert + slog.Warn("CA certificate location is not allowed, treating as if no CA cert provided.") + return "" + } + + return caCertLocation +} + func (ncp *NginxConfigParser) isDuplicateFile(nginxConfigContextFiles []*mpi.File, newFile *mpi.File) bool { for _, nginxConfigContextFile := range nginxConfigContextFiles { if nginxConfigContextFile.GetFileMeta().GetName() == newFile.GetFileMeta().GetName() { diff --git a/internal/model/config.go b/internal/model/config.go index 67dea7449..e4d3ba3e9 100644 --- a/internal/model/config.go +++ b/internal/model/config.go @@ -26,6 +26,7 @@ type APIDetails struct { URL string Listen string Location string + Ca string } type ManifestFile struct { diff --git a/internal/resource/resource_service.go b/internal/resource/resource_service.go index ed41c48a2..5d57d970d 100644 --- a/internal/resource/resource_service.go +++ b/internal/resource/resource_service.go @@ -7,12 +7,15 @@ package resource import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" "errors" "fmt" "log/slog" "net" "net/http" + "os" "strings" "sync" @@ -348,6 +351,26 @@ func (r *ResourceService) createPlusClient(instance *mpi.Instance) (*client.Ngin } httpClient := http.DefaultClient + caCertLocation := plusAPI.GetCa() + if caCertLocation != "" { + slog.Debug("Reading from Location for Ca Cert : ", "cacertlocation", caCertLocation) + caCert, err := os.ReadFile(caCertLocation) + if err != nil { + slog.Error("Unable to Create NGINX Plus client. Failed to read CA certificate : ", "err", err) + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS13, + }, + }, + } + } if strings.HasPrefix(plusAPI.GetListen(), "unix:") { httpClient = socketClient(strings.TrimPrefix(plusAPI.GetListen(), "unix:")) } diff --git a/test/config/collector/test-opentelemetry-collector-agent.yaml b/test/config/collector/test-opentelemetry-collector-agent.yaml index 9a17adc3f..46625a905 100644 --- a/test/config/collector/test-opentelemetry-collector-agent.yaml +++ b/test/config/collector/test-opentelemetry-collector-agent.yaml @@ -31,6 +31,7 @@ receivers: url: "http://localhost:80/status" listen: "" location: "" + ca: "" collection_interval: 30s access_logs: - log_format: "$remote_addr - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\"\"$upstream_cache_status\"" From 06e701d38d948a6ca3dd1377cb5bdc5e72cf6210 Mon Sep 17 00:00:00 2001 From: Akshay Chawla Date: Thu, 17 Jul 2025 16:54:58 +0100 Subject: [PATCH 03/13] fixing otel collector config --- internal/collector/otelcol.tmpl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index b5d3af372..36edcea84 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -99,7 +99,7 @@ receivers: url: "{{- .PlusAPI.URL -}}" listen: "{{- .PlusAPI.Listen -}}" location: "{{- .PlusAPI.Location -}}" - ca: "{{- .StubStatus.Ca -}}" + ca: "{{- .PlusAPI.Ca -}}" {{- if .CollectionInterval }} collection_interval: {{ .CollectionInterval }} {{- end }} From 961954aaa48b5cacc00f084f25d9f90f1f3f759d Mon Sep 17 00:00:00 2001 From: Akshay Chawla Date: Thu, 17 Jul 2025 17:25:49 +0100 Subject: [PATCH 04/13] added unit test to increase test coverage --- .../stub_status_scraper_tls_test.go | 179 ++++++++++++++++++ internal/resource/resource_service_test.go | 34 +++- 2 files changed, 211 insertions(+), 2 deletions(-) create mode 100644 internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go diff --git a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go new file mode 100644 index 000000000..0126641c2 --- /dev/null +++ b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go @@ -0,0 +1,179 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package stubstatus + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "net" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/nginx/agent/v3/internal/collector/nginxossreceiver/internal/config" +) + +func TestStubStatusScraperTLS(t *testing.T) { + // Create a test CA certificate and key + ca := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"NGINX Agent Test CA"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(10, 0, 0), + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IsCA: true, + } + + caPrivKey, err := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, err) + + caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey) + require.NoError(t, err) + + // Create a test server certificate signed by the CA + cert := &x509.Certificate{ + SerialNumber: big.NewInt(2), + Subject: pkix.Name{ + Organization: []string{"NGINX Agent Test"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(10, 0, 0), + SubjectKeyId: []byte{1, 2, 3, 4, 6}, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)}, + DNSNames: []string{"localhost"}, + } + + certPrivKey, err := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, err) + + certBytes, err := x509.CreateCertificate(rand.Reader, cert, ca, &certPrivKey.PublicKey, caPrivKey) + require.NoError(t, err) + + // Create a temporary directory for test files + tempDir := t.TempDir() + + // Save CA certificate to a file + caFile := filepath.Join(tempDir, "ca.crt") + caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caBytes}) + err = os.WriteFile(caFile, caPEM, 0o600) + require.NoError(t, err) + + // Create a TLS config for the server + serverTLSConfig := &tls.Config{ + Certificates: []tls.Certificate{ + { + Certificate: [][]byte{certBytes}, + PrivateKey: certPrivKey, + }, + }, + } + + // Create a test server with TLS + server := httptest.NewUnstartedServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/status" { + rw.WriteHeader(http.StatusOK) + _, _ = rw.Write([]byte(`Active connections: 291 +server accepts handled requests + 16630948 16630946 31070465 +Reading: 6 Writing: 179 Waiting: 106 +`)) + return + } + rw.WriteHeader(http.StatusNotFound) + })) + + server.TLS = serverTLSConfig + server.StartTLS() + defer server.Close() + + // Test with TLS configuration + t.Run("with TLS CA", func(t *testing.T) { + cfg, ok := config.CreateDefaultConfig().(*config.Config) + require.True(t, ok) + + cfg.APIDetails.URL = server.URL + "/status" + cfg.APIDetails.Ca = caFile + + scraper := NewScraper(receivertest.NewNopSettings(component.Type{}), cfg) + + err := scraper.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + _, err = scraper.Scrape(context.Background()) + assert.NoError(t, err) + }) +} + +func TestStubStatusScraperUnixSocket(t *testing.T) { + tempDir, err := os.MkdirTemp("", "TestStubStatusScraperUnixSocket") + require.NoError(t, err) + t.Cleanup(func() { os.RemoveAll(tempDir) }) + socketPath := filepath.Join(tempDir, "nginx.sock") + + // Create a Unix domain socket listener + listener, err := net.Listen("unix", socketPath) + require.NoError(t, err) + defer listener.Close() + + // Start a simple HTTP server on the Unix socket + server := &http.Server{ + Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/status" { + rw.WriteHeader(http.StatusOK) + _, _ = rw.Write([]byte(`Active connections: 291 +server accepts handled requests + 16630948 16630946 31070465 +Reading: 6 Writing: 179 Waiting: 106 +`)) + return + } + rw.WriteHeader(http.StatusNotFound) + }), + } + + go func() { + _ = server.Serve(listener) + }() + defer server.Close() + + // Test with Unix socket + t.Run("with Unix socket", func(t *testing.T) { + cfg, ok := config.CreateDefaultConfig().(*config.Config) + require.True(t, ok) + + cfg.APIDetails.URL = "http://unix/status" + cfg.APIDetails.Listen = "unix:" + socketPath + + scraper := NewScraper(receivertest.NewNopSettings(component.Type{}), cfg) + + err := scraper.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + _, err = scraper.Scrape(context.Background()) + assert.NoError(t, err) + }) +} diff --git a/internal/resource/resource_service_test.go b/internal/resource/resource_service_test.go index 0123220ac..826b48187 100644 --- a/internal/resource/resource_service_test.go +++ b/internal/resource/resource_service_test.go @@ -9,6 +9,7 @@ import ( "context" "errors" "fmt" + "os" "testing" "github.com/nginx/agent/v3/internal/model" @@ -26,6 +27,7 @@ import ( "github.com/nginx/agent/v3/api/grpc/mpi/v1" "github.com/nginx/agent/v3/test/protos" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestResourceService_AddInstance(t *testing.T) { @@ -237,6 +239,15 @@ func TestResourceService_GetResource(t *testing.T) { } func TestResourceService_createPlusClient(t *testing.T) { + // Create a temporary file for testing CA certificate + tempCAFile, err := os.CreateTemp("", "test-ca.crt") + require.NoError(t, err) + defer os.Remove(tempCAFile.Name()) + + _, err = tempCAFile.Write([]byte("-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----")) + require.NoError(t, err) + tempCAFile.Close() + instanceWithAPI := protos.NginxPlusInstance([]string{}) instanceWithAPI.InstanceRuntime.GetNginxPlusRuntimeInfo().PlusApi = &v1.APIDetails{ Location: "/api", @@ -249,6 +260,13 @@ func TestResourceService_createPlusClient(t *testing.T) { Location: "/api", } + instanceWithCACert := protos.NginxPlusInstance([]string{}) + instanceWithCACert.InstanceRuntime.GetNginxPlusRuntimeInfo().PlusApi = &v1.APIDetails{ + Location: "/api", + Listen: "localhost:443", + Ca: tempCAFile.Name(), + } + ctx := context.Background() tests := []struct { err error @@ -266,7 +284,12 @@ func TestResourceService_createPlusClient(t *testing.T) { err: nil, }, { - name: "Test 3: Fail Creating Client - API not Configured", + name: "Test 3: Create Plus Client with CA Certificate", + instance: instanceWithCACert, + err: nil, + }, + { + name: "Test 4: Fail Creating Client - API not Configured", instance: protos.NginxPlusInstance([]string{}), err: errors.New("failed to preform API action, NGINX Plus API is not configured"), }, @@ -281,7 +304,14 @@ func TestResourceService_createPlusClient(t *testing.T) { } _, err := resourceService.createPlusClient(test.instance) - assert.Equal(tt, test.err, err) + if test.err != nil { + assert.Error(tt, err) + assert.Contains(tt, err.Error(), test.err.Error()) + } else { + assert.NoError(tt, err) + // For the CA cert test, we can't easily verify the internal http.Client configuration + // without exporting it or adding test hooks, so we'll just verify no error is returned + } }) } } From bcdcd997c74e73d11838e407ede9277695bb1ee3 Mon Sep 17 00:00:00 2001 From: Akshay Chawla Date: Thu, 17 Jul 2025 17:39:17 +0100 Subject: [PATCH 05/13] fixing lint errors --- .../stub_status_scraper_tls_test.go | 35 ++++++++++--------- internal/resource/resource_service_test.go | 19 +++++----- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go index 0126641c2..667a45d93 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go +++ b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go @@ -46,11 +46,11 @@ func TestStubStatusScraperTLS(t *testing.T) { IsCA: true, } - caPrivKey, err := rsa.GenerateKey(rand.Reader, 2048) - require.NoError(t, err) + caPrivKey, caPrivKeyErr := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, caPrivKeyErr) - caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey) - require.NoError(t, err) + caBytes, caBytesErr := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey) + require.NoError(t, caBytesErr) // Create a test server certificate signed by the CA cert := &x509.Certificate{ @@ -67,11 +67,11 @@ func TestStubStatusScraperTLS(t *testing.T) { DNSNames: []string{"localhost"}, } - certPrivKey, err := rsa.GenerateKey(rand.Reader, 2048) - require.NoError(t, err) + certPrivKey, certPrivKeyErr := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, certPrivKeyErr) - certBytes, err := x509.CreateCertificate(rand.Reader, cert, ca, &certPrivKey.PublicKey, caPrivKey) - require.NoError(t, err) + certBytes, certBytesErr := x509.CreateCertificate(rand.Reader, cert, ca, &certPrivKey.PublicKey, caPrivKey) + require.NoError(t, certBytesErr) // Create a temporary directory for test files tempDir := t.TempDir() @@ -79,11 +79,12 @@ func TestStubStatusScraperTLS(t *testing.T) { // Save CA certificate to a file caFile := filepath.Join(tempDir, "ca.crt") caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caBytes}) - err = os.WriteFile(caFile, caPEM, 0o600) - require.NoError(t, err) + writeErr := os.WriteFile(caFile, caPEM, 0o600) + require.NoError(t, writeErr) // Create a TLS config for the server serverTLSConfig := &tls.Config{ + MinVersion: tls.VersionTLS13, Certificates: []tls.Certificate{ { Certificate: [][]byte{certBytes}, @@ -101,6 +102,7 @@ server accepts handled requests 16630948 16630946 31070465 Reading: 6 Writing: 179 Waiting: 106 `)) + return } rw.WriteHeader(http.StatusNotFound) @@ -129,14 +131,14 @@ Reading: 6 Writing: 179 Waiting: 106 } func TestStubStatusScraperUnixSocket(t *testing.T) { - tempDir, err := os.MkdirTemp("", "TestStubStatusScraperUnixSocket") - require.NoError(t, err) - t.Cleanup(func() { os.RemoveAll(tempDir) }) - socketPath := filepath.Join(tempDir, "nginx.sock") + // Use a shorter path for the socket to avoid path length issues + socketPath := filepath.Join(os.TempDir(), "test-nginx.sock") + // Clean up the socket file after the test + t.Cleanup(func() { os.Remove(socketPath) }) // Create a Unix domain socket listener - listener, err := net.Listen("unix", socketPath) - require.NoError(t, err) + listener, listenErr := net.Listen("unix", socketPath) + require.NoError(t, listenErr) defer listener.Close() // Start a simple HTTP server on the Unix socket @@ -149,6 +151,7 @@ server accepts handled requests 16630948 16630946 31070465 Reading: 6 Writing: 179 Waiting: 106 `)) + return } rw.WriteHeader(http.StatusNotFound) diff --git a/internal/resource/resource_service_test.go b/internal/resource/resource_service_test.go index 826b48187..f8fe52244 100644 --- a/internal/resource/resource_service_test.go +++ b/internal/resource/resource_service_test.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "os" + "path/filepath" "testing" "github.com/nginx/agent/v3/internal/model" @@ -240,13 +241,11 @@ func TestResourceService_GetResource(t *testing.T) { func TestResourceService_createPlusClient(t *testing.T) { // Create a temporary file for testing CA certificate - tempCAFile, err := os.CreateTemp("", "test-ca.crt") - require.NoError(t, err) - defer os.Remove(tempCAFile.Name()) + tempDir := t.TempDir() + caFile := filepath.Join(tempDir, "test-ca.crt") - _, err = tempCAFile.Write([]byte("-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----")) + err := os.WriteFile(caFile, []byte("-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----"), 0o600) require.NoError(t, err) - tempCAFile.Close() instanceWithAPI := protos.NginxPlusInstance([]string{}) instanceWithAPI.InstanceRuntime.GetNginxPlusRuntimeInfo().PlusApi = &v1.APIDetails{ @@ -264,7 +263,7 @@ func TestResourceService_createPlusClient(t *testing.T) { instanceWithCACert.InstanceRuntime.GetNginxPlusRuntimeInfo().PlusApi = &v1.APIDetails{ Location: "/api", Listen: "localhost:443", - Ca: tempCAFile.Name(), + Ca: caFile, } ctx := context.Background() @@ -303,12 +302,12 @@ func TestResourceService_createPlusClient(t *testing.T) { protos.NginxPlusInstance([]string{}), } - _, err := resourceService.createPlusClient(test.instance) + _, clientErr := resourceService.createPlusClient(test.instance) if test.err != nil { - assert.Error(tt, err) - assert.Contains(tt, err.Error(), test.err.Error()) + require.Error(tt, clientErr) + assert.Contains(tt, clientErr.Error(), test.err.Error()) } else { - assert.NoError(tt, err) + require.NoError(tt, clientErr) // For the CA cert test, we can't easily verify the internal http.Client configuration // without exporting it or adding test hooks, so we'll just verify no error is returned } From 6c2f5ca1cef931e5b8151458d65b009eacdf8f98 Mon Sep 17 00:00:00 2001 From: Akshay Chawla Date: Mon, 21 Jul 2025 15:07:59 +0100 Subject: [PATCH 06/13] Changes as per PR review comments --- api/grpc/mpi/v1/command.pb.go | 2 +- api/grpc/mpi/v1/command.proto | 2 +- docs/proto/protos.md | 2 +- .../scraper/stubstatus/stub_status_scraper.go | 6 +- .../stub_status_scraper_tls_test.go | 156 +++++++----------- .../collector/nginxplusreceiver/scraper.go | 6 +- internal/config/config.go | 2 +- internal/config/types.go | 2 +- .../datasource/config/nginx_config_parser.go | 17 +- .../config/nginx_config_parser_test.go | 4 +- internal/resource/resource_service.go | 3 +- test/helpers/cert_utils.go | 3 + 12 files changed, 89 insertions(+), 116 deletions(-) diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index ed5f0b665..7c137c5e9 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -2263,7 +2263,7 @@ type APIDetails struct { Location string `protobuf:"bytes,1,opt,name=location,proto3" json:"location,omitempty"` // the API listen directive Listen string `protobuf:"bytes,2,opt,name=listen,proto3" json:"listen,omitempty"` - // the API Ca directive + // the API CA file path Ca string `protobuf:"bytes,3,opt,name=Ca,proto3" json:"Ca,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache diff --git a/api/grpc/mpi/v1/command.proto b/api/grpc/mpi/v1/command.proto index 9577b8de8..84fb6a020 100644 --- a/api/grpc/mpi/v1/command.proto +++ b/api/grpc/mpi/v1/command.proto @@ -352,7 +352,7 @@ message APIDetails { string location = 1; // the API listen directive string listen = 2; - // the API Ca directive + // the API CA file path string Ca = 3; } diff --git a/docs/proto/protos.md b/docs/proto/protos.md index a8995742c..40bdb1828 100644 --- a/docs/proto/protos.md +++ b/docs/proto/protos.md @@ -678,7 +678,7 @@ Perform an associated API action on an instance | ----- | ---- | ----- | ----------- | | location | [string](#string) | | the API location directive | | listen | [string](#string) | | the API listen directive | -| Ca | [string](#string) | | the API Ca directive | +| Ca | [string](#string) | | the API CA file path | diff --git a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go index 06e7bfbd2..384be6ba4 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go +++ b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go @@ -68,11 +68,11 @@ func (s *NginxStubStatusScraper) Start(_ context.Context, _ component.Host) erro httpClient := http.DefaultClient caCertLocation := s.cfg.APIDetails.Ca if caCertLocation != "" { - s.settings.Logger.Debug("Reading from Location for Ca Cert : ", zap.Any(caCertLocation, caCertLocation)) + s.settings.Logger.Debug("Reading CA certificate", zap.Any("file_path", caCertLocation)) caCert, err := os.ReadFile(caCertLocation) if err != nil { - s.settings.Logger.Error("Error starting NGINX stub scraper. "+ - "Failed to read CA certificate : ", zap.Error(err)) + s.settings.Logger.Error("Error starting NGINX stub status scraper. "+ + "Failed to read CA certificate", zap.Error(err)) return nil } diff --git a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go index 667a45d93..d583eafff 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go +++ b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go @@ -7,20 +7,13 @@ package stubstatus import ( "context" - "crypto/rand" - "crypto/rsa" "crypto/tls" "crypto/x509" - "crypto/x509/pkix" - "encoding/pem" - "math/big" "net" "net/http" "net/http/httptest" "os" - "path/filepath" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,71 +22,39 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/nginx/agent/v3/internal/collector/nginxossreceiver/internal/config" + "github.com/nginx/agent/v3/test/helpers" ) func TestStubStatusScraperTLS(t *testing.T) { - // Create a test CA certificate and key - ca := &x509.Certificate{ - SerialNumber: big.NewInt(1), - Subject: pkix.Name{ - Organization: []string{"NGINX Agent Test CA"}, - }, - NotBefore: time.Now(), - NotAfter: time.Now().AddDate(10, 0, 0), - KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, - BasicConstraintsValid: true, - IsCA: true, - } - - caPrivKey, caPrivKeyErr := rsa.GenerateKey(rand.Reader, 2048) - require.NoError(t, caPrivKeyErr) - - caBytes, caBytesErr := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey) - require.NoError(t, caBytesErr) - - // Create a test server certificate signed by the CA - cert := &x509.Certificate{ - SerialNumber: big.NewInt(2), - Subject: pkix.Name{ - Organization: []string{"NGINX Agent Test"}, - }, - NotBefore: time.Now(), - NotAfter: time.Now().AddDate(10, 0, 0), - SubjectKeyId: []byte{1, 2, 3, 4, 6}, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, - KeyUsage: x509.KeyUsageDigitalSignature, - IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)}, - DNSNames: []string{"localhost"}, - } - - certPrivKey, certPrivKeyErr := rsa.GenerateKey(rand.Reader, 2048) - require.NoError(t, certPrivKeyErr) - - certBytes, certBytesErr := x509.CreateCertificate(rand.Reader, cert, ca, &certPrivKey.PublicKey, caPrivKey) - require.NoError(t, certBytesErr) + // Generate self-signed certificate using helper + keyBytes, certBytes := helpers.GenerateSelfSignedCert(t) // Create a temporary directory for test files tempDir := t.TempDir() - // Save CA certificate to a file - caFile := filepath.Join(tempDir, "ca.crt") - caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caBytes}) - writeErr := os.WriteFile(caFile, caPEM, 0o600) - require.NoError(t, writeErr) + // Save certificate to a file + certFile := helpers.WriteCertFiles(t, tempDir, helpers.Cert{ + Name: "server.crt", + Type: "CERTIFICATE", + Contents: certBytes, + }) + + // Parse the private key + key, err := x509.ParsePKCS1PrivateKey(keyBytes) + require.NoError(t, err) + + // Create a TLS config with our self-signed certificate + tlsCert := tls.Certificate{ + Certificate: [][]byte{certBytes}, + PrivateKey: key, + } - // Create a TLS config for the server serverTLSConfig := &tls.Config{ - MinVersion: tls.VersionTLS13, - Certificates: []tls.Certificate{ - { - Certificate: [][]byte{certBytes}, - PrivateKey: certPrivKey, - }, - }, + MinVersion: tls.VersionTLS13, + Certificates: []tls.Certificate{tlsCert}, } - // Create a test server with TLS + // Create a test server with our custom TLS config server := httptest.NewUnstartedServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { if req.URL.Path == "/status" { rw.WriteHeader(http.StatusOK) @@ -112,56 +73,65 @@ Reading: 6 Writing: 179 Waiting: 106 server.StartTLS() defer server.Close() - // Test with TLS configuration - t.Run("with TLS CA", func(t *testing.T) { + // Test with TLS configuration using our self-signed certificate + t.Run("with self-signed TLS", func(t *testing.T) { cfg, ok := config.CreateDefaultConfig().(*config.Config) require.True(t, ok) cfg.APIDetails.URL = server.URL + "/status" - cfg.APIDetails.Ca = caFile + // Use the self-signed certificate for verification + cfg.APIDetails.Ca = certFile scraper := NewScraper(receivertest.NewNopSettings(component.Type{}), cfg) - err := scraper.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) + startErr := scraper.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, startErr) _, err = scraper.Scrape(context.Background()) - assert.NoError(t, err) + assert.NoError(t, err, "Scraping with self-signed certificate should succeed") }) } func TestStubStatusScraperUnixSocket(t *testing.T) { - // Use a shorter path for the socket to avoid path length issues - socketPath := filepath.Join(os.TempDir(), "test-nginx.sock") - // Clean up the socket file after the test - t.Cleanup(func() { os.Remove(socketPath) }) - - // Create a Unix domain socket listener - listener, listenErr := net.Listen("unix", socketPath) - require.NoError(t, listenErr) - defer listener.Close() - - // Start a simple HTTP server on the Unix socket - server := &http.Server{ - Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - if req.URL.Path == "/status" { - rw.WriteHeader(http.StatusOK) - _, _ = rw.Write([]byte(`Active connections: 291 + // Create a test server with a Unix domain socket + handler := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/status" { + rw.WriteHeader(http.StatusOK) + _, _ = rw.Write([]byte(`Active connections: 291 server accepts handled requests 16630948 16630946 31070465 Reading: 6 Writing: 179 Waiting: 106 `)) - return - } - rw.WriteHeader(http.StatusNotFound) - }), + return + } + rw.WriteHeader(http.StatusNotFound) + }) + + // Create a socket file in a temporary directory with a shorter path + socketPath := "/tmp/nginx-test.sock" + + // Clean up any existing socket file + os.Remove(socketPath) + + // Create a listener for the Unix socket + listener, err := net.Listen("unix", socketPath) + require.NoError(t, err, "Failed to create Unix socket listener") + + // Create a test server with our custom listener + server := &httptest.Server{ + Listener: listener, + Config: &http.Server{Handler: handler}, } - go func() { - _ = server.Serve(listener) - }() - defer server.Close() + // Start the server + server.Start() + + // Ensure cleanup of the socket file + t.Cleanup(func() { + server.Close() + os.Remove(socketPath) + }) // Test with Unix socket t.Run("with Unix socket", func(t *testing.T) { @@ -173,8 +143,8 @@ Reading: 6 Writing: 179 Waiting: 106 scraper := NewScraper(receivertest.NewNopSettings(component.Type{}), cfg) - err := scraper.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) + startErr := scraper.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, startErr) _, err = scraper.Scrape(context.Background()) assert.NoError(t, err) diff --git a/internal/collector/nginxplusreceiver/scraper.go b/internal/collector/nginxplusreceiver/scraper.go index cf0c577f2..9f11b607c 100644 --- a/internal/collector/nginxplusreceiver/scraper.go +++ b/internal/collector/nginxplusreceiver/scraper.go @@ -87,10 +87,12 @@ func (nps *NginxPlusScraper) Start(_ context.Context, _ component.Host) error { httpClient := http.DefaultClient caCertLocation := nps.cfg.APIDetails.Ca if caCertLocation != "" { - nps.logger.Debug("Reading from Location for Ca Cert : ", zap.Any(caCertLocation, caCertLocation)) + nps.logger.Debug("Reading CA certificate", zap.Any("file_path", caCertLocation)) caCert, err := os.ReadFile(caCertLocation) if err != nil { - nps.logger.Error("Unable to start NGINX Plus scraper. Failed to read CA certificate: %v", zap.Error(err)) + nps.logger.Error("Error starting NGINX stub status scraper. "+ + "Failed to read CA certificate", zap.Error(err)) + return err } caCertPool := x509.NewCertPool() diff --git a/internal/config/config.go b/internal/config/config.go index 194206404..366daac2b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -792,7 +792,7 @@ func resolveDataPlaneConfig() *DataPlaneConfig { ReloadMonitoringPeriod: viperInstance.GetDuration(NginxReloadMonitoringPeriodKey), TreatWarningsAsErrors: viperInstance.GetBool(NginxTreatWarningsAsErrorsKey), ExcludeLogs: viperInstance.GetStringSlice(NginxExcludeLogsKey), - ApiTls: TLSConfig{Ca: viperInstance.GetString(NginxApiTlsCa)}, + APITls: TLSConfig{Ca: viperInstance.GetString(NginxApiTlsCa)}, }, } } diff --git a/internal/config/types.go b/internal/config/types.go index 95755a4e1..a6e25d608 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -61,7 +61,7 @@ type ( } NginxDataPlaneConfig struct { - ApiTls TLSConfig `yaml:"api_tls" mapstructure:"api_tls"` + APITls TLSConfig `yaml:"api_tls" mapstructure:"api_tls"` ExcludeLogs []string `yaml:"exclude_logs" mapstructure:"exclude_logs"` ReloadMonitoringPeriod time.Duration `yaml:"reload_monitoring_period" mapstructure:"reload_monitoring_period"` TreatWarningsAsErrors bool `yaml:"treat_warnings_as_errors" mapstructure:"treat_warnings_as_errors"` diff --git a/internal/datasource/config/nginx_config_parser.go b/internal/datasource/config/nginx_config_parser.go index c729fcfe8..1ee85caa0 100644 --- a/internal/datasource/config/nginx_config_parser.go +++ b/internal/datasource/config/nginx_config_parser.go @@ -554,7 +554,7 @@ func (ncp *NginxConfigParser) sslCert(ctx context.Context, file, rootDir string) func (ncp *NginxConfigParser) apiCallback(ctx context.Context, parent, current *crossplane.Directive, apiType string, ) *model.APIDetails { - urls := ncp.urlsForLocationDirectiveAPIDetails(parent, current, apiType) + urls := ncp.urlsForLocationDirectiveAPIDetails(ctx, parent, current, apiType) if len(urls) > 0 { slog.DebugContext(ctx, fmt.Sprintf("%d potential %s urls", len(urls), apiType), "urls", urls) } @@ -643,7 +643,7 @@ func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDeta // nolint: revive func (ncp *NginxConfigParser) urlsForLocationDirectiveAPIDetails( - parent, current *crossplane.Directive, + ctx context.Context, parent, current *crossplane.Directive, locationDirectiveName string, ) []*model.APIDetails { var urls []*model.APIDetails @@ -652,7 +652,7 @@ func (ncp *NginxConfigParser) urlsForLocationDirectiveAPIDetails( caCertLocation := "" // If SSl is enabled, check if CA cert is provided and the location is allowed if isSSL { - caCertLocation = ncp.getCACertLocation() + caCertLocation = ncp.getCACertLocation(ctx) } // process from the location block if current.Directive != locationDirective { @@ -834,13 +834,12 @@ func (ncp *NginxConfigParser) socketClient(socketPath string) *http.Client { // prepareHTTPClient handles TLS config func (ncp *NginxConfigParser) prepareHTTPClient(ctx context.Context) (*http.Client, error) { httpClient := http.DefaultClient - caCertLocation := ncp.agentConfig.DataPlaneConfig.Nginx.ApiTls.Ca + caCertLocation := ncp.agentConfig.DataPlaneConfig.Nginx.APITls.Ca if caCertLocation != "" && ncp.agentConfig.IsDirectoryAllowed(caCertLocation) { - slog.DebugContext(ctx, "Reading from Location for Ca Cert : ", "cacertlocation", caCertLocation) + slog.DebugContext(ctx, "Reading CA certificate", "file_path", caCertLocation) caCert, err := os.ReadFile(caCertLocation) if err != nil { - slog.ErrorContext(ctx, "Failed to read CA certificate", "error", err) return nil, err } caCertPool := x509.NewCertPool() @@ -860,12 +859,12 @@ func (ncp *NginxConfigParser) prepareHTTPClient(ctx context.Context) (*http.Clie } // Populate the CA cert location based ondirectory allowance. -func (ncp *NginxConfigParser) getCACertLocation() string { - caCertLocation := ncp.agentConfig.DataPlaneConfig.Nginx.ApiTls.Ca +func (ncp *NginxConfigParser) getCACertLocation(ctx context.Context) string { + caCertLocation := ncp.agentConfig.DataPlaneConfig.Nginx.APITls.Ca if caCertLocation != "" && !ncp.agentConfig.IsDirectoryAllowed(caCertLocation) { // If SSL is enabled but CA cert is provided and not allowed, treat it as if no CA cert - slog.Warn("CA certificate location is not allowed, treating as if no CA cert provided.") + slog.WarnContext(ctx, "CA certificate location is not allowed, treating as if no CA cert provided.") return "" } diff --git a/internal/datasource/config/nginx_config_parser_test.go b/internal/datasource/config/nginx_config_parser_test.go index 2360de2ce..c8118a719 100644 --- a/internal/datasource/config/nginx_config_parser_test.go +++ b/internal/datasource/config/nginx_config_parser_test.go @@ -1172,9 +1172,9 @@ func TestNginxConfigParser_urlsForLocationDirective(t *testing.T) { assert.Len(t, xpConf.Parsed, 1) err = ncp.crossplaneConfigTraverse(ctx, &xpConf, func(ctx context.Context, parent, directive *crossplane.Directive) error { - _oss := ncp.urlsForLocationDirectiveAPIDetails(parent, directive, + _oss := ncp.urlsForLocationDirectiveAPIDetails(ctx, parent, directive, stubStatusAPIDirective) - _plus := ncp.urlsForLocationDirectiveAPIDetails(parent, directive, plusAPIDirective) + _plus := ncp.urlsForLocationDirectiveAPIDetails(ctx, parent, directive, plusAPIDirective) oss = append(oss, _oss...) plus = append(plus, _plus...) diff --git a/internal/resource/resource_service.go b/internal/resource/resource_service.go index 5d57d970d..792c28c7a 100644 --- a/internal/resource/resource_service.go +++ b/internal/resource/resource_service.go @@ -353,10 +353,9 @@ func (r *ResourceService) createPlusClient(instance *mpi.Instance) (*client.Ngin httpClient := http.DefaultClient caCertLocation := plusAPI.GetCa() if caCertLocation != "" { - slog.Debug("Reading from Location for Ca Cert : ", "cacertlocation", caCertLocation) + slog.Debug("Reading CA certificate", "file_path", caCertLocation) caCert, err := os.ReadFile(caCertLocation) if err != nil { - slog.Error("Unable to Create NGINX Plus client. Failed to read CA certificate : ", "err", err) return nil, err } caCertPool := x509.NewCertPool() diff --git a/test/helpers/cert_utils.go b/test/helpers/cert_utils.go index f4516349c..3b9e249e4 100644 --- a/test/helpers/cert_utils.go +++ b/test/helpers/cert_utils.go @@ -12,6 +12,7 @@ import ( "crypto/x509/pkix" "encoding/pem" "math/big" + "net" "os" "path" "testing" @@ -55,6 +56,8 @@ func GenerateSelfSignedCert(t testing.TB) (keyBytes, certBytes []byte) { ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, BasicConstraintsValid: true, IsCA: true, + IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.IPv6loopback}, + DNSNames: []string{"localhost"}, } certBytes, err = x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &key.PublicKey, key) if err != nil { From 4da851e01dface86618a0c63112637b337b0f6eb Mon Sep 17 00:00:00 2001 From: Akshay Chawla Date: Tue, 22 Jul 2025 16:28:42 +0100 Subject: [PATCH 07/13] addressing PR coments --- .../scraper/stubstatus/stub_status_scraper_tls_test.go | 4 ++-- internal/datasource/config/nginx_config_parser.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go index d583eafff..b36db092c 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go +++ b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper_tls_test.go @@ -74,7 +74,7 @@ Reading: 6 Writing: 179 Waiting: 106 defer server.Close() // Test with TLS configuration using our self-signed certificate - t.Run("with self-signed TLS", func(t *testing.T) { + t.Run("Test 1: self-signed TLS", func(t *testing.T) { cfg, ok := config.CreateDefaultConfig().(*config.Config) require.True(t, ok) @@ -134,7 +134,7 @@ Reading: 6 Writing: 179 Waiting: 106 }) // Test with Unix socket - t.Run("with Unix socket", func(t *testing.T) { + t.Run("Test 1: Unix socket", func(t *testing.T) { cfg, ok := config.CreateDefaultConfig().(*config.Config) require.True(t, ok) diff --git a/internal/datasource/config/nginx_config_parser.go b/internal/datasource/config/nginx_config_parser.go index 1ee85caa0..2d8f5e738 100644 --- a/internal/datasource/config/nginx_config_parser.go +++ b/internal/datasource/config/nginx_config_parser.go @@ -652,7 +652,7 @@ func (ncp *NginxConfigParser) urlsForLocationDirectiveAPIDetails( caCertLocation := "" // If SSl is enabled, check if CA cert is provided and the location is allowed if isSSL { - caCertLocation = ncp.getCACertLocation(ctx) + caCertLocation = ncp.selfSignedCACertLocation(ctx) } // process from the location block if current.Directive != locationDirective { @@ -859,7 +859,7 @@ func (ncp *NginxConfigParser) prepareHTTPClient(ctx context.Context) (*http.Clie } // Populate the CA cert location based ondirectory allowance. -func (ncp *NginxConfigParser) getCACertLocation(ctx context.Context) string { +func (ncp *NginxConfigParser) selfSignedCACertLocation(ctx context.Context) string { caCertLocation := ncp.agentConfig.DataPlaneConfig.Nginx.APITls.Ca if caCertLocation != "" && !ncp.agentConfig.IsDirectoryAllowed(caCertLocation) { From 033de017cada37674e4ace4fa6245f9ff4409854 Mon Sep 17 00:00:00 2001 From: Sean Breen <101327931+sean-breen@users.noreply.github.com> Date: Fri, 18 Jul 2025 14:57:36 +0100 Subject: [PATCH 08/13] Add NAP paths to allowed directories (#1163) * update go verion and golangci-lint * add paths for NAP upon instance discovery * use variable for NAP directory path * add paths when creating NAP instance * add nap paths by default, update agent config during upgrade * add nap by default * nap paths to default agent configuration * add back log message * only add config directory /etc/app_protect * update preinstall.sh * update default config * remove blank line --------- Co-authored-by: Aphral Griffin --- internal/config/config_test.go | 2 +- internal/config/defaults.go | 1 + .../instance/nginx-app-protect-instance-watcher_test.go | 1 - nginx-agent.conf | 1 + scripts/packages/preinstall.sh | 1 + scripts/packages/upgrade-agent-config.sh | 3 ++- 6 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 6f3ad0eed..9048eb834 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -796,7 +796,7 @@ func agentConfig() *Config { }, AllowedDirectories: []string{ "/etc/nginx/", "/etc/nginx-agent/", "/usr/local/etc/nginx/", "/var/run/nginx/", "/var/log/nginx/", - "/usr/share/nginx/modules/", + "/usr/share/nginx/modules/", "/etc/app_protect/", }, Collector: &Collector{ ConfigPath: "/etc/nginx-agent/nginx-agent-otelcol.yaml", diff --git a/internal/config/defaults.go b/internal/config/defaults.go index b6aed8905..fd2b32124 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -107,6 +107,7 @@ func DefaultAllowedDirectories() []string { "/usr/share/nginx/modules", "/var/run/nginx", "/var/log/nginx", + "/etc/app_protect", } } diff --git a/internal/watcher/instance/nginx-app-protect-instance-watcher_test.go b/internal/watcher/instance/nginx-app-protect-instance-watcher_test.go index 48925663d..2dae2d254 100644 --- a/internal/watcher/instance/nginx-app-protect-instance-watcher_test.go +++ b/internal/watcher/instance/nginx-app-protect-instance-watcher_test.go @@ -112,7 +112,6 @@ func TestNginxAppProtectInstanceWatcher_Watch(t *testing.T) { t.Fatalf("Timed out waiting for instance updates") } }) - t.Run("Test 2: Update instance", func(t *testing.T) { _, err = enforcerEngineVersionFile.WriteAt([]byte("6.113.0"), 0) require.NoError(t, err) diff --git a/nginx-agent.conf b/nginx-agent.conf index b67980261..559754f43 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -12,6 +12,7 @@ log: allowed_directories: - /etc/nginx + - /etc/app_protect - /usr/local/etc/nginx - /usr/share/nginx/modules - /var/run/nginx diff --git a/scripts/packages/preinstall.sh b/scripts/packages/preinstall.sh index 7bcc02250..530aa2706 100644 --- a/scripts/packages/preinstall.sh +++ b/scripts/packages/preinstall.sh @@ -109,6 +109,7 @@ labels: allowed_directories="${allowed_directories}\n - ${config_dir}" done allowed_directories="${allowed_directories}\n - /var/log/nginx" + allowed_directories="${allowed_directories}\n - /etc/app_protect" echo "Writing new v3 configuration to $v3_config_file" v3_config_contents=" diff --git a/scripts/packages/upgrade-agent-config.sh b/scripts/packages/upgrade-agent-config.sh index 0a3fefe9f..4ea7f1582 100755 --- a/scripts/packages/upgrade-agent-config.sh +++ b/scripts/packages/upgrade-agent-config.sh @@ -52,7 +52,8 @@ for config_dir in $config_dirs; do done allowed_directories="${allowed_directories}\n - /var/log/nginx" - +allowed_directories="${allowed_directories}\n - /etc/app_protect" + v3_config_contents=" # # /etc/nginx-agent/nginx-agent.conf From 24abf2899c097e56dfd8a39ff466bb1ea63df35c Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Mon, 21 Jul 2025 11:11:27 +0100 Subject: [PATCH 09/13] Fix include directive parsing to handle relative paths (#1174) --- .../datasource/config/nginx_config_parser.go | 9 ++- .../config/nginx_config_parser_test.go | 60 ++++++++++++++++++- internal/watcher/file/file_watcher_service.go | 14 +++-- .../watcher/file/file_watcher_service_test.go | 4 +- 4 files changed, 74 insertions(+), 13 deletions(-) diff --git a/internal/datasource/config/nginx_config_parser.go b/internal/datasource/config/nginx_config_parser.go index 2d8f5e738..872147a30 100644 --- a/internal/datasource/config/nginx_config_parser.go +++ b/internal/datasource/config/nginx_config_parser.go @@ -144,7 +144,7 @@ func (ncp *NginxConfigParser) createNginxConfigContext( func(ctx context.Context, parent, directive *crossplane.Directive) error { switch directive.Directive { case "include": - include := ncp.parseIncludeDirective(directive) + include := ncp.parseIncludeDirective(directive, &conf) nginxConfigContext.Includes = append(nginxConfigContext.Includes, include) case "log_format": @@ -269,12 +269,15 @@ func (ncp *NginxConfigParser) findLocalSysLogServers(sysLogServer string) string return "" } -func (ncp *NginxConfigParser) parseIncludeDirective(directive *crossplane.Directive) string { +func (ncp *NginxConfigParser) parseIncludeDirective( + directive *crossplane.Directive, + configFile *crossplane.Config, +) string { var include string if filepath.IsAbs(directive.Args[0]) { include = directive.Args[0] } else { - include = filepath.Join(filepath.Dir(directive.File), directive.Args[0]) + include = filepath.Join(filepath.Dir(configFile.File), directive.Args[0]) } return include diff --git a/internal/datasource/config/nginx_config_parser_test.go b/internal/datasource/config/nginx_config_parser_test.go index c8118a719..9729266bd 100644 --- a/internal/datasource/config/nginx_config_parser_test.go +++ b/internal/datasource/config/nginx_config_parser_test.go @@ -507,7 +507,7 @@ func TestNginxConfigParser_Parse(t *testing.T) { allowedDirectories: []string{dir}, }, { - name: "Test 10: Check with multiple syslog servers", + name: "Test 10: Available NAP syslog server", instance: protos.NginxPlusInstance([]string{}), content: testconfig.NginxConfigWithMultipleSysLogs(errorLog.Name(), accessLog.Name(), "192.168.12.34:1517", "my.domain.com:1517", "127.0.0.1:1515"), @@ -521,7 +521,7 @@ func TestNginxConfigParser_Parse(t *testing.T) { expectedLog: "Found valid NAP syslog server", }, { - name: "Test 10: Check with multiple syslog servers", + name: "Test 11: Unavailable NAP syslog server", instance: protos.NginxPlusInstance([]string{}), content: testconfig.NginxConfigWithMultipleSysLogs(errorLog.Name(), accessLog.Name(), "192.168.12.34:1517", "my.domain.com:1517", "not.allowed:1515"), @@ -1521,6 +1521,62 @@ func TestNginxConfigParser_checkDuplicate(t *testing.T) { } } +func TestNginxConfigParser_parseIncludeDirective(t *testing.T) { + parser := NewNginxConfigParser(types.AgentConfig()) + + tests := []struct { + name string + confFile string + expected string + args []string + }{ + { + name: "Test 1: relative path", + args: []string{"test.conf"}, + confFile: "/etc/nginx/nginx.conf", + expected: "/etc/nginx/test.conf", + }, + { + name: "Test 2: absolute path", + args: []string{"/usr/local/nginx/conf/vhost.conf"}, + confFile: "/etc/nginx/nginx.conf", + expected: "/usr/local/nginx/conf/vhost.conf", + }, + { + name: "Test 3: wildcard", + args: []string{"/etc/nginx/conf.d/*.conf"}, + confFile: "/etc/nginx/nginx.conf", + expected: "/etc/nginx/conf.d/*.conf", + }, + { + name: "Test 4: relative path with subdirectory", + args: []string{"conf.d/default.conf"}, + confFile: "/etc/nginx/nginx.conf", + expected: "/etc/nginx/conf.d/default.conf", + }, + { + name: "Test 5: parent directory reference", + args: []string{"../sites-enabled/*.conf"}, + confFile: "/etc/nginx/conf.d/nginx.conf", + expected: "/etc/nginx/sites-enabled/*.conf", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + include := parser.parseIncludeDirective( + &crossplane.Directive{ + Args: tc.args, + }, + &crossplane.Config{ + File: tc.confFile, + }, + ) + assert.Equal(t, tc.expected, include) + }) + } +} + func protoListEqual(protoListA, protoListB []*mpi.File) bool { for i := range protoListA { res := proto.Equal(protoListA[i], protoListB[i]) diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index dff86b117..10e1a2470 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -138,8 +138,12 @@ func (fws *FileWatcherService) addWatchers(ctx context.Context) { } if !slices.Contains(fws.watcher.WatchList(), directory) { - fws.addWatcher(ctx, directory) - fws.filesChanged.Store(true) + err := fws.addWatcher(ctx, directory) + if err != nil { + slog.DebugContext(ctx, "Failed to add file watcher", "directory", directory, "error", err) + } else { + fws.filesChanged.Store(true) + } } } } @@ -208,7 +212,7 @@ func (fws *FileWatcherService) checkForUpdates(ctx context.Context, ch chan<- Fi } } -func (fws *FileWatcherService) addWatcher(ctx context.Context, directory string) { +func (fws *FileWatcherService) addWatcher(ctx context.Context, directory string) error { slog.DebugContext(ctx, "Checking if file watcher needs to be added", "directory", directory) if _, err := os.Stat(directory); errors.Is(err, os.ErrNotExist) { @@ -220,9 +224,7 @@ func (fws *FileWatcherService) addWatcher(ctx context.Context, directory string) slog.DebugContext(ctx, "Adding watcher", "directory", directory) - if err := fws.watcher.Add(directory); err != nil { - slog.WarnContext(ctx, "Failed to add file watcher", "directory", directory, "error", err) - } + return fws.watcher.Add(directory) } func (fws *FileWatcherService) removeWatcher(ctx context.Context, path string) { diff --git a/internal/watcher/file/file_watcher_service_test.go b/internal/watcher/file/file_watcher_service_test.go index 3177c7226..56c35415a 100644 --- a/internal/watcher/file/file_watcher_service_test.go +++ b/internal/watcher/file/file_watcher_service_test.go @@ -62,7 +62,7 @@ func TestFileWatcherService_addWatcher(t *testing.T) { require.NoError(t, err) defer os.Remove(testDirectory) - fileWatcherService.addWatcher(ctx, testDirectory) + require.NoError(t, fileWatcherService.addWatcher(ctx, testDirectory)) directoriesBeingWatched := fileWatcherService.watcher.WatchList() assert.Len(t, directoriesBeingWatched, 1) @@ -79,7 +79,7 @@ func TestFileWatcherService_addWatcher_Error(t *testing.T) { tempDir := t.TempDir() testDirectory := path.Join(tempDir, "test_dir") - fileWatcherService.addWatcher(ctx, testDirectory) + require.Error(t, fileWatcherService.addWatcher(ctx, testDirectory)) directoriesBeingWatched := fileWatcherService.watcher.WatchList() assert.Empty(t, directoriesBeingWatched) From 752d40f46ad70372e8921384609e953dce76513b Mon Sep 17 00:00:00 2001 From: Sean Breen <101327931+sean-breen@users.noreply.github.com> Date: Mon, 21 Jul 2025 15:20:29 +0100 Subject: [PATCH 10/13] add nap logs to default features (#1172) --- internal/config/defaults.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/config/defaults.go b/internal/config/defaults.go index fd2b32124..564f60fdb 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -97,6 +97,7 @@ func DefaultFeatures() []string { pkg.FeatureCertificates, pkg.FeatureMetrics, pkg.FeatureFileWatcher, + pkg.FeatureLogsNap, } } From dc2a89d68bb1db85f131569efe2c1be4fc5e9d88 Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Tue, 22 Jul 2025 13:57:37 +0100 Subject: [PATCH 11/13] Add support for configuring mutliple OTel pipelines (#1167) --- internal/collector/otel_collector_plugin.go | 91 +++---- .../collector/otel_collector_plugin_test.go | 73 ++--- internal/collector/otelcol.tmpl | 140 +++++----- internal/collector/settings.go | 63 +++-- internal/collector/settings_test.go | 44 +-- internal/config/config.go | 251 +++++++++++++----- internal/config/config_test.go | 183 +++++++++---- internal/config/defaults.go | 9 +- internal/config/flags.go | 9 +- internal/config/testdata/nginx-agent.conf | 88 +++--- internal/config/types.go | 56 ++-- internal/plugin/plugin_manager.go | 2 +- test/config/agent/nginx-agent-otel-load.conf | 23 +- .../test-opentelemetry-collector-agent.yaml | 33 +-- test/types/config.go | 31 ++- 15 files changed, 664 insertions(+), 432 deletions(-) diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index ae13b1bd6..557e0af11 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -60,7 +60,7 @@ var ( ) // NewCollector is the constructor for the Collector plugin. -func New(conf *config.Config) (*Collector, error) { +func NewCollector(conf *config.Config) (*Collector, error) { initMutex.Lock() defer initMutex.Unlock() @@ -194,7 +194,7 @@ func (oc *Collector) Subscriptions() []string { } // Process receivers and log warning for sub-optimal configurations -func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) { +func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) { for _, receiver := range receivers { if receiver.OtlpTLSConfig == nil { slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.") @@ -317,12 +317,13 @@ func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) resourceProcessorUpdated := false if oc.config.Collector.Processors.Resource == nil { - oc.config.Collector.Processors.Resource = &config.Resource{ + oc.config.Collector.Processors.Resource = make(map[string]*config.Resource) + oc.config.Collector.Processors.Resource["default"] = &config.Resource{ Attributes: make([]config.ResourceAttribute, 0), } } - if oc.config.Collector.Processors.Resource != nil && + if oc.config.Collector.Processors.Resource["default"] != nil && resourceUpdateContext.GetResourceId() != "" { resourceProcessorUpdated = oc.updateResourceAttributes( []config.ResourceAttribute{ @@ -432,7 +433,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex } if oc.config.IsFeatureEnabled(pkgConfig.FeatureLogsNap) { - tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext) + tcplogReceiversFound := oc.updateNginxAppProtectTcplogReceivers(nginxConfigContext) if tcplogReceiversFound { reloadCollector = true } @@ -542,44 +543,46 @@ func (oc *Collector) updateExistingNginxOSSReceiver( return nginxReceiverFound, reloadCollector } -func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool { +func (oc *Collector) updateNginxAppProtectTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool { newTcplogReceiverAdded := false + + if oc.config.Collector.Receivers.TcplogReceivers == nil { + oc.config.Collector.Receivers.TcplogReceivers = make(map[string]*config.TcplogReceiver) + } + if nginxConfigContext.NAPSysLogServer != "" { if !oc.doesTcplogReceiverAlreadyExist(nginxConfigContext.NAPSysLogServer) { - oc.config.Collector.Receivers.TcplogReceivers = append( - oc.config.Collector.Receivers.TcplogReceivers, - config.TcplogReceiver{ - ListenAddress: nginxConfigContext.NAPSysLogServer, - Operators: []config.Operator{ - { - Type: "add", - Fields: map[string]string{ - "field": "body", - "value": timestampConversionExpression, - }, + oc.config.Collector.Receivers.TcplogReceivers["nginx_app_protect"] = &config.TcplogReceiver{ + ListenAddress: nginxConfigContext.NAPSysLogServer, + Operators: []config.Operator{ + { + Type: "add", + Fields: map[string]string{ + "field": "body", + "value": timestampConversionExpression, }, - { - Type: "syslog_parser", - Fields: map[string]string{ - "protocol": "rfc3164", - }, + }, + { + Type: "syslog_parser", + Fields: map[string]string{ + "protocol": "rfc3164", }, - { - Type: "remove", - Fields: map[string]string{ - "field": "attributes.message", - }, + }, + { + Type: "remove", + Fields: map[string]string{ + "field": "attributes.message", }, - { - Type: "add", - Fields: map[string]string{ - "field": "resource[\"instance.id\"]", - "value": nginxConfigContext.InstanceID, - }, + }, + { + Type: "add", + Fields: map[string]string{ + "field": "resource[\"instance.id\"]", + "value": nginxConfigContext.InstanceID, }, }, }, - ) + } newTcplogReceiverAdded = true } @@ -593,23 +596,13 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool { listenAddressesToBeDeleted := oc.configDeletedNapReceivers(nginxConfigContext) if len(listenAddressesToBeDeleted) != 0 { - oc.deleteNapReceivers(listenAddressesToBeDeleted) + delete(oc.config.Collector.Receivers.TcplogReceivers, "nginx_app_protect") return true } return false } -func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bool) { - filteredReceivers := (oc.config.Collector.Receivers.TcplogReceivers)[:0] - for _, receiver := range oc.config.Collector.Receivers.TcplogReceivers { - if !listenAddressesToBeDeleted[receiver.ListenAddress] { - filteredReceivers = append(filteredReceivers, receiver) - } - } - oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers -} - func (oc *Collector) configDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool { elements := make(map[string]bool) @@ -645,16 +638,16 @@ func (oc *Collector) updateResourceAttributes( ) (actionUpdated bool) { actionUpdated = false - if oc.config.Collector.Processors.Resource.Attributes != nil { + if oc.config.Collector.Processors.Resource["default"].Attributes != nil { OUTER: for _, toAdd := range attributesToAdd { - for _, action := range oc.config.Collector.Processors.Resource.Attributes { + for _, action := range oc.config.Collector.Processors.Resource["default"].Attributes { if action.Key == toAdd.Key { continue OUTER } } - oc.config.Collector.Processors.Resource.Attributes = append( - oc.config.Collector.Processors.Resource.Attributes, + oc.config.Collector.Processors.Resource["default"].Attributes = append( + oc.config.Collector.Processors.Resource["default"].Attributes, toAdd, ) actionUpdated = true diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index 6073951a1..2fa51cef4 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -67,7 +67,7 @@ func TestCollector_New(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - collector, err := New(tt.config) + collector, err := NewCollector(tt.config) if tt.expectedError != nil { require.Error(t, err) @@ -114,7 +114,7 @@ func TestCollector_Init(t *testing.T) { conf.Collector.Receivers = config.Receivers{} } - collector, err = New(conf) + collector, err = NewCollector(conf) require.NoError(t, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -133,7 +133,7 @@ func TestCollector_InitAndClose(t *testing.T) { conf := types.OTelConfig(t) conf.Collector.Log.Path = "" - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(t, err, "NewCollector should not return an error with valid config") ctx := context.Background() @@ -293,7 +293,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { conf.Collector.Extensions.HeadersSetter = nil conf.Collector.Exporters.PrometheusExporter = nil - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -349,12 +349,14 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) { Data: protos.HostResource(), }, processors: config.Processors{ - Resource: &config.Resource{ - Attributes: []config.ResourceAttribute{ - { - Key: "resource.id", - Action: "insert", - Value: "1234", + Resource: map[string]*config.Resource{ + "default": { + Attributes: []config.ResourceAttribute{ + { + Key: "resource.id", + Action: "insert", + Value: "1234", + }, }, }, }, @@ -376,7 +378,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -437,7 +439,7 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -559,7 +561,7 @@ func TestCollector_updateExistingNginxOSSReceiver(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { conf.Collector.Receivers = test.existingReceivers - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -650,7 +652,7 @@ func TestCollector_updateExistingNginxPlusReceiver(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { conf.Collector.Receivers = test.existingReceivers - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -705,31 +707,32 @@ func TestCollector_updateResourceAttributes(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() // set up Actions - conf.Collector.Processors.Resource = &config.Resource{Attributes: test.setup} + conf.Collector.Processors.Resource = make(map[string]*config.Resource) + conf.Collector.Processors.Resource["default"] = &config.Resource{Attributes: test.setup} reloadRequired := collector.updateResourceAttributes(test.attributes) assert.Equal(tt, test.expectedAttribs, - conf.Collector.Processors.Resource.Attributes) + conf.Collector.Processors.Resource["default"].Attributes) assert.Equal(tt, test.expectedReloadRequired, reloadRequired) }) } } -func TestCollector_updateTcplogReceivers(t *testing.T) { +func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { conf := types.OTelConfig(t) conf.Collector.Log.Path = "" conf.Collector.Processors.Batch = nil conf.Collector.Processors.Attribute = nil conf.Collector.Processors.Resource = nil conf.Collector.Processors.LogsGzip = nil - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(t, err) nginxConfigContext := &model.NginxConfigContext{ @@ -738,38 +741,42 @@ func TestCollector_updateTcplogReceivers(t *testing.T) { assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) - t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) { - tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext) + t.Run("Test 1: NewCollector TcplogReceiver added", func(tt *testing.T) { + tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext) assert.True(tt, tcplogReceiverAdded) assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1) - assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) - assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) + assert.Len(tt, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) }) - // Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice + // Calling updateNginxAppProtectTcplogReceivers shouldn't update the TcplogReceivers slice // since there is already a receiver with the same ListenAddress t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) { - tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext) + tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext) assert.False(t, tcplogReceiverAdded) assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) - assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) - assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) }) t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) { - tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{}) + tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(&model.NginxConfigContext{}) assert.True(t, tcplogReceiverDeleted) assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) }) - t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) { - tcplogReceiverDeleted := collector. - updateTcplogReceivers(&model.NginxConfigContext{NAPSysLogServer: "localhost:152"}) + t.Run("Test 4: NewCollector tcplogReceiver added and deleted another", func(tt *testing.T) { + tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers( + &model.NginxConfigContext{ + NAPSysLogServer: "localhost:152", + }, + ) + assert.True(t, tcplogReceiverDeleted) assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) - assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) - assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) }) } diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index 36edcea84..038a936e4 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -5,6 +5,7 @@ receivers: collection_interval: {{ .Receivers.ContainerMetrics.CollectionInterval }} {{- end}} {{- end }} + {{- if ne .Receivers.HostMetrics nil }} hostmetrics: {{- if .Receivers.HostMetrics.CollectionInterval }} @@ -39,7 +40,8 @@ receivers: network: {{- end }} {{- end }} -{{- end }} +{{- end }} + {{- range $index, $otlpReceiver := .Receivers.OtlpReceivers }} otlp/{{$index}}: protocols: @@ -75,6 +77,7 @@ receivers: {{- end }} {{- end }} {{- end }} + {{- range .Receivers.NginxReceivers }} nginx/{{- .InstanceID -}}: api_details: @@ -93,6 +96,7 @@ receivers: {{- end }} {{- end }} {{- end }} + {{- range .Receivers.NginxPlusReceivers }} nginxplus/{{- .InstanceID -}}: api_details: @@ -104,6 +108,7 @@ receivers: collection_interval: {{ .CollectionInterval }} {{- end }} {{- end }} + {{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }} tcplog/{{$index}}: listen_address: "{{- .ListenAddress -}}" @@ -118,35 +123,43 @@ receivers: processors: {{- if ne .Processors.Resource nil }} -{{- if .Processors.Resource.Attributes }} - resource: + {{- range $key, $resource := .Processors.Resource }} + {{- if $resource.Attributes }} + resource/{{$key}}: attributes: -{{- range .Processors.Resource.Attributes }} + {{- range $resource.Attributes }} - key: {{ .Key }} action: {{ .Action }} value: {{ .Value }} -{{- end }} -{{- end }} + {{- end }} + {{- end }} + {{- end }} {{- end }} {{- if ne .Processors.Attribute nil }} -{{- if .Processors.Attribute.Actions }} - attributes: + {{- range $key, $attribute := .Processors.Attribute }} + {{- if $attribute.Actions }} + attributes/{{$key}}: actions: -{{- range .Processors.Attribute.Actions }} + {{- range $attribute.Actions }} - key: {{ .Key }} action: {{ .Action }} value: {{ .Value }} -{{- end }} -{{- end }} + {{- end }} + {{- end }} + {{- end }} {{- end }} {{- if ne .Processors.Batch nil }} - batch: - send_batch_size: {{ .Processors.Batch.SendBatchSize }} - timeout: {{ .Processors.Batch.Timeout }} - send_batch_max_size: {{ .Processors.Batch.SendBatchMaxSize }} + {{- range $key, $batch := .Processors.Batch }} + batch/{{$key}}: + send_batch_size: {{ $batch.SendBatchSize }} + timeout: {{ $batch.Timeout }} + send_batch_max_size: {{ $batch.SendBatchMaxSize }} +{{- end }} {{- end }} {{- if ne .Processors.LogsGzip nil }} - logsgzip: {} +{{ range $key, $value := .Processors.LogsGzip }} + logsgzip/{{$key}}: {} +{{- end }} {{- end }} exporters: @@ -175,6 +188,7 @@ exporters: authenticator: {{ .Authenticator -}} {{- end }} {{- end }} + {{- if ne .Exporters.PrometheusExporter nil }} prometheus: endpoint: "{{ .Exporters.PrometheusExporter.Server.Host -}}:{{- .Exporters.PrometheusExporter.Server.Port }}" @@ -243,73 +257,59 @@ service: - headers_setter {{- end}} {{- end}} + pipelines: - {{- if or (ne .Receivers.HostMetrics nil) (ne .Receivers.ContainerMetrics nil) (gt (len .Receivers.OtlpReceivers) 0) (gt (len .Receivers.NginxReceivers) 0) (gt (len .Receivers.NginxPlusReceivers) 0) }} - metrics: + {{- range $pipelineName, $pipeline := .Pipelines.Metrics }} + {{- if or (ne $.Receivers.HostMetrics nil) (ne $.Receivers.ContainerMetrics nil) (gt (len $.Receivers.OtlpReceivers) 0) (gt (len $.Receivers.NginxReceivers) 0) (gt (len $.Receivers.NginxPlusReceivers) 0) }} + metrics/{{$pipelineName}}: receivers: - {{- if ne .Receivers.ContainerMetrics nil }} + {{- range $receiver := $pipeline.Receivers }} + {{- if eq $receiver "host_metrics" }} + {{- if ne $.Receivers.ContainerMetrics nil }} - containermetrics - {{- end }} - {{- if ne .Receivers.HostMetrics nil }} + {{- end }} + {{- if ne $.Receivers.HostMetrics nil }} - hostmetrics - {{- end }} - {{- range $index, $otlpReceiver := .Receivers.OtlpReceivers }} - - otlp/{{$index}} - {{- end }} - {{- range .Receivers.NginxReceivers }} + {{- end }} + {{- else if eq $receiver "nginx_metrics" }} + {{- range $.Receivers.NginxReceivers }} - nginx/{{- .InstanceID -}} - {{- end }} - {{- range .Receivers.NginxPlusReceivers }} + {{- end }} + {{- range $.Receivers.NginxPlusReceivers }} - nginxplus/{{- .InstanceID -}} + {{- end }} + {{- else }} + - {{ $receiver }} + {{- end }} {{- end }} processors: - {{- if ne .Processors.Resource nil }} - {{- if .Processors.Resource.Attributes }} - - resource - {{- end }} - {{- end }} - {{- if ne .Processors.Attribute nil }} - {{- if .Processors.Attribute.Actions }} - - attributes - {{- end }} - {{- end }} - {{- if ne .Processors.Batch nil }} - - batch + {{- range $pipeline.Processors }} + - {{ . }} {{- end }} exporters: - {{- range $index, $otlpExporter := .Exporters.OtlpExporters }} - - otlp/{{$index}} - {{- end }} - {{- if ne .Exporters.PrometheusExporter nil }} - - prometheus - {{- end }} - {{- if ne .Exporters.Debug nil }} - - debug - {{- end }} - {{- end }} - {{- if gt (len .Receivers.TcplogReceivers) 0 }} - logs: + {{- range $pipeline.Exporters }} + - {{ . }} + {{- end }} + {{- end }} + {{- end }} + {{- range $pipelineName, $pipeline := .Pipelines.Logs }} + {{- if gt (len $.Receivers.TcplogReceivers) 0 }} + logs/{{$pipelineName}}: receivers: - {{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }} - - tcplog/{{$index}} + {{- range $receiver := $pipeline.Receivers }} + {{- if eq $receiver "tcplog/nginx_app_protect" }} + - tcplog/nginx_app_protect: + {{- else }} + - {{ $receiver }} + {{- end }} {{- end }} processors: - {{- if ne .Processors.Resource nil }} - {{- if .Processors.Resource.Attributes }} - - resource - {{- end }} - {{- end }} - {{- if ne .Processors.LogsGzip nil }} - - logsgzip - {{- end }} - {{- if ne .Processors.Batch nil }} - - batch + {{- range $pipeline.Processors }} + - {{ . }} {{- end }} exporters: - {{- range $index, $otlpExporter := .Exporters.OtlpExporters }} - - otlp/{{$index}} + {{- range $pipeline.Exporters }} + - {{ . }} {{- end }} - {{- if ne .Exporters.Debug nil }} - - debug - {{- end }} - {{- end }} + {{- end }} + {{- end }} diff --git a/internal/collector/settings.go b/internal/collector/settings.go index 62e92afbb..e89bb89d6 100644 --- a/internal/collector/settings.go +++ b/internal/collector/settings.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "path/filepath" + "slices" "text/template" "github.com/nginx/agent/v3/internal/config" @@ -89,43 +90,59 @@ func createFile(confPath string) error { return nil } -// Generates a OTel Collector config to a file by injecting the Metrics Config to a Go template. +// Generates an OTel Collector config to a file by injecting the Metrics Config to a Go template. func writeCollectorConfig(conf *config.Collector) error { - otelcolTemplate, err := template.New(otelTemplatePath).Parse(otelcolTemplate) - if err != nil { - return err + if conf.Processors.Resource["default"] != nil { + addDefaultResourceProcessor(conf.Pipelines.Metrics) + addDefaultResourceProcessor(conf.Pipelines.Logs) } - confPath := filepath.Clean(conf.ConfigPath) + slog.Info("Writing OTel collector config") - // Check if file exists, if not create it. - _, err = os.Stat(confPath) - if err != nil { - if !os.IsNotExist(err) { - return err - } + otelcolTemplate, templateErr := template.New(otelTemplatePath).Parse(otelcolTemplate) + if templateErr != nil { + return templateErr + } - fileErr := createFile(confPath) - if fileErr != nil { - return fileErr - } + confPath := filepath.Clean(conf.ConfigPath) + + // Ensure file exists and has correct permissions + if err := ensureFileExists(confPath); err != nil { + return err } file, err := os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, configFilePermission) + if err != nil { + return err + } defer func() { - err = file.Close() - if err != nil { - slog.Warn("Failed to close file", "file_path", confPath) + fileCloseErr := file.Close() + if fileCloseErr != nil { + slog.Warn("Failed to close file", "file_path", confPath, "error", fileCloseErr) } }() - if err != nil { - return err + + return otelcolTemplate.Execute(file, conf) +} + +func addDefaultResourceProcessor(pipelines map[string]*config.Pipeline) { + for _, pipeline := range pipelines { + if !slices.Contains(pipeline.Processors, "resource/default") { + pipeline.Processors = append(pipeline.Processors, "resource/default") + } } +} - err = otelcolTemplate.Execute(file, conf) +func ensureFileExists(confPath string) error { + _, err := os.Stat(confPath) if err != nil { - return err + if !os.IsNotExist(err) { + return err + } + if createFileErr := createFile(confPath); createFileErr != nil { + return createFileErr + } } - return nil + return os.Chmod(confPath, configFilePermission) } diff --git a/internal/collector/settings_test.go b/internal/collector/settings_test.go index af89bd1e1..f8b8c7be4 100644 --- a/internal/collector/settings_test.go +++ b/internal/collector/settings_test.go @@ -54,12 +54,14 @@ func TestTemplateWrite(t *testing.T) { cfg := types.AgentConfig() actualConfPath := filepath.Join(tmpDir, "nginx-agent-otelcol-test.yaml") cfg.Collector.ConfigPath = actualConfPath - cfg.Collector.Processors.Resource = &config.Resource{ - Attributes: []config.ResourceAttribute{ - { - Key: "resource.id", - Action: "add", - Value: "12345", + cfg.Collector.Processors.Resource = map[string]*config.Resource{ + "default": { + Attributes: []config.ResourceAttribute{ + { + Key: "resource.id", + Action: "add", + Value: "12345", + }, }, }, } @@ -106,8 +108,7 @@ func TestTemplateWrite(t *testing.T) { }, }) // Clear default config and test collector with TLS enabled - cfg.Collector.Receivers.OtlpReceivers = []config.OtlpReceiver{} - cfg.Collector.Receivers.OtlpReceivers = append(cfg.Collector.Receivers.OtlpReceivers, config.OtlpReceiver{ + cfg.Collector.Receivers.OtlpReceivers["default"] = &config.OtlpReceiver{ Server: &config.ServerConfig{ Host: "localhost", Port: 4317, @@ -118,10 +119,10 @@ func TestTemplateWrite(t *testing.T) { Key: "/tmp/key.pem", Ca: "/tmp/ca.pem", }, - }) + } - cfg.Collector.Receivers.TcplogReceivers = []config.TcplogReceiver{ - { + cfg.Collector.Receivers.TcplogReceivers = map[string]*config.TcplogReceiver{ + "default": { ListenAddress: "localhost:151", Operators: []config.Operator{ { @@ -155,13 +156,26 @@ func TestTemplateWrite(t *testing.T) { }, } - cfg.Collector.Exporters.OtlpExporters[0].Authenticator = "headers_setter" + cfg.Collector.Exporters.OtlpExporters["default"].Authenticator = "headers_setter" // nolint: lll - cfg.Collector.Exporters.OtlpExporters[0].Compression = types.AgentConfig().Collector.Exporters.OtlpExporters[0].Compression - cfg.Collector.Exporters.OtlpExporters[0].Server.Port = 1234 - cfg.Collector.Receivers.OtlpReceivers[0].Server.Port = 4317 + cfg.Collector.Exporters.OtlpExporters["default"].Compression = types.AgentConfig().Collector.Exporters.OtlpExporters["default"].Compression + cfg.Collector.Exporters.OtlpExporters["default"].Server.Port = 1234 + cfg.Collector.Receivers.OtlpReceivers["default"].Server.Port = 4317 cfg.Collector.Extensions.Health.Server.Port = 1337 + cfg.Collector.Pipelines.Metrics = make(map[string]*config.Pipeline) + cfg.Collector.Pipelines.Metrics["default"] = &config.Pipeline{ + Receivers: []string{"hostmetrics", "containermetrics", "otlp/default", "nginx/123"}, + Processors: []string{"resource/default", "batch/default"}, + Exporters: []string{"otlp/default", "prometheus", "debug"}, + } + cfg.Collector.Pipelines.Logs = make(map[string]*config.Pipeline) + cfg.Collector.Pipelines.Logs["default"] = &config.Pipeline{ + Receivers: []string{"tcplog/default"}, + Processors: []string{"resource/default", "batch/default"}, + Exporters: []string{"otlp/default", "debug"}, + } + require.NotNil(t, cfg) err := writeCollectorConfig(cfg.Collector) diff --git a/internal/config/config.go b/internal/config/config.go index 366daac2b..867865d0c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,11 +32,15 @@ import ( ) const ( - ConfigFileName = "nginx-agent.conf" - EnvPrefix = "NGINX_AGENT" - KeyDelimiter = "_" - KeyValueNumber = 2 - AgentDirName = "/etc/nginx-agent/" + ConfigFileName = "nginx-agent.conf" + EnvPrefix = "NGINX_AGENT" + KeyDelimiter = "_" + KeyValueNumber = 2 + AgentDirName = "/etc/nginx-agent/" + DefaultMetricsBatchProcessor = "default_metrics" + DefaultLogsBatchProcessor = "default_logs" + DefaultExporter = "default" + DefaultPipeline = "default" ) var viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) @@ -128,7 +132,7 @@ func ResolveConfig() (*Config, error) { ManifestDir: viperInstance.GetString(ManifestDirPathKey), } - checkCollectorConfiguration(collector, config) + defaultCollector(collector, config) addLabelsAsOTelHeaders(collector, config.Labels) slog.Debug("Agent config", "config", config) @@ -138,34 +142,150 @@ func ResolveConfig() (*Config, error) { return config, nil } -func checkCollectorConfiguration(collector *Collector, config *Config) { - if isOTelExporterConfigured(collector) && config.IsCommandGrpcClientConfigured() && - config.IsCommandAuthConfigured() && - config.IsCommandTLSConfigured() { - slog.Info("No collector configuration found in NGINX Agent config, command server configuration found." + - " Using default collector configuration") - defaultCollector(collector, config) +func defaultCollector(collector *Collector, config *Config) { + // Always add default host metric receiver and default processor + addDefaultHostMetricsReceiver(collector) + addDefaultProcessors(collector) + + // Only add default otlp exporter and pipelines if connected to a management plane + if config.IsCommandGrpcClientConfigured() || config.IsAuxiliaryCommandGrpcClientConfigured() { + addDefaultOtlpExporter(collector, config) + addDefaultPipelines(collector) } } -func defaultCollector(collector *Collector, config *Config) { - token := config.Command.Auth.Token - if config.Command.Auth.TokenPath != "" { - slog.Debug("Reading token from file", "path", config.Command.Auth.TokenPath) - pathToken, err := file.ReadFromFile(config.Command.Auth.TokenPath) +func addDefaultPipelines(collector *Collector) { + if collector.Pipelines.Metrics == nil { + collector.Pipelines.Metrics = make(map[string]*Pipeline) + } + if _, ok := collector.Pipelines.Metrics[DefaultPipeline]; !ok { + collector.Pipelines.Metrics[DefaultPipeline] = &Pipeline{ + Receivers: []string{"host_metrics", "nginx_metrics"}, + Processors: []string{"batch/default_metrics"}, + Exporters: []string{"otlp/default"}, + } + } + + if collector.Pipelines.Logs == nil { + collector.Pipelines.Logs = make(map[string]*Pipeline) + } + if _, ok := collector.Pipelines.Logs[DefaultPipeline]; !ok { + collector.Pipelines.Logs[DefaultPipeline] = &Pipeline{ + Receivers: []string{"tcplog/nginx_app_protect"}, + Processors: []string{"logsgzip/default", "batch/default_logs"}, + Exporters: []string{"otlp/default"}, + } + } +} + +func addDefaultOtlpExporter(collector *Collector, config *Config) { + if collector.Exporters.OtlpExporters == nil { + collector.Exporters.OtlpExporters = make(map[string]*OtlpExporter) + } + + defaultCommandServer := config.Command + + if config.IsAuxiliaryCommandGrpcClientConfigured() { + defaultCommandServer = config.AuxiliaryCommand + } + + if _, ok := collector.Exporters.OtlpExporters[DefaultExporter]; !ok && defaultCommandServer != nil { + collector.Exporters.OtlpExporters[DefaultExporter] = &OtlpExporter{ + Server: defaultCommandServer.Server, + TLS: defaultCommandServer.TLS, + Compression: "", + } + + if defaultCommandServer.Auth != nil { + token := extractTokenFromAuth(defaultCommandServer.Auth) + if token != "" { + addAuthHeader(collector, token) + collector.Exporters.OtlpExporters[DefaultExporter].Authenticator = "headers_setter" + } + } + } +} + +func extractTokenFromAuth(auth *AuthConfig) string { + token := auth.Token + if auth.TokenPath != "" { + slog.Debug("Reading token from file", "path", auth.TokenPath) + tokenFromFile, err := file.ReadFromFile(auth.TokenPath) if err != nil { slog.Error("Error adding token to default collector, "+ "default collector configuration not started", "error", err) - return + return "" + } + token = tokenFromFile + } + + return token +} + +func addAuthHeader(collector *Collector, token string) { + header := []Header{ + { + Action: "insert", + Key: "authorization", + Value: token, + }, + } + + if collector.Extensions.HeadersSetter == nil { + collector.Extensions.HeadersSetter = &HeadersSetter{ + Headers: header, } - token = pathToken + } else { + // nolint: lll + collector.Extensions.HeadersSetter.Headers = append(collector.Extensions.HeadersSetter.Headers, header...) } +} + +func addDefaultProcessors(collector *Collector) { + if collector.Processors.Batch == nil { + collector.Processors.Batch = make(map[string]*Batch) + } + + if _, ok := collector.Processors.Batch[DefaultMetricsBatchProcessor]; !ok { + collector.Processors.Batch[DefaultMetricsBatchProcessor] = &Batch{ + SendBatchSize: DefCollectorMetricsBatchProcessorSendBatchSize, + SendBatchMaxSize: DefCollectorMetricsBatchProcessorSendBatchMaxSize, + Timeout: DefCollectorMetricsBatchProcessorTimeout, + } + } + if _, ok := collector.Processors.Batch[DefaultLogsBatchProcessor]; !ok { + collector.Processors.Batch[DefaultLogsBatchProcessor] = &Batch{ + SendBatchSize: DefCollectorLogsBatchProcessorSendBatchSize, + SendBatchMaxSize: DefCollectorLogsBatchProcessorSendBatchMaxSize, + Timeout: DefCollectorLogsBatchProcessorTimeout, + } + } + + if collector.Processors.LogsGzip == nil { + collector.Processors.LogsGzip = make(map[string]*LogsGzip) + } + if _, ok := collector.Processors.LogsGzip["default"]; !ok { + collector.Processors.LogsGzip["default"] = &LogsGzip{} + } +} +func addDefaultHostMetricsReceiver(collector *Collector) { if host.NewInfo().IsContainer() { + addDefaultContainerHostMetricsReceiver(collector) + } else { + addDefaultVMHostMetricsReceiver(collector) + } +} + +func addDefaultContainerHostMetricsReceiver(collector *Collector) { + if collector.Receivers.ContainerMetrics == nil { collector.Receivers.ContainerMetrics = &ContainerMetricsReceiver{ CollectionInterval: 1 * time.Minute, } + } + + if collector.Receivers.HostMetrics == nil { collector.Receivers.HostMetrics = &HostMetrics{ Scrapers: &HostMetricsScrapers{ Network: &NetworkScraper{}, @@ -173,11 +293,18 @@ func defaultCollector(collector *Collector, config *Config) { CollectionInterval: 1 * time.Minute, InitialDelay: 1 * time.Second, } + } + + if collector.Log == nil { collector.Log = &Log{ Path: "stdout", Level: "info", } - } else { + } +} + +func addDefaultVMHostMetricsReceiver(collector *Collector) { + if collector.Receivers.HostMetrics == nil { collector.Receivers.HostMetrics = &HostMetrics{ Scrapers: &HostMetricsScrapers{ CPU: &CPUScraper{}, @@ -190,24 +317,6 @@ func defaultCollector(collector *Collector, config *Config) { InitialDelay: 1 * time.Second, } } - - collector.Exporters.OtlpExporters = append(collector.Exporters.OtlpExporters, OtlpExporter{ - Server: config.Command.Server, - TLS: config.Command.TLS, - Compression: "", - Authenticator: "headers_setter", - }) - - header := []Header{ - { - Action: "insert", - Key: "authorization", - Value: token, - }, - } - collector.Extensions.HeadersSetter = &HeadersSetter{ - Headers: header, - } } func addLabelsAsOTelHeaders(collector *Collector, labels map[string]any) { @@ -553,24 +662,6 @@ func registerCollectorFlags(fs *flag.FlagSet) { "If the default path doesn't exist, log messages are output to stdout/stderr.", ) - fs.Uint32( - CollectorBatchProcessorSendBatchSizeKey, - DefCollectorBatchProcessorSendBatchSize, - `Number of metric data points after which a batch will be sent regardless of the timeout.`, - ) - - fs.Uint32( - CollectorBatchProcessorSendBatchMaxSizeKey, - DefCollectorBatchProcessorSendBatchMaxSize, - `The upper limit of the batch size.`, - ) - - fs.Duration( - CollectorBatchProcessorTimeoutKey, - DefCollectorBatchProcessorTimeout, - `Time duration after which a batch will be sent regardless of size.`, - ) - fs.String( CollectorExtensionsHealthServerHostKey, DefCollectorExtensionsHealthServerHost, @@ -844,6 +935,7 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { Receivers: receivers, Extensions: resolveExtensions(), Log: resolveCollectorLog(), + Pipelines: resolvePipelines(), } // Check for self-signed certificate true in Agent conf @@ -859,8 +951,33 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { return col, nil } +func resolvePipelines() Pipelines { + var metricsPipelines map[string]*Pipeline + + if viperInstance.IsSet(CollectorMetricsPipelinesKey) { + err := resolveMapStructure(CollectorMetricsPipelinesKey, &metricsPipelines) + if err != nil { + metricsPipelines = nil + } + } + + var logsPipelines map[string]*Pipeline + + if viperInstance.IsSet(CollectorLogsPipelinesKey) { + err := resolveMapStructure(CollectorLogsPipelinesKey, &logsPipelines) + if err != nil { + logsPipelines = nil + } + } + + return Pipelines{ + Metrics: metricsPipelines, + Logs: logsPipelines, + } +} + func resolveExporters() (Exporters, error) { - var otlpExporters []OtlpExporter + var otlpExporters map[string]*OtlpExporter exporters := Exporters{} if viperInstance.IsSet(CollectorDebugExporterKey) { @@ -903,17 +1020,10 @@ func isPrometheusExporterSet() bool { } func resolveProcessors() Processors { - processors := Processors{ - Batch: &Batch{ - SendBatchSize: viperInstance.GetUint32(CollectorBatchProcessorSendBatchSizeKey), - SendBatchMaxSize: viperInstance.GetUint32(CollectorBatchProcessorSendBatchMaxSizeKey), - Timeout: viperInstance.GetDuration(CollectorBatchProcessorTimeoutKey), - }, - LogsGzip: &LogsGzip{}, - } + processors := Processors{} - if viperInstance.IsSet(CollectorAttributeProcessorKey) { - err := resolveMapStructure(CollectorAttributeProcessorKey, &processors.Attribute) + if viperInstance.IsSet(CollectorProcessorsKey) { + err := resolveMapStructure(CollectorProcessorsKey, &processors) if err != nil { return processors } @@ -1191,8 +1301,3 @@ func resolveMapStructure(key string, object any) error { return nil } - -func isOTelExporterConfigured(collector *Collector) bool { - return len(collector.Exporters.OtlpExporters) == 0 && collector.Exporters.PrometheusExporter == nil && - collector.Exporters.Debug == nil -} diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 9048eb834..b8c8a11ce 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -192,11 +192,7 @@ func TestResolveCollector(t *testing.T) { viperInstance.Set(CollectorLogPathKey, expected.Log.Path) viperInstance.Set(CollectorLogLevelKey, expected.Log.Level) viperInstance.Set(CollectorReceiversKey, expected.Receivers) - viperInstance.Set(CollectorBatchProcessorKey, expected.Processors.Batch) - viperInstance.Set(CollectorBatchProcessorSendBatchSizeKey, expected.Processors.Batch.SendBatchSize) - viperInstance.Set(CollectorBatchProcessorSendBatchMaxSizeKey, expected.Processors.Batch.SendBatchMaxSize) - viperInstance.Set(CollectorBatchProcessorTimeoutKey, expected.Processors.Batch.Timeout) - viperInstance.Set(CollectorLogsGzipProcessorKey, expected.Processors.LogsGzip) + viperInstance.Set(CollectorProcessorsKey, expected.Processors) viperInstance.Set(CollectorExportersKey, expected.Exporters) viperInstance.Set(CollectorOtlpExportersKey, expected.Exporters.OtlpExporters) viperInstance.Set(CollectorExtensionsHealthServerHostKey, expected.Extensions.Health.Server.Host) @@ -766,6 +762,79 @@ func TestResolveExtensions_MultipleHeaders(t *testing.T) { } } +func TestAddDefaultOtlpExporter(t *testing.T) { + t.Run("Test 1: Command server only", func(t *testing.T) { + collector := &Collector{} + agentConfig := &Config{ + Command: &Command{ + Server: &ServerConfig{ + Host: "test.com", + Port: 8080, + Type: Grpc, + }, + Auth: &AuthConfig{ + Token: "token", + }, + TLS: &TLSConfig{ + SkipVerify: false, + }, + }, + } + + addDefaultOtlpExporter(collector, agentConfig) + + assert.Equal(t, "test.com", collector.Exporters.OtlpExporters["default"].Server.Host) + assert.Equal(t, 8080, collector.Exporters.OtlpExporters["default"].Server.Port) + assert.False(t, collector.Exporters.OtlpExporters["default"].TLS.SkipVerify) + assert.Equal(t, "headers_setter", collector.Exporters.OtlpExporters["default"].Authenticator) + assert.Equal(t, "insert", collector.Extensions.HeadersSetter.Headers[0].Action) + assert.Equal(t, "authorization", collector.Extensions.HeadersSetter.Headers[0].Key) + assert.Equal(t, "token", collector.Extensions.HeadersSetter.Headers[0].Value) + }) + + t.Run("Test 2: Command and Auxiliary Command servers", func(t *testing.T) { + collector := &Collector{} + agentConfig := &Config{ + Command: &Command{ + Server: &ServerConfig{ + Host: "test.com", + Port: 8080, + Type: Grpc, + }, + Auth: &AuthConfig{ + Token: "token", + }, + TLS: &TLSConfig{ + SkipVerify: false, + }, + }, + AuxiliaryCommand: &Command{ + Server: &ServerConfig{ + Host: "aux-test.com", + Port: 9090, + Type: Grpc, + }, + Auth: &AuthConfig{ + Token: "aux-token", + }, + TLS: &TLSConfig{ + SkipVerify: false, + }, + }, + } + + addDefaultOtlpExporter(collector, agentConfig) + + assert.Equal(t, "aux-test.com", collector.Exporters.OtlpExporters["default"].Server.Host) + assert.Equal(t, 9090, collector.Exporters.OtlpExporters["default"].Server.Port) + assert.False(t, collector.Exporters.OtlpExporters["default"].TLS.SkipVerify) + assert.Equal(t, "headers_setter", collector.Exporters.OtlpExporters["default"].Authenticator) + assert.Equal(t, "insert", collector.Extensions.HeadersSetter.Headers[0].Action) + assert.Equal(t, "authorization", collector.Extensions.HeadersSetter.Headers[0].Key) + assert.Equal(t, "aux-token", collector.Extensions.HeadersSetter.Headers[0].Value) + }) +} + func agentConfig() *Config { return &Config{ UUID: "", @@ -801,8 +870,8 @@ func agentConfig() *Config { Collector: &Collector{ ConfigPath: "/etc/nginx-agent/nginx-agent-otelcol.yaml", Exporters: Exporters{ - OtlpExporters: []OtlpExporter{ - { + OtlpExporters: map[string]*OtlpExporter{ + "default": { Server: &ServerConfig{ Host: "127.0.0.1", Port: 1234, @@ -819,16 +888,20 @@ func agentConfig() *Config { }, }, Processors: Processors{ - Batch: &Batch{ - SendBatchMaxSize: DefCollectorBatchProcessorSendBatchMaxSize, - SendBatchSize: DefCollectorBatchProcessorSendBatchSize, - Timeout: DefCollectorBatchProcessorTimeout, + Batch: map[string]*Batch{ + "default_logs": { + SendBatchMaxSize: DefCollectorLogsBatchProcessorSendBatchMaxSize, + SendBatchSize: DefCollectorLogsBatchProcessorSendBatchSize, + Timeout: DefCollectorLogsBatchProcessorTimeout, + }, + }, + LogsGzip: map[string]*LogsGzip{ + "default": {}, }, - LogsGzip: &LogsGzip{}, }, Receivers: Receivers{ - OtlpReceivers: []OtlpReceiver{ - { + OtlpReceivers: map[string]*OtlpReceiver{ + "default": { Server: &ServerConfig{ Host: "localhost", Port: 4317, @@ -942,8 +1015,8 @@ func createConfig() *Config { Collector: &Collector{ ConfigPath: "/etc/nginx-agent/nginx-agent-otelcol.yaml", Exporters: Exporters{ - OtlpExporters: []OtlpExporter{ - { + OtlpExporters: map[string]*OtlpExporter{ + "default": { Server: &ServerConfig{ Host: "127.0.0.1", Port: 5643, @@ -974,25 +1047,41 @@ func createConfig() *Config { Debug: &DebugExporter{}, }, Processors: Processors{ - Batch: &Batch{ - SendBatchMaxSize: 1, - SendBatchSize: 8199, - Timeout: 30 * time.Second, + Batch: map[string]*Batch{ + "default": { + SendBatchMaxSize: 1, + SendBatchSize: 8199, + Timeout: 30 * time.Second, + }, + "default_metrics": { + SendBatchMaxSize: 1000, + SendBatchSize: 1000, + Timeout: 30 * time.Second, + }, + "default_logs": { + SendBatchMaxSize: 100, + SendBatchSize: 100, + Timeout: 60 * time.Second, + }, }, - Attribute: &Attribute{ - Actions: []Action{ - { - Key: "test", - Action: "insert", - Value: "value", + Attribute: map[string]*Attribute{ + "default": { + Actions: []Action{ + { + Key: "test", + Action: "insert", + Value: "value", + }, }, }, }, - LogsGzip: &LogsGzip{}, + LogsGzip: map[string]*LogsGzip{ + "default": {}, + }, }, Receivers: Receivers{ - OtlpReceivers: []OtlpReceiver{ - { + OtlpReceivers: map[string]*OtlpReceiver{ + "default": { Server: &ServerConfig{ Host: "127.0.0.1", Port: 4317, @@ -1010,26 +1099,6 @@ func createConfig() *Config { }, }, }, - NginxReceivers: []NginxReceiver{ - { - InstanceID: "cd7b8911-c2c5-4daf-b311-dbead151d938", - AccessLogs: []AccessLog{ - { - LogFormat: "$remote_addr - $remote_user [$time_local] \"$request\"" + - " $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" " + - "\"$http_x_forwarded_for\"", - FilePath: "/var/log/nginx/access-custom.conf", - }, - }, - CollectionInterval: 30 * time.Second, - }, - }, - NginxPlusReceivers: []NginxPlusReceiver{ - { - InstanceID: "cd7b8911-c2c5-4daf-b311-dbead151d939", - CollectionInterval: 30 * time.Second, - }, - }, HostMetrics: &HostMetrics{ CollectionInterval: 10 * time.Second, InitialDelay: 2 * time.Second, @@ -1081,6 +1150,22 @@ func createConfig() *Config { Level: "INFO", Path: "/var/log/nginx-agent/opentelemetry-collector-agent.log", }, + Pipelines: Pipelines{ + Metrics: map[string]*Pipeline{ + "default": { + Receivers: []string{"host_metrics", "nginx_metrics"}, + Processors: []string{"batch/default_metrics"}, + Exporters: []string{"otlp/default"}, + }, + }, + Logs: map[string]*Pipeline{ + "default": { + Receivers: []string{"tcplog/nginx_app_protect"}, + Processors: []string{"logsgzip/default", "batch/default_logs"}, + Exporters: []string{"otlp/default"}, + }, + }, + }, }, Command: &Command{ Server: &ServerConfig{ diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 564f60fdb..072052431 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -74,9 +74,12 @@ const ( DefCollectorTLSCAPath = "/var/lib/nginx-agent/ca.pem" DefCollectorTLSSANNames = "127.0.0.1,::1,localhost" - DefCollectorBatchProcessorSendBatchSize = 1000 - DefCollectorBatchProcessorSendBatchMaxSize = 1000 - DefCollectorBatchProcessorTimeout = 30 * time.Second + DefCollectorMetricsBatchProcessorSendBatchSize = 1000 + DefCollectorMetricsBatchProcessorSendBatchMaxSize = 1000 + DefCollectorMetricsBatchProcessorTimeout = 30 * time.Second + DefCollectorLogsBatchProcessorSendBatchSize = 100 + DefCollectorLogsBatchProcessorSendBatchMaxSize = 100 + DefCollectorLogsBatchProcessorTimeout = 60 * time.Second DefCollectorExtensionsHealthServerHost = "localhost" DefCollectorExtensionsHealthServerPort = 13133 diff --git a/internal/config/flags.go b/internal/config/flags.go index 24e6a340d..e6da27573 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -49,7 +49,6 @@ var ( CollectorConfigPathKey = pre(CollectorRootKey) + "config_path" CollectorExportersKey = pre(CollectorRootKey) + "exporters" - CollectorAttributeProcessorKey = pre(CollectorProcessorsKey) + "attribute" CollectorDebugExporterKey = pre(CollectorExportersKey) + "debug" CollectorPrometheusExporterKey = pre(CollectorExportersKey) + "prometheus" CollectorPrometheusExporterServerHostKey = pre(CollectorPrometheusExporterKey) + "server_host" @@ -62,11 +61,6 @@ var ( CollectorPrometheusExporterTLSServerNameKey = pre(CollectorPrometheusExporterTLSKey) + "server_name" CollectorOtlpExportersKey = pre(CollectorExportersKey) + "otlp" CollectorProcessorsKey = pre(CollectorRootKey) + "processors" - CollectorBatchProcessorKey = pre(CollectorProcessorsKey) + "batch" - CollectorBatchProcessorSendBatchSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_size" - CollectorBatchProcessorSendBatchMaxSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_max_size" - CollectorBatchProcessorTimeoutKey = pre(CollectorBatchProcessorKey) + "timeout" - CollectorLogsGzipProcessorKey = pre(CollectorProcessorsKey) + "logsgzip" CollectorExtensionsKey = pre(CollectorRootKey) + "extensions" CollectorExtensionsHealthKey = pre(CollectorExtensionsKey) + "health" CollectorExtensionsHealthServerHostKey = pre(CollectorExtensionsHealthKey) + "server_host" @@ -79,6 +73,9 @@ var ( CollectorExtensionsHealthTLSServerNameKey = pre(CollectorExtensionsHealthTLSKey) + "server_name" CollectorExtensionsHealthTLSSkipVerifyKey = pre(CollectorExtensionsHealthTLSKey) + "skip_verify" CollectorExtensionsHeadersSetterKey = pre(CollectorExtensionsKey) + "headers_setter" + CollectorPipelinesKey = pre(CollectorRootKey) + "pipelines" + CollectorMetricsPipelinesKey = pre(CollectorPipelinesKey) + "metrics" + CollectorLogsPipelinesKey = pre(CollectorPipelinesKey) + "logs" CollectorReceiversKey = pre(CollectorRootKey) + "receivers" CollectorLogKey = pre(CollectorRootKey) + "log" CollectorLogLevelKey = pre(CollectorLogKey) + "level" diff --git a/internal/config/testdata/nginx-agent.conf b/internal/config/testdata/nginx-agent.conf index 0b4601a55..1aeb11221 100644 --- a/internal/config/testdata/nginx-agent.conf +++ b/internal/config/testdata/nginx-agent.conf @@ -90,46 +90,42 @@ collector: config_path: "/etc/nginx-agent/nginx-agent-otelcol.yaml" receivers: otlp: - - server: - host: "127.0.0.1" - port: 4317 - auth: - token: "secret-receiver-token" - tls: - generate_self_signed_cert: false - server_name: "test-local-server" - skip_verify: true - ca: /tmp/ca.pem - cert: /tmp/cert.pem - key: /tmp/key.pem - nginx: - - instance_id: cd7b8911-c2c5-4daf-b311-dbead151d938 - access_logs: - - file_path: "/var/log/nginx/access-custom.conf" - log_format: "$remote_addr - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\"" - collection_interval: 30s - nginx_plus: - - instance_id: cd7b8911-c2c5-4daf-b311-dbead151d939 - collection_interval: 30s + "default": + server: + host: "127.0.0.1" + port: 4317 + auth: + token: "secret-receiver-token" + tls: + generate_self_signed_cert: false + server_name: "test-local-server" + skip_verify: true + ca: /tmp/ca.pem + cert: /tmp/cert.pem + key: /tmp/key.pem host_metrics: - collection_interval: 10s - initial_delay: 2s - scrapers: - cpu: {} + collection_interval: 10s + initial_delay: 2s + scrapers: + cpu: {} processors: - batch: + batch: + "default": send_batch_max_size: 1 send_batch_size: 8199 timeout: 30s attribute: - actions: - - key: "test" - action: "insert" - value: "value" - logsgzip: {} + "default": + actions: + - key: "test" + action: "insert" + value: "value" + logsgzip: + "default": {} exporters: otlp: - - server: + "default": + server: host: "127.0.0.1" port: 5643 authenticator: "test-saas-token" @@ -140,22 +136,22 @@ collector: key: /path/to/server-key.pem ca: /path/to/server-cert.pem prometheus: - server: - host: "127.0.0.1" - port: 1235 - tls: - server_name: "test-server" - skip_verify: false - cert: /path/to/server-cert.pem - key: /path/to/server-key.pem - ca: /path/to/server-cert.pem + server: + host: "127.0.0.1" + port: 1235 + tls: + server_name: "test-server" + skip_verify: false + cert: /path/to/server-cert.pem + key: /path/to/server-key.pem + ca: /path/to/server-cert.pem debug: {} extensions: headers_setter: - headers: - - action: "action" - key: "key" - value: "value" + headers: + - action: "action" + key: "key" + value: "value" health: server: host: "127.0.0.1" @@ -167,6 +163,6 @@ collector: cert: /path/to/server-cert.pem key: /path/to/server-key.pem ca: /path/to/server-ca.pem - log: + log: level: "INFO" path: "/var/log/nginx-agent/opentelemetry-collector-agent.log" diff --git a/internal/config/types.go b/internal/config/types.go index a6e25d608..01e71ead3 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -108,13 +108,25 @@ type ( Exporters Exporters `yaml:"exporters" mapstructure:"exporters"` Extensions Extensions `yaml:"extensions" mapstructure:"extensions"` Processors Processors `yaml:"processors" mapstructure:"processors"` + Pipelines Pipelines `yaml:"pipelines" mapstructure:"pipelines"` Receivers Receivers `yaml:"receivers" mapstructure:"receivers"` } + Pipelines struct { + Metrics map[string]*Pipeline `yaml:"metrics" mapstructure:"metrics"` + Logs map[string]*Pipeline `yaml:"logs" mapstructure:"logs"` + } + + Pipeline struct { + Receivers []string `yaml:"receivers" mapstructure:"receivers"` + Processors []string `yaml:"processors" mapstructure:"processors"` + Exporters []string `yaml:"exporters" mapstructure:"exporters"` + } + Exporters struct { - Debug *DebugExporter `yaml:"debug" mapstructure:"debug"` - PrometheusExporter *PrometheusExporter `yaml:"prometheus" mapstructure:"prometheus"` - OtlpExporters []OtlpExporter `yaml:"otlp" mapstructure:"otlp"` + Debug *DebugExporter `yaml:"debug" mapstructure:"debug"` + PrometheusExporter *PrometheusExporter `yaml:"prometheus" mapstructure:"prometheus"` + OtlpExporters map[string]*OtlpExporter `yaml:"otlp" mapstructure:"otlp"` } OtlpExporter struct { @@ -157,10 +169,10 @@ type ( // OTel Collector Processors configuration. Processors struct { - Attribute *Attribute `yaml:"attribute" mapstructure:"attribute"` - Resource *Resource `yaml:"resource" mapstructure:"resource"` - Batch *Batch `yaml:"batch" mapstructure:"batch"` - LogsGzip *LogsGzip `yaml:"logsgzip" mapstructure:"logsgzip"` + Attribute map[string]*Attribute `yaml:"attribute" mapstructure:"attribute"` + Resource map[string]*Resource `yaml:"resource" mapstructure:"resource"` + Batch map[string]*Batch `yaml:"batch" mapstructure:"batch"` + LogsGzip map[string]*LogsGzip `yaml:"logsgzip" mapstructure:"logsgzip"` } Attribute struct { @@ -193,12 +205,12 @@ type ( // OTel Collector Receiver configuration. Receivers struct { - ContainerMetrics *ContainerMetricsReceiver `yaml:"container_metrics" mapstructure:"container_metrics"` - HostMetrics *HostMetrics `yaml:"host_metrics" mapstructure:"host_metrics"` - OtlpReceivers []OtlpReceiver `yaml:"otlp" mapstructure:"otlp"` - NginxReceivers []NginxReceiver `yaml:"nginx" mapstructure:"nginx"` - NginxPlusReceivers []NginxPlusReceiver `yaml:"nginx_plus" mapstructure:"nginx_plus"` - TcplogReceivers []TcplogReceiver `yaml:"tcplog" mapstructure:"tcplog"` + ContainerMetrics *ContainerMetricsReceiver `yaml:"container_metrics" mapstructure:"container_metrics"` + HostMetrics *HostMetrics `yaml:"host_metrics" mapstructure:"host_metrics"` + OtlpReceivers map[string]*OtlpReceiver `yaml:"otlp" mapstructure:"otlp"` + TcplogReceivers map[string]*TcplogReceiver `yaml:"tcplog" mapstructure:"tcplog"` + NginxReceivers []NginxReceiver `yaml:"-"` + NginxPlusReceivers []NginxPlusReceiver `yaml:"-"` } OtlpReceiver struct { @@ -381,24 +393,6 @@ func (c *Config) IsAuxiliaryCommandGrpcClientConfigured() bool { c.AuxiliaryCommand.Server.Type == Grpc } -func (c *Config) IsCommandAuthConfigured() bool { - return c.Command.Auth != nil && - (c.Command.Auth.Token != "" || c.Command.Auth.TokenPath != "") -} - -func (c *Config) IsAuxiliaryCommandAuthConfigured() bool { - return c.AuxiliaryCommand.Auth != nil && - (c.AuxiliaryCommand.Auth.Token != "" || c.AuxiliaryCommand.Auth.TokenPath != "") -} - -func (c *Config) IsCommandTLSConfigured() bool { - return c.Command.TLS != nil -} - -func (c *Config) IsAuxiliaryCommandTLSConfigured() bool { - return c.AuxiliaryCommand.TLS != nil -} - func (c *Config) IsFeatureEnabled(feature string) bool { for _, enabledFeature := range c.Features { if enabledFeature == feature { diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index 3f2c3869a..fde011d8b 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -96,7 +96,7 @@ func addCollectorPlugin(ctx context.Context, agentConfig *config.Config, plugins return plugins } if agentConfig.IsACollectorExporterConfigured() { - oTelCollector, err := collector.New(agentConfig) + oTelCollector, err := collector.NewCollector(agentConfig) if err == nil { plugins = append(plugins, oTelCollector) } else { diff --git a/test/config/agent/nginx-agent-otel-load.conf b/test/config/agent/nginx-agent-otel-load.conf index 2ee5e8651..4c7590bf3 100644 --- a/test/config/agent/nginx-agent-otel-load.conf +++ b/test/config/agent/nginx-agent-otel-load.conf @@ -20,21 +20,30 @@ allowed_directories: collector: receivers: otlp: - - server: + "default": + server: host: "127.0.0.1" port: 4317 processors: batch: - send_batch_size: 8192 - timeout: 200ms - send_batch_max_size: 0 + "default": + send_batch_size: 8192 + timeout: 200ms + send_batch_max_size: 0 exporters: otlp: - - server: - host: "127.0.0.1" - port: 5643 + "default": + server: + host: "127.0.0.1" + port: 5643 extensions: health: server: host: "127.0.0.1" port: 1337 + pipelines: + metrics: + "default": + receivers: ["otlp/default"] + processors: ["batch/default"] + exporters: ["otlp/default"] diff --git a/test/config/collector/test-opentelemetry-collector-agent.yaml b/test/config/collector/test-opentelemetry-collector-agent.yaml index 46625a905..aa72f4a4b 100644 --- a/test/config/collector/test-opentelemetry-collector-agent.yaml +++ b/test/config/collector/test-opentelemetry-collector-agent.yaml @@ -18,7 +18,7 @@ receivers: system.memory.limit: enabled: true network: - otlp/0: + otlp/default: protocols: grpc: endpoint: "localhost:4317" @@ -36,7 +36,7 @@ receivers: access_logs: - log_format: "$remote_addr - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\"\"$upstream_cache_status\"" file_path: "/var/log/nginx/access-custom.conf" - tcplog/0: + tcplog/default: listen_address: "localhost:151" operators: - type: add @@ -46,18 +46,18 @@ receivers: field: attributes.message processors: - resource: + resource/default: attributes: - key: resource.id action: add value: 12345 - batch: + batch/default: send_batch_size: 1000 timeout: 30s send_batch_max_size: 1000 exporters: - otlp/0: + otlp/default: endpoint: "127.0.0.1:1234" compression: none timeout: 10s @@ -101,26 +101,27 @@ service: extensions: - health_check - headers_setter + pipelines: - metrics: + metrics/default: receivers: - - containermetrics - hostmetrics - - otlp/0 + - containermetrics + - otlp/default - nginx/123 processors: - - resource - - batch + - resource/default + - batch/default exporters: - - otlp/0 + - otlp/default - prometheus - debug - logs: + logs/default: receivers: - - tcplog/0 + - tcplog/default processors: - - resource - - batch + - resource/default + - batch/default exporters: - - otlp/0 + - otlp/default - debug diff --git a/test/types/config.go b/test/types/config.go index 29775f664..04dec87e5 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -65,8 +65,8 @@ func AgentConfig() *config.Config { Collector: &config.Collector{ ConfigPath: "/etc/nginx-agent/nginx-agent-otelcol.yaml", Exporters: config.Exporters{ - OtlpExporters: []config.OtlpExporter{ - { + OtlpExporters: map[string]*config.OtlpExporter{ + "default": { Server: &config.ServerConfig{ Host: "127.0.0.1", Port: 0, @@ -76,15 +76,17 @@ func AgentConfig() *config.Config { }, }, Processors: config.Processors{ - Batch: &config.Batch{ - SendBatchSize: config.DefCollectorBatchProcessorSendBatchSize, - SendBatchMaxSize: config.DefCollectorBatchProcessorSendBatchMaxSize, - Timeout: config.DefCollectorBatchProcessorTimeout, + Batch: map[string]*config.Batch{ + "default": { + SendBatchSize: config.DefCollectorMetricsBatchProcessorSendBatchMaxSize, + SendBatchMaxSize: config.DefCollectorMetricsBatchProcessorSendBatchMaxSize, + Timeout: config.DefCollectorMetricsBatchProcessorTimeout, + }, }, }, Receivers: config.Receivers{ - OtlpReceivers: []config.OtlpReceiver{ - { + OtlpReceivers: map[string]*config.OtlpReceiver{ + "default": { Server: &config.ServerConfig{ Host: "127.0.0.1", Port: 0, @@ -128,6 +130,15 @@ func AgentConfig() *config.Config { Level: "INFO", Path: "/var/log/nginx-agent/opentelemetry-collector-agent.log", }, + Pipelines: config.Pipelines{ + Metrics: map[string]*config.Pipeline{ + "default": { + Receivers: []string{"host_metrics"}, + Processors: []string{"batch/default"}, + Exporters: []string{"otlp/default"}, + }, + }, + }, }, Command: &config.Command{ Server: &config.ServerConfig{ @@ -179,11 +190,11 @@ func OTelConfig(t *testing.T) *config.Config { exporterPort, expErr := helpers.RandomPort(t) require.NoError(t, expErr) - ac.Collector.Exporters.OtlpExporters[0].Server.Port = exporterPort + ac.Collector.Exporters.OtlpExporters["default"].Server.Port = exporterPort receiverPort, recErr := helpers.RandomPort(t) require.NoError(t, recErr) - ac.Collector.Receivers.OtlpReceivers[0].Server.Port = receiverPort + ac.Collector.Receivers.OtlpReceivers["default"].Server.Port = receiverPort healthPort, healthErr := helpers.RandomPort(t) require.NoError(t, healthErr) From 6de84e1c7525b541aa4f56ccbecaf3b3ba3361c9 Mon Sep 17 00:00:00 2001 From: aphralG <108004222+aphralG@users.noreply.github.com> Date: Tue, 22 Jul 2025 17:05:12 +0100 Subject: [PATCH 12/13] Fix failing unit test (#1176) --- internal/collector/otel_collector_plugin_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index 2fa51cef4..66a58c36b 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -173,8 +173,9 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { }, }, receivers: config.Receivers{ - HostMetrics: nil, - OtlpReceivers: nil, + HostMetrics: nil, + OtlpReceivers: nil, + TcplogReceivers: make(map[string]*config.TcplogReceiver), NginxPlusReceivers: []config.NginxPlusReceiver{ { InstanceID: "123", @@ -213,8 +214,9 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { }, }, receivers: config.Receivers{ - HostMetrics: nil, - OtlpReceivers: nil, + HostMetrics: nil, + OtlpReceivers: nil, + TcplogReceivers: make(map[string]*config.TcplogReceiver), NginxReceivers: []config.NginxReceiver{ { InstanceID: "123", From 96b184b4e5974fa82172fef3ab378190e6b0afb7 Mon Sep 17 00:00:00 2001 From: aphralG <108004222+aphralG@users.noreply.github.com> Date: Wed, 23 Jul 2025 09:38:34 +0100 Subject: [PATCH 13/13] Filter NAP logs by Severity (#1169) --- internal/collector/otel_collector_plugin.go | 18 ++++++++++++++++++ .../collector/otel_collector_plugin_test.go | 6 +++--- internal/collector/otelcol.tmpl | 2 +- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index 557e0af11..da7094417 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -555,6 +555,24 @@ func (oc *Collector) updateNginxAppProtectTcplogReceivers(nginxConfigContext *mo oc.config.Collector.Receivers.TcplogReceivers["nginx_app_protect"] = &config.TcplogReceiver{ ListenAddress: nginxConfigContext.NAPSysLogServer, Operators: []config.Operator{ + // regex captures the priority number from the log line + { + Type: "regex_parser", + Fields: map[string]string{ + "regex": "^<(?P\\d+)>", + "parse_from": "body", + "parse_to": "attributes", + }, + }, + // filter drops all logs that have a severity above 4 + // https://docs.secureauth.com/0902/en/how-to-read-a-syslog-message.html#severity-code-table + { + Type: "filter", + Fields: map[string]string{ + "expr": "'int(attributes.priority) % 8 > 4'", + "drop_ratio": "1.0", + }, + }, { Type: "add", Fields: map[string]string{ diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index 66a58c36b..5623d2eef 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -749,7 +749,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { assert.True(tt, tcplogReceiverAdded) assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1) assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) - assert.Len(tt, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) + assert.Len(tt, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 6) }) // Calling updateNginxAppProtectTcplogReceivers shouldn't update the TcplogReceivers slice @@ -759,7 +759,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { assert.False(t, tcplogReceiverAdded) assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) - assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 6) }) t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) { @@ -778,7 +778,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { assert.True(t, tcplogReceiverDeleted) assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) - assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 6) }) } diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index 038a936e4..cd8e691dd 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -298,7 +298,7 @@ service: receivers: {{- range $receiver := $pipeline.Receivers }} {{- if eq $receiver "tcplog/nginx_app_protect" }} - - tcplog/nginx_app_protect: + - tcplog/nginx_app_protect {{- else }} - {{ $receiver }} {{- end }}