Skip to content

Commit dc2a89d

Browse files
dhurleyAkshay2191
authored andcommitted
Add support for configuring mutliple OTel pipelines (#1167)
1 parent 752d40f commit dc2a89d

15 files changed

+664
-432
lines changed

internal/collector/otel_collector_plugin.go

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ var (
6060
)
6161

6262
// NewCollector is the constructor for the Collector plugin.
63-
func New(conf *config.Config) (*Collector, error) {
63+
func NewCollector(conf *config.Config) (*Collector, error) {
6464
initMutex.Lock()
6565

6666
defer initMutex.Unlock()
@@ -194,7 +194,7 @@ func (oc *Collector) Subscriptions() []string {
194194
}
195195

196196
// Process receivers and log warning for sub-optimal configurations
197-
func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) {
197+
func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) {
198198
for _, receiver := range receivers {
199199
if receiver.OtlpTLSConfig == nil {
200200
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.")
@@ -317,12 +317,13 @@ func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource)
317317
resourceProcessorUpdated := false
318318

319319
if oc.config.Collector.Processors.Resource == nil {
320-
oc.config.Collector.Processors.Resource = &config.Resource{
320+
oc.config.Collector.Processors.Resource = make(map[string]*config.Resource)
321+
oc.config.Collector.Processors.Resource["default"] = &config.Resource{
321322
Attributes: make([]config.ResourceAttribute, 0),
322323
}
323324
}
324325

325-
if oc.config.Collector.Processors.Resource != nil &&
326+
if oc.config.Collector.Processors.Resource["default"] != nil &&
326327
resourceUpdateContext.GetResourceId() != "" {
327328
resourceProcessorUpdated = oc.updateResourceAttributes(
328329
[]config.ResourceAttribute{
@@ -432,7 +433,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex
432433
}
433434

434435
if oc.config.IsFeatureEnabled(pkgConfig.FeatureLogsNap) {
435-
tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext)
436+
tcplogReceiversFound := oc.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
436437
if tcplogReceiversFound {
437438
reloadCollector = true
438439
}
@@ -542,44 +543,46 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
542543
return nginxReceiverFound, reloadCollector
543544
}
544545

545-
func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
546+
func (oc *Collector) updateNginxAppProtectTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
546547
newTcplogReceiverAdded := false
548+
549+
if oc.config.Collector.Receivers.TcplogReceivers == nil {
550+
oc.config.Collector.Receivers.TcplogReceivers = make(map[string]*config.TcplogReceiver)
551+
}
552+
547553
if nginxConfigContext.NAPSysLogServer != "" {
548554
if !oc.doesTcplogReceiverAlreadyExist(nginxConfigContext.NAPSysLogServer) {
549-
oc.config.Collector.Receivers.TcplogReceivers = append(
550-
oc.config.Collector.Receivers.TcplogReceivers,
551-
config.TcplogReceiver{
552-
ListenAddress: nginxConfigContext.NAPSysLogServer,
553-
Operators: []config.Operator{
554-
{
555-
Type: "add",
556-
Fields: map[string]string{
557-
"field": "body",
558-
"value": timestampConversionExpression,
559-
},
555+
oc.config.Collector.Receivers.TcplogReceivers["nginx_app_protect"] = &config.TcplogReceiver{
556+
ListenAddress: nginxConfigContext.NAPSysLogServer,
557+
Operators: []config.Operator{
558+
{
559+
Type: "add",
560+
Fields: map[string]string{
561+
"field": "body",
562+
"value": timestampConversionExpression,
560563
},
561-
{
562-
Type: "syslog_parser",
563-
Fields: map[string]string{
564-
"protocol": "rfc3164",
565-
},
564+
},
565+
{
566+
Type: "syslog_parser",
567+
Fields: map[string]string{
568+
"protocol": "rfc3164",
566569
},
567-
{
568-
Type: "remove",
569-
Fields: map[string]string{
570-
"field": "attributes.message",
571-
},
570+
},
571+
{
572+
Type: "remove",
573+
Fields: map[string]string{
574+
"field": "attributes.message",
572575
},
573-
{
574-
Type: "add",
575-
Fields: map[string]string{
576-
"field": "resource[\"instance.id\"]",
577-
"value": nginxConfigContext.InstanceID,
578-
},
576+
},
577+
{
578+
Type: "add",
579+
Fields: map[string]string{
580+
"field": "resource[\"instance.id\"]",
581+
"value": nginxConfigContext.InstanceID,
579582
},
580583
},
581584
},
582-
)
585+
}
583586

584587
newTcplogReceiverAdded = true
585588
}
@@ -593,23 +596,13 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig
593596
func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool {
594597
listenAddressesToBeDeleted := oc.configDeletedNapReceivers(nginxConfigContext)
595598
if len(listenAddressesToBeDeleted) != 0 {
596-
oc.deleteNapReceivers(listenAddressesToBeDeleted)
599+
delete(oc.config.Collector.Receivers.TcplogReceivers, "nginx_app_protect")
597600
return true
598601
}
599602

600603
return false
601604
}
602605

603-
func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bool) {
604-
filteredReceivers := (oc.config.Collector.Receivers.TcplogReceivers)[:0]
605-
for _, receiver := range oc.config.Collector.Receivers.TcplogReceivers {
606-
if !listenAddressesToBeDeleted[receiver.ListenAddress] {
607-
filteredReceivers = append(filteredReceivers, receiver)
608-
}
609-
}
610-
oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers
611-
}
612-
613606
func (oc *Collector) configDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
614607
elements := make(map[string]bool)
615608

@@ -645,16 +638,16 @@ func (oc *Collector) updateResourceAttributes(
645638
) (actionUpdated bool) {
646639
actionUpdated = false
647640

648-
if oc.config.Collector.Processors.Resource.Attributes != nil {
641+
if oc.config.Collector.Processors.Resource["default"].Attributes != nil {
649642
OUTER:
650643
for _, toAdd := range attributesToAdd {
651-
for _, action := range oc.config.Collector.Processors.Resource.Attributes {
644+
for _, action := range oc.config.Collector.Processors.Resource["default"].Attributes {
652645
if action.Key == toAdd.Key {
653646
continue OUTER
654647
}
655648
}
656-
oc.config.Collector.Processors.Resource.Attributes = append(
657-
oc.config.Collector.Processors.Resource.Attributes,
649+
oc.config.Collector.Processors.Resource["default"].Attributes = append(
650+
oc.config.Collector.Processors.Resource["default"].Attributes,
658651
toAdd,
659652
)
660653
actionUpdated = true

internal/collector/otel_collector_plugin_test.go

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestCollector_New(t *testing.T) {
6767

6868
for _, tt := range tests {
6969
t.Run(tt.name, func(t *testing.T) {
70-
collector, err := New(tt.config)
70+
collector, err := NewCollector(tt.config)
7171

7272
if tt.expectedError != nil {
7373
require.Error(t, err)
@@ -114,7 +114,7 @@ func TestCollector_Init(t *testing.T) {
114114
conf.Collector.Receivers = config.Receivers{}
115115
}
116116

117-
collector, err = New(conf)
117+
collector, err = NewCollector(conf)
118118
require.NoError(t, err, "NewCollector should not return an error with valid config")
119119

120120
collector.service = createFakeCollector()
@@ -133,7 +133,7 @@ func TestCollector_InitAndClose(t *testing.T) {
133133
conf := types.OTelConfig(t)
134134
conf.Collector.Log.Path = ""
135135

136-
collector, err := New(conf)
136+
collector, err := NewCollector(conf)
137137
require.NoError(t, err, "NewCollector should not return an error with valid config")
138138

139139
ctx := context.Background()
@@ -293,7 +293,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) {
293293
conf.Collector.Extensions.HeadersSetter = nil
294294
conf.Collector.Exporters.PrometheusExporter = nil
295295

296-
collector, err := New(conf)
296+
collector, err := NewCollector(conf)
297297
require.NoError(tt, err, "NewCollector should not return an error with valid config")
298298

299299
collector.service = createFakeCollector()
@@ -349,12 +349,14 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
349349
Data: protos.HostResource(),
350350
},
351351
processors: config.Processors{
352-
Resource: &config.Resource{
353-
Attributes: []config.ResourceAttribute{
354-
{
355-
Key: "resource.id",
356-
Action: "insert",
357-
Value: "1234",
352+
Resource: map[string]*config.Resource{
353+
"default": {
354+
Attributes: []config.ResourceAttribute{
355+
{
356+
Key: "resource.id",
357+
Action: "insert",
358+
Value: "1234",
359+
},
358360
},
359361
},
360362
},
@@ -376,7 +378,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
376378

377379
for _, test := range tests {
378380
t.Run(test.name, func(tt *testing.T) {
379-
collector, err := New(conf)
381+
collector, err := NewCollector(conf)
380382
require.NoError(tt, err, "NewCollector should not return an error with valid config")
381383

382384
collector.service = createFakeCollector()
@@ -437,7 +439,7 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) {
437439

438440
for _, test := range tests {
439441
t.Run(test.name, func(tt *testing.T) {
440-
collector, err := New(conf)
442+
collector, err := NewCollector(conf)
441443
require.NoError(tt, err, "NewCollector should not return an error with valid config")
442444

443445
collector.service = createFakeCollector()
@@ -559,7 +561,7 @@ func TestCollector_updateExistingNginxOSSReceiver(t *testing.T) {
559561
for _, test := range tests {
560562
t.Run(test.name, func(tt *testing.T) {
561563
conf.Collector.Receivers = test.existingReceivers
562-
collector, err := New(conf)
564+
collector, err := NewCollector(conf)
563565
require.NoError(tt, err, "NewCollector should not return an error with valid config")
564566

565567
collector.service = createFakeCollector()
@@ -650,7 +652,7 @@ func TestCollector_updateExistingNginxPlusReceiver(t *testing.T) {
650652
for _, test := range tests {
651653
t.Run(test.name, func(tt *testing.T) {
652654
conf.Collector.Receivers = test.existingReceivers
653-
collector, err := New(conf)
655+
collector, err := NewCollector(conf)
654656
require.NoError(tt, err, "NewCollector should not return an error with valid config")
655657

656658
collector.service = createFakeCollector()
@@ -705,31 +707,32 @@ func TestCollector_updateResourceAttributes(t *testing.T) {
705707

706708
for _, test := range tests {
707709
t.Run(test.name, func(tt *testing.T) {
708-
collector, err := New(conf)
710+
collector, err := NewCollector(conf)
709711
require.NoError(tt, err, "NewCollector should not return an error with valid config")
710712

711713
collector.service = createFakeCollector()
712714

713715
// set up Actions
714-
conf.Collector.Processors.Resource = &config.Resource{Attributes: test.setup}
716+
conf.Collector.Processors.Resource = make(map[string]*config.Resource)
717+
conf.Collector.Processors.Resource["default"] = &config.Resource{Attributes: test.setup}
715718

716719
reloadRequired := collector.updateResourceAttributes(test.attributes)
717720
assert.Equal(tt,
718721
test.expectedAttribs,
719-
conf.Collector.Processors.Resource.Attributes)
722+
conf.Collector.Processors.Resource["default"].Attributes)
720723
assert.Equal(tt, test.expectedReloadRequired, reloadRequired)
721724
})
722725
}
723726
}
724727

725-
func TestCollector_updateTcplogReceivers(t *testing.T) {
728+
func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) {
726729
conf := types.OTelConfig(t)
727730
conf.Collector.Log.Path = ""
728731
conf.Collector.Processors.Batch = nil
729732
conf.Collector.Processors.Attribute = nil
730733
conf.Collector.Processors.Resource = nil
731734
conf.Collector.Processors.LogsGzip = nil
732-
collector, err := New(conf)
735+
collector, err := NewCollector(conf)
733736
require.NoError(t, err)
734737

735738
nginxConfigContext := &model.NginxConfigContext{
@@ -738,38 +741,42 @@ func TestCollector_updateTcplogReceivers(t *testing.T) {
738741

739742
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
740743

741-
t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) {
742-
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
744+
t.Run("Test 1: NewCollector TcplogReceiver added", func(tt *testing.T) {
745+
tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
743746

744747
assert.True(tt, tcplogReceiverAdded)
745748
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1)
746-
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
747-
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
749+
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
750+
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4)
748751
})
749752

750-
// Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice
753+
// Calling updateNginxAppProtectTcplogReceivers shouldn't update the TcplogReceivers slice
751754
// since there is already a receiver with the same ListenAddress
752755
t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) {
753-
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
756+
tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
754757
assert.False(t, tcplogReceiverAdded)
755758
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
756-
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
757-
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
759+
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
760+
assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4)
758761
})
759762

760763
t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) {
761-
tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{})
764+
tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(&model.NginxConfigContext{})
762765
assert.True(t, tcplogReceiverDeleted)
763766
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
764767
})
765768

766-
t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) {
767-
tcplogReceiverDeleted := collector.
768-
updateTcplogReceivers(&model.NginxConfigContext{NAPSysLogServer: "localhost:152"})
769+
t.Run("Test 4: NewCollector tcplogReceiver added and deleted another", func(tt *testing.T) {
770+
tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(
771+
&model.NginxConfigContext{
772+
NAPSysLogServer: "localhost:152",
773+
},
774+
)
775+
769776
assert.True(t, tcplogReceiverDeleted)
770777
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
771-
assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
772-
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
778+
assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
779+
assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4)
773780
})
774781
}
775782

0 commit comments

Comments
 (0)