From 7ed83e076504d52363a0526b00f8cffc41a4a8cf Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Tue, 15 Jul 2025 15:40:12 +0100 Subject: [PATCH 1/3] Add support for configuring mutliple OTel pipelines --- internal/collector/otel_collector_plugin.go | 91 +++---- .../collector/otel_collector_plugin_test.go | 52 ++-- internal/collector/otelcol.tmpl | 146 +++++------ internal/collector/settings.go | 63 +++-- internal/collector/settings_test.go | 44 ++-- internal/config/config.go | 246 ++++++++++++------ internal/config/config_test.go | 183 +++++++++---- internal/config/defaults.go | 9 +- internal/config/flags.go | 9 +- internal/config/testdata/nginx-agent.conf | 88 +++---- internal/config/types.go | 56 ++-- .../test-opentelemetry-collector-agent.yaml | 35 +-- test/types/config.go | 31 ++- 13 files changed, 636 insertions(+), 417 deletions(-) diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index a1c6a9876..64b007002 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -59,7 +59,7 @@ var ( initMutex = &sync.Mutex{} ) -// NewCollector is the constructor for the Collector plugin. +// New is the constructor for the Collector plugin. func New(conf *config.Config) (*Collector, error) { initMutex.Lock() @@ -194,7 +194,7 @@ func (oc *Collector) Subscriptions() []string { } // Process receivers and log warning for sub-optimal configurations -func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) { +func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) { for _, receiver := range receivers { if receiver.OtlpTLSConfig == nil { slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.") @@ -317,12 +317,13 @@ func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) resourceProcessorUpdated := false if oc.config.Collector.Processors.Resource == nil { - oc.config.Collector.Processors.Resource = &config.Resource{ + oc.config.Collector.Processors.Resource = make(map[string]*config.Resource) + oc.config.Collector.Processors.Resource["default"] = &config.Resource{ Attributes: make([]config.ResourceAttribute, 0), } } - if oc.config.Collector.Processors.Resource != nil && + if oc.config.Collector.Processors.Resource["default"] != nil && resourceUpdateContext.GetResourceId() != "" { resourceProcessorUpdated = oc.updateResourceAttributes( []config.ResourceAttribute{ @@ -431,7 +432,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex } if oc.config.IsFeatureEnabled(pkgConfig.FeatureLogsNap) { - tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext) + tcplogReceiversFound := oc.updateNginxAppProtectTcplogReceivers(nginxConfigContext) if tcplogReceiversFound { reloadCollector = true } @@ -541,8 +542,13 @@ func (oc *Collector) updateExistingNginxOSSReceiver( return nginxReceiverFound, reloadCollector } -func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool { +func (oc *Collector) updateNginxAppProtectTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool { newTcplogReceiverAdded := false + + if oc.config.Collector.Receivers.TcplogReceivers == nil { + oc.config.Collector.Receivers.TcplogReceivers = make(map[string]*config.TcplogReceiver) + } + if nginxConfigContext.NAPSysLogServers != nil { napLoop: for _, napSysLogServer := range nginxConfigContext.NAPSysLogServers { @@ -550,40 +556,37 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig continue napLoop } - oc.config.Collector.Receivers.TcplogReceivers = append( - oc.config.Collector.Receivers.TcplogReceivers, - config.TcplogReceiver{ - ListenAddress: napSysLogServer, - Operators: []config.Operator{ - { - Type: "add", - Fields: map[string]string{ - "field": "body", - "value": timestampConversionExpression, - }, + oc.config.Collector.Receivers.TcplogReceivers["nginx_app_protect"] = &config.TcplogReceiver{ + ListenAddress: napSysLogServer, + Operators: []config.Operator{ + { + Type: "add", + Fields: map[string]string{ + "field": "body", + "value": timestampConversionExpression, }, - { - Type: "syslog_parser", - Fields: map[string]string{ - "protocol": "rfc3164", - }, + }, + { + Type: "syslog_parser", + Fields: map[string]string{ + "protocol": "rfc3164", }, - { - Type: "remove", - Fields: map[string]string{ - "field": "attributes.message", - }, + }, + { + Type: "remove", + Fields: map[string]string{ + "field": "attributes.message", }, - { - Type: "add", - Fields: map[string]string{ - "field": "resource[\"instance.id\"]", - "value": nginxConfigContext.InstanceID, - }, + }, + { + Type: "add", + Fields: map[string]string{ + "field": "resource[\"instance.id\"]", + "value": nginxConfigContext.InstanceID, }, }, }, - ) + } newTcplogReceiverAdded = true } @@ -597,23 +600,13 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool { listenAddressesToBeDeleted := oc.configDeletedNapReceivers(nginxConfigContext) if len(listenAddressesToBeDeleted) != 0 { - oc.deleteNapReceivers(listenAddressesToBeDeleted) + delete(oc.config.Collector.Receivers.TcplogReceivers, "nginx_app_protect") return true } return false } -func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bool) { - filteredReceivers := (oc.config.Collector.Receivers.TcplogReceivers)[:0] - for _, receiver := range oc.config.Collector.Receivers.TcplogReceivers { - if !listenAddressesToBeDeleted[receiver.ListenAddress] { - filteredReceivers = append(filteredReceivers, receiver) - } - } - oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers -} - func (oc *Collector) configDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool { elements := make(map[string]bool) @@ -651,16 +644,16 @@ func (oc *Collector) updateResourceAttributes( ) (actionUpdated bool) { actionUpdated = false - if oc.config.Collector.Processors.Resource.Attributes != nil { + if oc.config.Collector.Processors.Resource["default"].Attributes != nil { OUTER: for _, toAdd := range attributesToAdd { - for _, action := range oc.config.Collector.Processors.Resource.Attributes { + for _, action := range oc.config.Collector.Processors.Resource["default"].Attributes { if action.Key == toAdd.Key { continue OUTER } } - oc.config.Collector.Processors.Resource.Attributes = append( - oc.config.Collector.Processors.Resource.Attributes, + oc.config.Collector.Processors.Resource["default"].Attributes = append( + oc.config.Collector.Processors.Resource["default"].Attributes, toAdd, ) actionUpdated = true diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index d625d13b7..f3f3e6722 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -349,12 +349,14 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) { Data: protos.HostResource(), }, processors: config.Processors{ - Resource: &config.Resource{ - Attributes: []config.ResourceAttribute{ - { - Key: "resource.id", - Action: "insert", - Value: "1234", + Resource: map[string]*config.Resource{ + "default": { + Attributes: []config.ResourceAttribute{ + { + Key: "resource.id", + Action: "insert", + Value: "1234", + }, }, }, }, @@ -711,18 +713,19 @@ func TestCollector_updateResourceAttributes(t *testing.T) { collector.service = createFakeCollector() // set up Actions - conf.Collector.Processors.Resource = &config.Resource{Attributes: test.setup} + conf.Collector.Processors.Resource = make(map[string]*config.Resource) + conf.Collector.Processors.Resource["default"] = &config.Resource{Attributes: test.setup} reloadRequired := collector.updateResourceAttributes(test.attributes) assert.Equal(tt, test.expectedAttribs, - conf.Collector.Processors.Resource.Attributes) + conf.Collector.Processors.Resource["default"].Attributes) assert.Equal(tt, test.expectedReloadRequired, reloadRequired) }) } } -func TestCollector_updateTcplogReceivers(t *testing.T) { +func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { conf := types.OTelConfig(t) conf.Collector.Log.Path = "" conf.Collector.Processors.Batch = nil @@ -741,38 +744,43 @@ func TestCollector_updateTcplogReceivers(t *testing.T) { assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) { - tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext) + tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext) assert.True(tt, tcplogReceiverAdded) assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1) - assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) - assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) + assert.Len(tt, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) }) - // Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice + // Calling updateNginxAppProtectTcplogReceivers shouldn't update the TcplogReceivers slice // since there is already a receiver with the same ListenAddress t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) { - tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext) + tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext) assert.False(t, tcplogReceiverAdded) assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) - assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) - assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) }) t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) { - tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{}) + tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(&model.NginxConfigContext{}) assert.True(t, tcplogReceiverDeleted) assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) }) t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) { - tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{NAPSysLogServers: []string{ - "localhost:152", - }}) + tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers( + &model.NginxConfigContext{ + NAPSysLogServers: []string{ + "localhost:152", + }, + }, + ) + assert.True(t, tcplogReceiverDeleted) assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) - assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) - assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4) }) } diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index 6af00e9d2..480101428 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -1,10 +1,11 @@ receivers: -{{- if ne .Receivers.ContainerMetrics nil }} + {{- with .Receivers.ContainerMetrics }} containermetrics: - {{- if .Receivers.ContainerMetrics.CollectionInterval }} - collection_interval: {{ .Receivers.ContainerMetrics.CollectionInterval }} + {{- with .CollectionInterval }} + collection_interval: "{{ . }}" {{- end}} {{- end }} + {{- if ne .Receivers.HostMetrics nil }} hostmetrics: {{- if .Receivers.HostMetrics.CollectionInterval }} @@ -39,7 +40,8 @@ receivers: network: {{- end }} {{- end }} -{{- end }} +{{- end }} + {{- range $index, $otlpReceiver := .Receivers.OtlpReceivers }} otlp/{{$index}}: protocols: @@ -75,6 +77,7 @@ receivers: {{- end }} {{- end }} {{- end }} + {{- range .Receivers.NginxReceivers }} nginx/{{- .InstanceID -}}: api_details: @@ -92,6 +95,7 @@ receivers: {{- end }} {{- end }} {{- end }} + {{- range .Receivers.NginxPlusReceivers }} nginxplus/{{- .InstanceID -}}: api_details: @@ -102,6 +106,7 @@ receivers: collection_interval: {{ .CollectionInterval }} {{- end }} {{- end }} + {{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }} tcplog/{{$index}}: listen_address: "{{- .ListenAddress -}}" @@ -116,35 +121,43 @@ receivers: processors: {{- if ne .Processors.Resource nil }} -{{- if .Processors.Resource.Attributes }} - resource: + {{- range $key, $resource := .Processors.Resource }} + {{- if $resource.Attributes }} + resource/{{$key}}: attributes: -{{- range .Processors.Resource.Attributes }} + {{- range $resource.Attributes }} - key: {{ .Key }} action: {{ .Action }} value: {{ .Value }} -{{- end }} -{{- end }} + {{- end }} + {{- end }} + {{- end }} {{- end }} {{- if ne .Processors.Attribute nil }} -{{- if .Processors.Attribute.Actions }} - attributes: + {{- range $key, $attribute := .Processors.Attribute }} + {{- if $attribute.Actions }} + attributes/{{$key}}: actions: -{{- range .Processors.Attribute.Actions }} + {{- range $attribute.Actions }} - key: {{ .Key }} action: {{ .Action }} value: {{ .Value }} -{{- end }} -{{- end }} + {{- end }} + {{- end }} + {{- end }} {{- end }} {{- if ne .Processors.Batch nil }} - batch: - send_batch_size: {{ .Processors.Batch.SendBatchSize }} - timeout: {{ .Processors.Batch.Timeout }} - send_batch_max_size: {{ .Processors.Batch.SendBatchMaxSize }} + {{- range $key, $batch := .Processors.Batch }} + batch/{{$key}}: + send_batch_size: {{ $batch.SendBatchSize }} + timeout: {{ $batch.Timeout }} + send_batch_max_size: {{ $batch.SendBatchMaxSize }} +{{- end }} {{- end }} {{- if ne .Processors.LogsGzip nil }} - logsgzip: {} +{{ range $key, $value := .Processors.LogsGzip }} + logsgzip/{{$key}}: {} +{{- end }} {{- end }} exporters: @@ -173,6 +186,7 @@ exporters: authenticator: {{ .Authenticator -}} {{- end }} {{- end }} + {{- if ne .Exporters.PrometheusExporter nil }} prometheus: endpoint: "{{ .Exporters.PrometheusExporter.Server.Host -}}:{{- .Exporters.PrometheusExporter.Server.Port }}" @@ -241,73 +255,59 @@ service: - headers_setter {{- end}} {{- end}} + pipelines: - {{- if or (ne .Receivers.HostMetrics nil) (ne .Receivers.ContainerMetrics nil) (gt (len .Receivers.OtlpReceivers) 0) (gt (len .Receivers.NginxReceivers) 0) (gt (len .Receivers.NginxPlusReceivers) 0) }} - metrics: + {{- range $pipelineName, $pipeline := .Pipelines.Metrics }} + {{- if or (ne $.Receivers.HostMetrics nil) (ne $.Receivers.ContainerMetrics nil) (gt (len $.Receivers.OtlpReceivers) 0) (gt (len $.Receivers.NginxReceivers) 0) (gt (len $.Receivers.NginxPlusReceivers) 0) }} + metrics/{{$pipelineName}}: receivers: - {{- if ne .Receivers.ContainerMetrics nil }} + {{- range $receiver := $pipeline.Receivers }} + {{- if eq $receiver "host_metrics" }} + {{- if ne $.Receivers.ContainerMetrics nil }} - containermetrics - {{- end }} - {{- if ne .Receivers.HostMetrics nil }} + {{- end }} + {{- if ne $.Receivers.HostMetrics nil }} - hostmetrics - {{- end }} - {{- range $index, $otlpReceiver := .Receivers.OtlpReceivers }} - - otlp/{{$index}} - {{- end }} - {{- range .Receivers.NginxReceivers }} + {{- end }} + {{- else if eq $receiver "nginx_metrics" }} + {{- range $.Receivers.NginxReceivers }} - nginx/{{- .InstanceID -}} - {{- end }} - {{- range .Receivers.NginxPlusReceivers }} + {{- end }} + {{- range $.Receivers.NginxPlusReceivers }} - nginxplus/{{- .InstanceID -}} + {{- end }} + {{- else }} + - {{ $receiver }} + {{- end }} {{- end }} processors: - {{- if ne .Processors.Resource nil }} - {{- if .Processors.Resource.Attributes }} - - resource - {{- end }} - {{- end }} - {{- if ne .Processors.Attribute nil }} - {{- if .Processors.Attribute.Actions }} - - attributes - {{- end }} - {{- end }} - {{- if ne .Processors.Batch nil }} - - batch + {{- range $pipeline.Processors }} + - {{ . }} {{- end }} exporters: - {{- range $index, $otlpExporter := .Exporters.OtlpExporters }} - - otlp/{{$index}} - {{- end }} - {{- if ne .Exporters.PrometheusExporter nil }} - - prometheus - {{- end }} - {{- if ne .Exporters.Debug nil }} - - debug - {{- end }} - {{- end }} - {{- if gt (len .Receivers.TcplogReceivers) 0 }} - logs: + {{- range $pipeline.Exporters }} + - {{ . }} + {{- end }} + {{- end }} + {{- end }} + {{- range $pipelineName, $pipeline := .Pipelines.Logs }} + {{- if gt (len $.Receivers.TcplogReceivers) 0 }} + logs/{{$pipelineName}}: receivers: - {{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }} - - tcplog/{{$index}} + {{- range $receiver := $pipeline.Receivers }} + {{- if eq $receiver "tcplog/nginx_app_protect" }} + - tcplog/nginx_app_protect: + {{- else }} + - {{ $receiver }} + {{- end }} {{- end }} processors: - {{- if ne .Processors.Resource nil }} - {{- if .Processors.Resource.Attributes }} - - resource - {{- end }} - {{- end }} - {{- if ne .Processors.LogsGzip nil }} - - logsgzip - {{- end }} - {{- if ne .Processors.Batch nil }} - - batch + {{- range $pipeline.Processors }} + - {{ . }} {{- end }} exporters: - {{- range $index, $otlpExporter := .Exporters.OtlpExporters }} - - otlp/{{$index}} + {{- range $pipeline.Exporters }} + - {{ . }} {{- end }} - {{- if ne .Exporters.Debug nil }} - - debug - {{- end }} - {{- end }} + {{- end }} + {{- end }} diff --git a/internal/collector/settings.go b/internal/collector/settings.go index 62e92afbb..e89bb89d6 100644 --- a/internal/collector/settings.go +++ b/internal/collector/settings.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "path/filepath" + "slices" "text/template" "github.com/nginx/agent/v3/internal/config" @@ -89,43 +90,59 @@ func createFile(confPath string) error { return nil } -// Generates a OTel Collector config to a file by injecting the Metrics Config to a Go template. +// Generates an OTel Collector config to a file by injecting the Metrics Config to a Go template. func writeCollectorConfig(conf *config.Collector) error { - otelcolTemplate, err := template.New(otelTemplatePath).Parse(otelcolTemplate) - if err != nil { - return err + if conf.Processors.Resource["default"] != nil { + addDefaultResourceProcessor(conf.Pipelines.Metrics) + addDefaultResourceProcessor(conf.Pipelines.Logs) } - confPath := filepath.Clean(conf.ConfigPath) + slog.Info("Writing OTel collector config") - // Check if file exists, if not create it. - _, err = os.Stat(confPath) - if err != nil { - if !os.IsNotExist(err) { - return err - } + otelcolTemplate, templateErr := template.New(otelTemplatePath).Parse(otelcolTemplate) + if templateErr != nil { + return templateErr + } - fileErr := createFile(confPath) - if fileErr != nil { - return fileErr - } + confPath := filepath.Clean(conf.ConfigPath) + + // Ensure file exists and has correct permissions + if err := ensureFileExists(confPath); err != nil { + return err } file, err := os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, configFilePermission) + if err != nil { + return err + } defer func() { - err = file.Close() - if err != nil { - slog.Warn("Failed to close file", "file_path", confPath) + fileCloseErr := file.Close() + if fileCloseErr != nil { + slog.Warn("Failed to close file", "file_path", confPath, "error", fileCloseErr) } }() - if err != nil { - return err + + return otelcolTemplate.Execute(file, conf) +} + +func addDefaultResourceProcessor(pipelines map[string]*config.Pipeline) { + for _, pipeline := range pipelines { + if !slices.Contains(pipeline.Processors, "resource/default") { + pipeline.Processors = append(pipeline.Processors, "resource/default") + } } +} - err = otelcolTemplate.Execute(file, conf) +func ensureFileExists(confPath string) error { + _, err := os.Stat(confPath) if err != nil { - return err + if !os.IsNotExist(err) { + return err + } + if createFileErr := createFile(confPath); createFileErr != nil { + return createFileErr + } } - return nil + return os.Chmod(confPath, configFilePermission) } diff --git a/internal/collector/settings_test.go b/internal/collector/settings_test.go index af89bd1e1..f8b8c7be4 100644 --- a/internal/collector/settings_test.go +++ b/internal/collector/settings_test.go @@ -54,12 +54,14 @@ func TestTemplateWrite(t *testing.T) { cfg := types.AgentConfig() actualConfPath := filepath.Join(tmpDir, "nginx-agent-otelcol-test.yaml") cfg.Collector.ConfigPath = actualConfPath - cfg.Collector.Processors.Resource = &config.Resource{ - Attributes: []config.ResourceAttribute{ - { - Key: "resource.id", - Action: "add", - Value: "12345", + cfg.Collector.Processors.Resource = map[string]*config.Resource{ + "default": { + Attributes: []config.ResourceAttribute{ + { + Key: "resource.id", + Action: "add", + Value: "12345", + }, }, }, } @@ -106,8 +108,7 @@ func TestTemplateWrite(t *testing.T) { }, }) // Clear default config and test collector with TLS enabled - cfg.Collector.Receivers.OtlpReceivers = []config.OtlpReceiver{} - cfg.Collector.Receivers.OtlpReceivers = append(cfg.Collector.Receivers.OtlpReceivers, config.OtlpReceiver{ + cfg.Collector.Receivers.OtlpReceivers["default"] = &config.OtlpReceiver{ Server: &config.ServerConfig{ Host: "localhost", Port: 4317, @@ -118,10 +119,10 @@ func TestTemplateWrite(t *testing.T) { Key: "/tmp/key.pem", Ca: "/tmp/ca.pem", }, - }) + } - cfg.Collector.Receivers.TcplogReceivers = []config.TcplogReceiver{ - { + cfg.Collector.Receivers.TcplogReceivers = map[string]*config.TcplogReceiver{ + "default": { ListenAddress: "localhost:151", Operators: []config.Operator{ { @@ -155,13 +156,26 @@ func TestTemplateWrite(t *testing.T) { }, } - cfg.Collector.Exporters.OtlpExporters[0].Authenticator = "headers_setter" + cfg.Collector.Exporters.OtlpExporters["default"].Authenticator = "headers_setter" // nolint: lll - cfg.Collector.Exporters.OtlpExporters[0].Compression = types.AgentConfig().Collector.Exporters.OtlpExporters[0].Compression - cfg.Collector.Exporters.OtlpExporters[0].Server.Port = 1234 - cfg.Collector.Receivers.OtlpReceivers[0].Server.Port = 4317 + cfg.Collector.Exporters.OtlpExporters["default"].Compression = types.AgentConfig().Collector.Exporters.OtlpExporters["default"].Compression + cfg.Collector.Exporters.OtlpExporters["default"].Server.Port = 1234 + cfg.Collector.Receivers.OtlpReceivers["default"].Server.Port = 4317 cfg.Collector.Extensions.Health.Server.Port = 1337 + cfg.Collector.Pipelines.Metrics = make(map[string]*config.Pipeline) + cfg.Collector.Pipelines.Metrics["default"] = &config.Pipeline{ + Receivers: []string{"hostmetrics", "containermetrics", "otlp/default", "nginx/123"}, + Processors: []string{"resource/default", "batch/default"}, + Exporters: []string{"otlp/default", "prometheus", "debug"}, + } + cfg.Collector.Pipelines.Logs = make(map[string]*config.Pipeline) + cfg.Collector.Pipelines.Logs["default"] = &config.Pipeline{ + Receivers: []string{"tcplog/default"}, + Processors: []string{"resource/default", "batch/default"}, + Exporters: []string{"otlp/default", "debug"}, + } + require.NotNil(t, cfg) err := writeCollectorConfig(cfg.Collector) diff --git a/internal/config/config.go b/internal/config/config.go index 0447f834e..f17dc31d8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,11 +32,15 @@ import ( ) const ( - ConfigFileName = "nginx-agent.conf" - EnvPrefix = "NGINX_AGENT" - KeyDelimiter = "_" - KeyValueNumber = 2 - AgentDirName = "/etc/nginx-agent/" + ConfigFileName = "nginx-agent.conf" + EnvPrefix = "NGINX_AGENT" + KeyDelimiter = "_" + KeyValueNumber = 2 + AgentDirName = "/etc/nginx-agent/" + DefaultMetricsBatchProcessor = "default_metrics" + DefaultLogsBatchProcessor = "default_logs" + DefaultExporter = "default" + DefaultPipeline = "default" ) var viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) @@ -128,7 +132,7 @@ func ResolveConfig() (*Config, error) { ManifestDir: viperInstance.GetString(ManifestDirPathKey), } - checkCollectorConfiguration(collector, config) + defaultCollector(collector, config) slog.Debug("Agent config", "config", config) slog.Info("Excluded files from being watched for file changes", "exclude_files", @@ -137,34 +141,145 @@ func ResolveConfig() (*Config, error) { return config, nil } -func checkCollectorConfiguration(collector *Collector, config *Config) { - if isOTelExporterConfigured(collector) && config.IsCommandGrpcClientConfigured() && - config.IsCommandAuthConfigured() && - config.IsCommandTLSConfigured() { - slog.Info("No collector configuration found in NGINX Agent config, command server configuration found." + - " Using default collector configuration") - defaultCollector(collector, config) +func defaultCollector(collector *Collector, config *Config) { + addDefaultHostMetricsReceiver(collector) + addDefaultProcessors(collector) + addDefaultOtlpExporter(collector, config) + addDefaultPipelines(collector) +} + +func addDefaultPipelines(collector *Collector) { + if collector.Pipelines.Metrics == nil { + collector.Pipelines.Metrics = make(map[string]*Pipeline) + } + if _, ok := collector.Pipelines.Metrics[DefaultPipeline]; !ok { + collector.Pipelines.Metrics[DefaultPipeline] = &Pipeline{ + Receivers: []string{"host_metrics", "nginx_metrics"}, + Processors: []string{"batch/default_metrics"}, + Exporters: []string{"otlp/default"}, + } + } + + if collector.Pipelines.Logs == nil { + collector.Pipelines.Logs = make(map[string]*Pipeline) + } + if _, ok := collector.Pipelines.Logs[DefaultPipeline]; !ok { + collector.Pipelines.Logs[DefaultPipeline] = &Pipeline{ + Receivers: []string{"tcplog/nginx_app_protect"}, + Processors: []string{"logsgzip/default", "batch/default_logs"}, + Exporters: []string{"otlp/default"}, + } } } -func defaultCollector(collector *Collector, config *Config) { - token := config.Command.Auth.Token - if config.Command.Auth.TokenPath != "" { - slog.Debug("Reading token from file", "path", config.Command.Auth.TokenPath) - pathToken, err := file.ReadFromFile(config.Command.Auth.TokenPath) +func addDefaultOtlpExporter(collector *Collector, config *Config) { + if collector.Exporters.OtlpExporters == nil { + collector.Exporters.OtlpExporters = make(map[string]*OtlpExporter) + } + + defaultCommandServer := config.Command + + if config.IsAuxiliaryCommandGrpcClientConfigured() { + defaultCommandServer = config.AuxiliaryCommand + } + + if _, ok := collector.Exporters.OtlpExporters[DefaultExporter]; !ok && defaultCommandServer != nil { + collector.Exporters.OtlpExporters[DefaultExporter] = &OtlpExporter{ + Server: defaultCommandServer.Server, + TLS: defaultCommandServer.TLS, + Compression: "", + } + + if defaultCommandServer.Auth != nil { + token := extractTokenFromAuth(defaultCommandServer.Auth) + if token != "" { + addAuthHeader(collector, token) + collector.Exporters.OtlpExporters[DefaultExporter].Authenticator = "headers_setter" + } + } + } +} + +func extractTokenFromAuth(auth *AuthConfig) string { + token := auth.Token + if auth.TokenPath != "" { + slog.Debug("Reading token from file", "path", auth.TokenPath) + tokenFromFile, err := file.ReadFromFile(auth.TokenPath) if err != nil { slog.Error("Error adding token to default collector, "+ "default collector configuration not started", "error", err) - return + return "" } - token = pathToken + token = tokenFromFile } + return token +} + +func addAuthHeader(collector *Collector, token string) { + header := []Header{ + { + Action: "insert", + Key: "authorization", + Value: token, + }, + } + + if collector.Extensions.HeadersSetter == nil { + collector.Extensions.HeadersSetter = &HeadersSetter{ + Headers: header, + } + } else { + // nolint: lll + collector.Extensions.HeadersSetter.Headers = append(collector.Extensions.HeadersSetter.Headers, header...) + } +} + +func addDefaultProcessors(collector *Collector) { + if collector.Processors.Batch == nil { + collector.Processors.Batch = make(map[string]*Batch) + } + + if _, ok := collector.Processors.Batch[DefaultMetricsBatchProcessor]; !ok { + collector.Processors.Batch[DefaultMetricsBatchProcessor] = &Batch{ + SendBatchSize: DefCollectorMetricsBatchProcessorSendBatchSize, + SendBatchMaxSize: DefCollectorMetricsBatchProcessorSendBatchMaxSize, + Timeout: DefCollectorMetricsBatchProcessorTimeout, + } + } + if _, ok := collector.Processors.Batch[DefaultLogsBatchProcessor]; !ok { + collector.Processors.Batch[DefaultLogsBatchProcessor] = &Batch{ + SendBatchSize: DefCollectorLogsBatchProcessorSendBatchSize, + SendBatchMaxSize: DefCollectorLogsBatchProcessorSendBatchMaxSize, + Timeout: DefCollectorLogsBatchProcessorTimeout, + } + } + + if collector.Processors.LogsGzip == nil { + collector.Processors.LogsGzip = make(map[string]*LogsGzip) + } + if _, ok := collector.Processors.LogsGzip["default"]; !ok { + collector.Processors.LogsGzip["default"] = &LogsGzip{} + } +} + +func addDefaultHostMetricsReceiver(collector *Collector) { if host.NewInfo().IsContainer() { + addDefaultContainerHostMetricsReceiver(collector) + } else { + addDefaultVMHostMetricsReceiver(collector) + } +} + +func addDefaultContainerHostMetricsReceiver(collector *Collector) { + if collector.Receivers.ContainerMetrics == nil { collector.Receivers.ContainerMetrics = &ContainerMetricsReceiver{ CollectionInterval: 1 * time.Minute, } + } + + if collector.Receivers.HostMetrics == nil { collector.Receivers.HostMetrics = &HostMetrics{ Scrapers: &HostMetricsScrapers{ Network: &NetworkScraper{}, @@ -172,11 +287,18 @@ func defaultCollector(collector *Collector, config *Config) { CollectionInterval: 1 * time.Minute, InitialDelay: 1 * time.Second, } + } + + if collector.Log == nil { collector.Log = &Log{ Path: "stdout", Level: "info", } - } else { + } +} + +func addDefaultVMHostMetricsReceiver(collector *Collector) { + if collector.Receivers.HostMetrics == nil { collector.Receivers.HostMetrics = &HostMetrics{ Scrapers: &HostMetricsScrapers{ CPU: &CPUScraper{}, @@ -189,24 +311,6 @@ func defaultCollector(collector *Collector, config *Config) { InitialDelay: 1 * time.Second, } } - - collector.Exporters.OtlpExporters = append(collector.Exporters.OtlpExporters, OtlpExporter{ - Server: config.Command.Server, - TLS: config.Command.TLS, - Compression: "", - Authenticator: "headers_setter", - }) - - header := []Header{ - { - Action: "insert", - Key: "authorization", - Value: token, - }, - } - collector.Extensions.HeadersSetter = &HeadersSetter{ - Headers: header, - } } func setVersion(version, commit string) { @@ -530,24 +634,6 @@ func registerCollectorFlags(fs *flag.FlagSet) { "If the default path doesn't exist, log messages are output to stdout/stderr.", ) - fs.Uint32( - CollectorBatchProcessorSendBatchSizeKey, - DefCollectorBatchProcessorSendBatchSize, - `Number of metric data points after which a batch will be sent regardless of the timeout.`, - ) - - fs.Uint32( - CollectorBatchProcessorSendBatchMaxSizeKey, - DefCollectorBatchProcessorSendBatchMaxSize, - `The upper limit of the batch size.`, - ) - - fs.Duration( - CollectorBatchProcessorTimeoutKey, - DefCollectorBatchProcessorTimeout, - `Time duration after which a batch will be sent regardless of size.`, - ) - fs.String( CollectorExtensionsHealthServerHostKey, DefCollectorExtensionsHealthServerHost, @@ -820,6 +906,7 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { Receivers: receivers, Extensions: resolveExtensions(), Log: resolveCollectorLog(), + Pipelines: resolvePipelines(), } // Check for self-signed certificate true in Agent conf @@ -835,8 +922,33 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { return col, nil } +func resolvePipelines() Pipelines { + var metricsPipelines map[string]*Pipeline + + if viperInstance.IsSet(CollectorMetricsPipelinesKey) { + err := resolveMapStructure(CollectorMetricsPipelinesKey, &metricsPipelines) + if err != nil { + metricsPipelines = nil + } + } + + var logsPipelines map[string]*Pipeline + + if viperInstance.IsSet(CollectorLogsPipelinesKey) { + err := resolveMapStructure(CollectorLogsPipelinesKey, &logsPipelines) + if err != nil { + logsPipelines = nil + } + } + + return Pipelines{ + Metrics: metricsPipelines, + Logs: logsPipelines, + } +} + func resolveExporters() (Exporters, error) { - var otlpExporters []OtlpExporter + var otlpExporters map[string]*OtlpExporter exporters := Exporters{} if viperInstance.IsSet(CollectorDebugExporterKey) { @@ -879,17 +991,10 @@ func isPrometheusExporterSet() bool { } func resolveProcessors() Processors { - processors := Processors{ - Batch: &Batch{ - SendBatchSize: viperInstance.GetUint32(CollectorBatchProcessorSendBatchSizeKey), - SendBatchMaxSize: viperInstance.GetUint32(CollectorBatchProcessorSendBatchMaxSizeKey), - Timeout: viperInstance.GetDuration(CollectorBatchProcessorTimeoutKey), - }, - LogsGzip: &LogsGzip{}, - } + processors := Processors{} - if viperInstance.IsSet(CollectorAttributeProcessorKey) { - err := resolveMapStructure(CollectorAttributeProcessorKey, &processors.Attribute) + if viperInstance.IsSet(CollectorProcessorsKey) { + err := resolveMapStructure(CollectorProcessorsKey, &processors) if err != nil { return processors } @@ -1167,8 +1272,3 @@ func resolveMapStructure(key string, object any) error { return nil } - -func isOTelExporterConfigured(collector *Collector) bool { - return len(collector.Exporters.OtlpExporters) == 0 && collector.Exporters.PrometheusExporter == nil && - collector.Exporters.Debug == nil -} diff --git a/internal/config/config_test.go b/internal/config/config_test.go index f34fa9853..95c04321c 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -187,11 +187,7 @@ func TestResolveCollector(t *testing.T) { viperInstance.Set(CollectorLogPathKey, expected.Log.Path) viperInstance.Set(CollectorLogLevelKey, expected.Log.Level) viperInstance.Set(CollectorReceiversKey, expected.Receivers) - viperInstance.Set(CollectorBatchProcessorKey, expected.Processors.Batch) - viperInstance.Set(CollectorBatchProcessorSendBatchSizeKey, expected.Processors.Batch.SendBatchSize) - viperInstance.Set(CollectorBatchProcessorSendBatchMaxSizeKey, expected.Processors.Batch.SendBatchMaxSize) - viperInstance.Set(CollectorBatchProcessorTimeoutKey, expected.Processors.Batch.Timeout) - viperInstance.Set(CollectorLogsGzipProcessorKey, expected.Processors.LogsGzip) + viperInstance.Set(CollectorProcessorsKey, expected.Processors) viperInstance.Set(CollectorExportersKey, expected.Exporters) viperInstance.Set(CollectorOtlpExportersKey, expected.Exporters.OtlpExporters) viperInstance.Set(CollectorExtensionsHealthServerHostKey, expected.Extensions.Health.Server.Host) @@ -761,6 +757,79 @@ func TestResolveExtensions_MultipleHeaders(t *testing.T) { } } +func TestAddDefaultOtlpExporter(t *testing.T) { + t.Run("Test 1: Command server only", func(t *testing.T) { + collector := &Collector{} + agentConfig := &Config{ + Command: &Command{ + Server: &ServerConfig{ + Host: "test.com", + Port: 8080, + Type: Grpc, + }, + Auth: &AuthConfig{ + Token: "token", + }, + TLS: &TLSConfig{ + SkipVerify: false, + }, + }, + } + + addDefaultOtlpExporter(collector, agentConfig) + + assert.Equal(t, "test.com", collector.Exporters.OtlpExporters["default"].Server.Host) + assert.Equal(t, 8080, collector.Exporters.OtlpExporters["default"].Server.Port) + assert.False(t, collector.Exporters.OtlpExporters["default"].TLS.SkipVerify) + assert.Equal(t, "headers_setter", collector.Exporters.OtlpExporters["default"].Authenticator) + assert.Equal(t, "insert", collector.Extensions.HeadersSetter.Headers[0].Action) + assert.Equal(t, "authorization", collector.Extensions.HeadersSetter.Headers[0].Key) + assert.Equal(t, "token", collector.Extensions.HeadersSetter.Headers[0].Value) + }) + + t.Run("Test 2: Command and Auxiliary Command servers", func(t *testing.T) { + collector := &Collector{} + agentConfig := &Config{ + Command: &Command{ + Server: &ServerConfig{ + Host: "test.com", + Port: 8080, + Type: Grpc, + }, + Auth: &AuthConfig{ + Token: "token", + }, + TLS: &TLSConfig{ + SkipVerify: false, + }, + }, + AuxiliaryCommand: &Command{ + Server: &ServerConfig{ + Host: "aux-test.com", + Port: 9090, + Type: Grpc, + }, + Auth: &AuthConfig{ + Token: "aux-token", + }, + TLS: &TLSConfig{ + SkipVerify: false, + }, + }, + } + + addDefaultOtlpExporter(collector, agentConfig) + + assert.Equal(t, "aux-test.com", collector.Exporters.OtlpExporters["default"].Server.Host) + assert.Equal(t, 9090, collector.Exporters.OtlpExporters["default"].Server.Port) + assert.False(t, collector.Exporters.OtlpExporters["default"].TLS.SkipVerify) + assert.Equal(t, "headers_setter", collector.Exporters.OtlpExporters["default"].Authenticator) + assert.Equal(t, "insert", collector.Extensions.HeadersSetter.Headers[0].Action) + assert.Equal(t, "authorization", collector.Extensions.HeadersSetter.Headers[0].Key) + assert.Equal(t, "aux-token", collector.Extensions.HeadersSetter.Headers[0].Value) + }) +} + func agentConfig() *Config { return &Config{ UUID: "", @@ -796,8 +865,8 @@ func agentConfig() *Config { Collector: &Collector{ ConfigPath: "/etc/nginx-agent/nginx-agent-otelcol.yaml", Exporters: Exporters{ - OtlpExporters: []OtlpExporter{ - { + OtlpExporters: map[string]*OtlpExporter{ + "default": { Server: &ServerConfig{ Host: "127.0.0.1", Port: 1234, @@ -814,16 +883,20 @@ func agentConfig() *Config { }, }, Processors: Processors{ - Batch: &Batch{ - SendBatchMaxSize: DefCollectorBatchProcessorSendBatchMaxSize, - SendBatchSize: DefCollectorBatchProcessorSendBatchSize, - Timeout: DefCollectorBatchProcessorTimeout, + Batch: map[string]*Batch{ + "default_logs": { + SendBatchMaxSize: DefCollectorLogsBatchProcessorSendBatchMaxSize, + SendBatchSize: DefCollectorLogsBatchProcessorSendBatchSize, + Timeout: DefCollectorLogsBatchProcessorTimeout, + }, + }, + LogsGzip: map[string]*LogsGzip{ + "default": {}, }, - LogsGzip: &LogsGzip{}, }, Receivers: Receivers{ - OtlpReceivers: []OtlpReceiver{ - { + OtlpReceivers: map[string]*OtlpReceiver{ + "default": { Server: &ServerConfig{ Host: "localhost", Port: 4317, @@ -937,8 +1010,8 @@ func createConfig() *Config { Collector: &Collector{ ConfigPath: "/etc/nginx-agent/nginx-agent-otelcol.yaml", Exporters: Exporters{ - OtlpExporters: []OtlpExporter{ - { + OtlpExporters: map[string]*OtlpExporter{ + "default": { Server: &ServerConfig{ Host: "127.0.0.1", Port: 5643, @@ -969,25 +1042,41 @@ func createConfig() *Config { Debug: &DebugExporter{}, }, Processors: Processors{ - Batch: &Batch{ - SendBatchMaxSize: 1, - SendBatchSize: 8199, - Timeout: 30 * time.Second, + Batch: map[string]*Batch{ + "default": { + SendBatchMaxSize: 1, + SendBatchSize: 8199, + Timeout: 30 * time.Second, + }, + "default_metrics": { + SendBatchMaxSize: 1000, + SendBatchSize: 1000, + Timeout: 30 * time.Second, + }, + "default_logs": { + SendBatchMaxSize: 100, + SendBatchSize: 100, + Timeout: 60 * time.Second, + }, }, - Attribute: &Attribute{ - Actions: []Action{ - { - Key: "test", - Action: "insert", - Value: "value", + Attribute: map[string]*Attribute{ + "default": { + Actions: []Action{ + { + Key: "test", + Action: "insert", + Value: "value", + }, }, }, }, - LogsGzip: &LogsGzip{}, + LogsGzip: map[string]*LogsGzip{ + "default": {}, + }, }, Receivers: Receivers{ - OtlpReceivers: []OtlpReceiver{ - { + OtlpReceivers: map[string]*OtlpReceiver{ + "default": { Server: &ServerConfig{ Host: "127.0.0.1", Port: 4317, @@ -1005,26 +1094,6 @@ func createConfig() *Config { }, }, }, - NginxReceivers: []NginxReceiver{ - { - InstanceID: "cd7b8911-c2c5-4daf-b311-dbead151d938", - AccessLogs: []AccessLog{ - { - LogFormat: "$remote_addr - $remote_user [$time_local] \"$request\"" + - " $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" " + - "\"$http_x_forwarded_for\"", - FilePath: "/var/log/nginx/access-custom.conf", - }, - }, - CollectionInterval: 30 * time.Second, - }, - }, - NginxPlusReceivers: []NginxPlusReceiver{ - { - InstanceID: "cd7b8911-c2c5-4daf-b311-dbead151d939", - CollectionInterval: 30 * time.Second, - }, - }, HostMetrics: &HostMetrics{ CollectionInterval: 10 * time.Second, InitialDelay: 2 * time.Second, @@ -1066,6 +1135,22 @@ func createConfig() *Config { Level: "INFO", Path: "/var/log/nginx-agent/opentelemetry-collector-agent.log", }, + Pipelines: Pipelines{ + Metrics: map[string]*Pipeline{ + "default": { + Receivers: []string{"host_metrics", "nginx_metrics"}, + Processors: []string{"batch/default_metrics"}, + Exporters: []string{"otlp/default"}, + }, + }, + Logs: map[string]*Pipeline{ + "default": { + Receivers: []string{"tcplog/nginx_app_protect"}, + Processors: []string{"logsgzip/default", "batch/default_logs"}, + Exporters: []string{"otlp/default"}, + }, + }, + }, }, Command: &Command{ Server: &ServerConfig{ diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 6c4a1ab3d..2a783d6f7 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -73,9 +73,12 @@ const ( DefCollectorTLSCAPath = "/var/lib/nginx-agent/ca.pem" DefCollectorTLSSANNames = "127.0.0.1,::1,localhost" - DefCollectorBatchProcessorSendBatchSize = 1000 - DefCollectorBatchProcessorSendBatchMaxSize = 1000 - DefCollectorBatchProcessorTimeout = 30 * time.Second + DefCollectorMetricsBatchProcessorSendBatchSize = 1000 + DefCollectorMetricsBatchProcessorSendBatchMaxSize = 1000 + DefCollectorMetricsBatchProcessorTimeout = 30 * time.Second + DefCollectorLogsBatchProcessorSendBatchSize = 100 + DefCollectorLogsBatchProcessorSendBatchMaxSize = 100 + DefCollectorLogsBatchProcessorTimeout = 60 * time.Second DefCollectorExtensionsHealthServerHost = "localhost" DefCollectorExtensionsHealthServerPort = 13133 diff --git a/internal/config/flags.go b/internal/config/flags.go index 58a2d8454..189046907 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -49,7 +49,6 @@ var ( CollectorConfigPathKey = pre(CollectorRootKey) + "config_path" CollectorExportersKey = pre(CollectorRootKey) + "exporters" - CollectorAttributeProcessorKey = pre(CollectorProcessorsKey) + "attribute" CollectorDebugExporterKey = pre(CollectorExportersKey) + "debug" CollectorPrometheusExporterKey = pre(CollectorExportersKey) + "prometheus" CollectorPrometheusExporterServerHostKey = pre(CollectorPrometheusExporterKey) + "server_host" @@ -62,11 +61,6 @@ var ( CollectorPrometheusExporterTLSServerNameKey = pre(CollectorPrometheusExporterTLSKey) + "server_name" CollectorOtlpExportersKey = pre(CollectorExportersKey) + "otlp" CollectorProcessorsKey = pre(CollectorRootKey) + "processors" - CollectorBatchProcessorKey = pre(CollectorProcessorsKey) + "batch" - CollectorBatchProcessorSendBatchSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_size" - CollectorBatchProcessorSendBatchMaxSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_max_size" - CollectorBatchProcessorTimeoutKey = pre(CollectorBatchProcessorKey) + "timeout" - CollectorLogsGzipProcessorKey = pre(CollectorProcessorsKey) + "logsgzip" CollectorExtensionsKey = pre(CollectorRootKey) + "extensions" CollectorExtensionsHealthKey = pre(CollectorExtensionsKey) + "health" CollectorExtensionsHealthServerHostKey = pre(CollectorExtensionsHealthKey) + "server_host" @@ -79,6 +73,9 @@ var ( CollectorExtensionsHealthTLSServerNameKey = pre(CollectorExtensionsHealthTLSKey) + "server_name" CollectorExtensionsHealthTLSSkipVerifyKey = pre(CollectorExtensionsHealthTLSKey) + "skip_verify" CollectorExtensionsHeadersSetterKey = pre(CollectorExtensionsKey) + "headers_setter" + CollectorPipelinesKey = pre(CollectorRootKey) + "pipelines" + CollectorMetricsPipelinesKey = pre(CollectorPipelinesKey) + "metrics" + CollectorLogsPipelinesKey = pre(CollectorPipelinesKey) + "logs" CollectorReceiversKey = pre(CollectorRootKey) + "receivers" CollectorLogKey = pre(CollectorRootKey) + "log" CollectorLogLevelKey = pre(CollectorLogKey) + "level" diff --git a/internal/config/testdata/nginx-agent.conf b/internal/config/testdata/nginx-agent.conf index 0b4601a55..1aeb11221 100644 --- a/internal/config/testdata/nginx-agent.conf +++ b/internal/config/testdata/nginx-agent.conf @@ -90,46 +90,42 @@ collector: config_path: "/etc/nginx-agent/nginx-agent-otelcol.yaml" receivers: otlp: - - server: - host: "127.0.0.1" - port: 4317 - auth: - token: "secret-receiver-token" - tls: - generate_self_signed_cert: false - server_name: "test-local-server" - skip_verify: true - ca: /tmp/ca.pem - cert: /tmp/cert.pem - key: /tmp/key.pem - nginx: - - instance_id: cd7b8911-c2c5-4daf-b311-dbead151d938 - access_logs: - - file_path: "/var/log/nginx/access-custom.conf" - log_format: "$remote_addr - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\"" - collection_interval: 30s - nginx_plus: - - instance_id: cd7b8911-c2c5-4daf-b311-dbead151d939 - collection_interval: 30s + "default": + server: + host: "127.0.0.1" + port: 4317 + auth: + token: "secret-receiver-token" + tls: + generate_self_signed_cert: false + server_name: "test-local-server" + skip_verify: true + ca: /tmp/ca.pem + cert: /tmp/cert.pem + key: /tmp/key.pem host_metrics: - collection_interval: 10s - initial_delay: 2s - scrapers: - cpu: {} + collection_interval: 10s + initial_delay: 2s + scrapers: + cpu: {} processors: - batch: + batch: + "default": send_batch_max_size: 1 send_batch_size: 8199 timeout: 30s attribute: - actions: - - key: "test" - action: "insert" - value: "value" - logsgzip: {} + "default": + actions: + - key: "test" + action: "insert" + value: "value" + logsgzip: + "default": {} exporters: otlp: - - server: + "default": + server: host: "127.0.0.1" port: 5643 authenticator: "test-saas-token" @@ -140,22 +136,22 @@ collector: key: /path/to/server-key.pem ca: /path/to/server-cert.pem prometheus: - server: - host: "127.0.0.1" - port: 1235 - tls: - server_name: "test-server" - skip_verify: false - cert: /path/to/server-cert.pem - key: /path/to/server-key.pem - ca: /path/to/server-cert.pem + server: + host: "127.0.0.1" + port: 1235 + tls: + server_name: "test-server" + skip_verify: false + cert: /path/to/server-cert.pem + key: /path/to/server-key.pem + ca: /path/to/server-cert.pem debug: {} extensions: headers_setter: - headers: - - action: "action" - key: "key" - value: "value" + headers: + - action: "action" + key: "key" + value: "value" health: server: host: "127.0.0.1" @@ -167,6 +163,6 @@ collector: cert: /path/to/server-cert.pem key: /path/to/server-key.pem ca: /path/to/server-ca.pem - log: + log: level: "INFO" path: "/var/log/nginx-agent/opentelemetry-collector-agent.log" diff --git a/internal/config/types.go b/internal/config/types.go index b0db71462..986ffcc9d 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -104,13 +104,25 @@ type ( Exporters Exporters `yaml:"exporters" mapstructure:"exporters"` Extensions Extensions `yaml:"extensions" mapstructure:"extensions"` Processors Processors `yaml:"processors" mapstructure:"processors"` + Pipelines Pipelines `yaml:"pipelines" mapstructure:"pipelines"` Receivers Receivers `yaml:"receivers" mapstructure:"receivers"` } + Pipelines struct { + Metrics map[string]*Pipeline `yaml:"metrics" mapstructure:"metrics"` + Logs map[string]*Pipeline `yaml:"logs" mapstructure:"logs"` + } + + Pipeline struct { + Receivers []string `yaml:"receivers" mapstructure:"receivers"` + Processors []string `yaml:"processors" mapstructure:"processors"` + Exporters []string `yaml:"exporters" mapstructure:"exporters"` + } + Exporters struct { - Debug *DebugExporter `yaml:"debug" mapstructure:"debug"` - PrometheusExporter *PrometheusExporter `yaml:"prometheus" mapstructure:"prometheus"` - OtlpExporters []OtlpExporter `yaml:"otlp" mapstructure:"otlp"` + Debug *DebugExporter `yaml:"debug" mapstructure:"debug"` + PrometheusExporter *PrometheusExporter `yaml:"prometheus" mapstructure:"prometheus"` + OtlpExporters map[string]*OtlpExporter `yaml:"otlp" mapstructure:"otlp"` } OtlpExporter struct { @@ -153,10 +165,10 @@ type ( // OTel Collector Processors configuration. Processors struct { - Attribute *Attribute `yaml:"attribute" mapstructure:"attribute"` - Resource *Resource `yaml:"resource" mapstructure:"resource"` - Batch *Batch `yaml:"batch" mapstructure:"batch"` - LogsGzip *LogsGzip `yaml:"logsgzip" mapstructure:"logsgzip"` + Attribute map[string]*Attribute `yaml:"attribute" mapstructure:"attribute"` + Resource map[string]*Resource `yaml:"resource" mapstructure:"resource"` + Batch map[string]*Batch `yaml:"batch" mapstructure:"batch"` + LogsGzip map[string]*LogsGzip `yaml:"logsgzip" mapstructure:"logsgzip"` } Attribute struct { @@ -189,12 +201,12 @@ type ( // OTel Collector Receiver configuration. Receivers struct { - ContainerMetrics *ContainerMetricsReceiver `yaml:"container_metrics" mapstructure:"container_metrics"` - HostMetrics *HostMetrics `yaml:"host_metrics" mapstructure:"host_metrics"` - OtlpReceivers []OtlpReceiver `yaml:"otlp" mapstructure:"otlp"` - NginxReceivers []NginxReceiver `yaml:"nginx" mapstructure:"nginx"` - NginxPlusReceivers []NginxPlusReceiver `yaml:"nginx_plus" mapstructure:"nginx_plus"` - TcplogReceivers []TcplogReceiver `yaml:"tcplog" mapstructure:"tcplog"` + ContainerMetrics *ContainerMetricsReceiver `yaml:"container_metrics" mapstructure:"container_metrics"` + HostMetrics *HostMetrics `yaml:"host_metrics" mapstructure:"host_metrics"` + OtlpReceivers map[string]*OtlpReceiver `yaml:"otlp" mapstructure:"otlp"` + TcplogReceivers map[string]*TcplogReceiver `yaml:"tcplog" mapstructure:"tcplog"` + NginxReceivers []NginxReceiver `yaml:"-"` + NginxPlusReceivers []NginxPlusReceiver `yaml:"-"` } OtlpReceiver struct { @@ -376,24 +388,6 @@ func (c *Config) IsAuxiliaryCommandGrpcClientConfigured() bool { c.AuxiliaryCommand.Server.Type == Grpc } -func (c *Config) IsCommandAuthConfigured() bool { - return c.Command.Auth != nil && - (c.Command.Auth.Token != "" || c.Command.Auth.TokenPath != "") -} - -func (c *Config) IsAuxiliaryCommandAuthConfigured() bool { - return c.AuxiliaryCommand.Auth != nil && - (c.AuxiliaryCommand.Auth.Token != "" || c.AuxiliaryCommand.Auth.TokenPath != "") -} - -func (c *Config) IsCommandTLSConfigured() bool { - return c.Command.TLS != nil -} - -func (c *Config) IsAuxiliaryCommandTLSConfigured() bool { - return c.AuxiliaryCommand.TLS != nil -} - func (c *Config) IsFeatureEnabled(feature string) bool { for _, enabledFeature := range c.Features { if enabledFeature == feature { diff --git a/test/config/collector/test-opentelemetry-collector-agent.yaml b/test/config/collector/test-opentelemetry-collector-agent.yaml index 9a17adc3f..914793b01 100644 --- a/test/config/collector/test-opentelemetry-collector-agent.yaml +++ b/test/config/collector/test-opentelemetry-collector-agent.yaml @@ -1,6 +1,6 @@ receivers: containermetrics: - collection_interval: 1s + collection_interval: "1s" hostmetrics: collection_interval: 1m0s initial_delay: 1s @@ -18,7 +18,7 @@ receivers: system.memory.limit: enabled: true network: - otlp/0: + otlp/default: protocols: grpc: endpoint: "localhost:4317" @@ -35,7 +35,7 @@ receivers: access_logs: - log_format: "$remote_addr - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\"\"$upstream_cache_status\"" file_path: "/var/log/nginx/access-custom.conf" - tcplog/0: + tcplog/default: listen_address: "localhost:151" operators: - type: add @@ -45,18 +45,18 @@ receivers: field: attributes.message processors: - resource: + resource/default: attributes: - key: resource.id action: add value: 12345 - batch: + batch/default: send_batch_size: 1000 timeout: 30s send_batch_max_size: 1000 exporters: - otlp/0: + otlp/default: endpoint: "127.0.0.1:1234" compression: none timeout: 10s @@ -100,26 +100,27 @@ service: extensions: - health_check - headers_setter + pipelines: - metrics: + metrics/default: receivers: - - containermetrics - hostmetrics - - otlp/0 + - containermetrics + - otlp/default - nginx/123 processors: - - resource - - batch + - resource/default + - batch/default exporters: - - otlp/0 + - otlp/default - prometheus - debug - logs: + logs/default: receivers: - - tcplog/0 + - tcplog/default processors: - - resource - - batch + - resource/default + - batch/default exporters: - - otlp/0 + - otlp/default - debug diff --git a/test/types/config.go b/test/types/config.go index 29775f664..04dec87e5 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -65,8 +65,8 @@ func AgentConfig() *config.Config { Collector: &config.Collector{ ConfigPath: "/etc/nginx-agent/nginx-agent-otelcol.yaml", Exporters: config.Exporters{ - OtlpExporters: []config.OtlpExporter{ - { + OtlpExporters: map[string]*config.OtlpExporter{ + "default": { Server: &config.ServerConfig{ Host: "127.0.0.1", Port: 0, @@ -76,15 +76,17 @@ func AgentConfig() *config.Config { }, }, Processors: config.Processors{ - Batch: &config.Batch{ - SendBatchSize: config.DefCollectorBatchProcessorSendBatchSize, - SendBatchMaxSize: config.DefCollectorBatchProcessorSendBatchMaxSize, - Timeout: config.DefCollectorBatchProcessorTimeout, + Batch: map[string]*config.Batch{ + "default": { + SendBatchSize: config.DefCollectorMetricsBatchProcessorSendBatchMaxSize, + SendBatchMaxSize: config.DefCollectorMetricsBatchProcessorSendBatchMaxSize, + Timeout: config.DefCollectorMetricsBatchProcessorTimeout, + }, }, }, Receivers: config.Receivers{ - OtlpReceivers: []config.OtlpReceiver{ - { + OtlpReceivers: map[string]*config.OtlpReceiver{ + "default": { Server: &config.ServerConfig{ Host: "127.0.0.1", Port: 0, @@ -128,6 +130,15 @@ func AgentConfig() *config.Config { Level: "INFO", Path: "/var/log/nginx-agent/opentelemetry-collector-agent.log", }, + Pipelines: config.Pipelines{ + Metrics: map[string]*config.Pipeline{ + "default": { + Receivers: []string{"host_metrics"}, + Processors: []string{"batch/default"}, + Exporters: []string{"otlp/default"}, + }, + }, + }, }, Command: &config.Command{ Server: &config.ServerConfig{ @@ -179,11 +190,11 @@ func OTelConfig(t *testing.T) *config.Config { exporterPort, expErr := helpers.RandomPort(t) require.NoError(t, expErr) - ac.Collector.Exporters.OtlpExporters[0].Server.Port = exporterPort + ac.Collector.Exporters.OtlpExporters["default"].Server.Port = exporterPort receiverPort, recErr := helpers.RandomPort(t) require.NoError(t, recErr) - ac.Collector.Receivers.OtlpReceivers[0].Server.Port = receiverPort + ac.Collector.Receivers.OtlpReceivers["default"].Server.Port = receiverPort healthPort, healthErr := helpers.RandomPort(t) require.NoError(t, healthErr) From e12de1a6e0b75aea2e2208bac1d0cffdb57ad916 Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Tue, 15 Jul 2025 16:29:47 +0100 Subject: [PATCH 2/3] Update load tests --- test/config/agent/nginx-agent-otel-load.conf | 23 ++++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/test/config/agent/nginx-agent-otel-load.conf b/test/config/agent/nginx-agent-otel-load.conf index 2ee5e8651..4c7590bf3 100644 --- a/test/config/agent/nginx-agent-otel-load.conf +++ b/test/config/agent/nginx-agent-otel-load.conf @@ -20,21 +20,30 @@ allowed_directories: collector: receivers: otlp: - - server: + "default": + server: host: "127.0.0.1" port: 4317 processors: batch: - send_batch_size: 8192 - timeout: 200ms - send_batch_max_size: 0 + "default": + send_batch_size: 8192 + timeout: 200ms + send_batch_max_size: 0 exporters: otlp: - - server: - host: "127.0.0.1" - port: 5643 + "default": + server: + host: "127.0.0.1" + port: 5643 extensions: health: server: host: "127.0.0.1" port: 1337 + pipelines: + metrics: + "default": + receivers: ["otlp/default"] + processors: ["batch/default"] + exporters: ["otlp/default"] From e0c5a0e478fa19b3a39bc4e3d022290d10da5a3c Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Thu, 17 Jul 2025 13:26:25 +0100 Subject: [PATCH 3/3] Clean up --- internal/collector/otel_collector_plugin.go | 4 ++-- .../collector/otel_collector_plugin_test.go | 24 +++++++++---------- internal/collector/otelcol.tmpl | 6 ++--- internal/config/config.go | 9 +++++-- internal/plugin/plugin_manager.go | 2 +- .../test-opentelemetry-collector-agent.yaml | 2 +- 6 files changed, 26 insertions(+), 21 deletions(-) diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index 64b007002..39399c211 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -59,8 +59,8 @@ var ( initMutex = &sync.Mutex{} ) -// New is the constructor for the Collector plugin. -func New(conf *config.Config) (*Collector, error) { +// NewCollector is the constructor for the Collector plugin. +func NewCollector(conf *config.Config) (*Collector, error) { initMutex.Lock() defer initMutex.Unlock() diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index f3f3e6722..2067047ee 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -67,7 +67,7 @@ func TestCollector_New(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - collector, err := New(tt.config) + collector, err := NewCollector(tt.config) if tt.expectedError != nil { require.Error(t, err) @@ -114,7 +114,7 @@ func TestCollector_Init(t *testing.T) { conf.Collector.Receivers = config.Receivers{} } - collector, err = New(conf) + collector, err = NewCollector(conf) require.NoError(t, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -133,7 +133,7 @@ func TestCollector_InitAndClose(t *testing.T) { conf := types.OTelConfig(t) conf.Collector.Log.Path = "" - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(t, err, "NewCollector should not return an error with valid config") ctx := context.Background() @@ -293,7 +293,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { conf.Collector.Extensions.HeadersSetter = nil conf.Collector.Exporters.PrometheusExporter = nil - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -378,7 +378,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -439,7 +439,7 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -561,7 +561,7 @@ func TestCollector_updateExistingNginxOSSReceiver(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { conf.Collector.Receivers = test.existingReceivers - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -652,7 +652,7 @@ func TestCollector_updateExistingNginxPlusReceiver(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { conf.Collector.Receivers = test.existingReceivers - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -707,7 +707,7 @@ func TestCollector_updateResourceAttributes(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") collector.service = createFakeCollector() @@ -732,7 +732,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { conf.Collector.Processors.Attribute = nil conf.Collector.Processors.Resource = nil conf.Collector.Processors.LogsGzip = nil - collector, err := New(conf) + collector, err := NewCollector(conf) require.NoError(t, err) nginxConfigContext := &model.NginxConfigContext{ @@ -743,7 +743,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) - t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) { + t.Run("Test 1: NewCollector TcplogReceiver added", func(tt *testing.T) { tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext) assert.True(tt, tcplogReceiverAdded) @@ -768,7 +768,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) { assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) }) - t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) { + t.Run("Test 4: NewCollector tcplogReceiver added and deleted another", func(tt *testing.T) { tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers( &model.NginxConfigContext{ NAPSysLogServers: []string{ diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index 480101428..b331152c0 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -1,8 +1,8 @@ receivers: - {{- with .Receivers.ContainerMetrics }} +{{- if ne .Receivers.ContainerMetrics nil }} containermetrics: - {{- with .CollectionInterval }} - collection_interval: "{{ . }}" + {{- if .Receivers.ContainerMetrics.CollectionInterval }} + collection_interval: {{ .Receivers.ContainerMetrics.CollectionInterval }} {{- end}} {{- end }} diff --git a/internal/config/config.go b/internal/config/config.go index 9075294c2..79e661efc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -143,10 +143,15 @@ func ResolveConfig() (*Config, error) { } func defaultCollector(collector *Collector, config *Config) { + // Always add default host metric receiver and default processor addDefaultHostMetricsReceiver(collector) addDefaultProcessors(collector) - addDefaultOtlpExporter(collector, config) - addDefaultPipelines(collector) + + // Only add default otlp exporter and pipelines if connected to a management plane + if config.IsCommandGrpcClientConfigured() || config.IsAuxiliaryCommandGrpcClientConfigured() { + addDefaultOtlpExporter(collector, config) + addDefaultPipelines(collector) + } } func addDefaultPipelines(collector *Collector) { diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index 3f2c3869a..fde011d8b 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -96,7 +96,7 @@ func addCollectorPlugin(ctx context.Context, agentConfig *config.Config, plugins return plugins } if agentConfig.IsACollectorExporterConfigured() { - oTelCollector, err := collector.New(agentConfig) + oTelCollector, err := collector.NewCollector(agentConfig) if err == nil { plugins = append(plugins, oTelCollector) } else { diff --git a/test/config/collector/test-opentelemetry-collector-agent.yaml b/test/config/collector/test-opentelemetry-collector-agent.yaml index 914793b01..ed9acec91 100644 --- a/test/config/collector/test-opentelemetry-collector-agent.yaml +++ b/test/config/collector/test-opentelemetry-collector-agent.yaml @@ -1,6 +1,6 @@ receivers: containermetrics: - collection_interval: "1s" + collection_interval: 1s hostmetrics: collection_interval: 1m0s initial_delay: 1s