Skip to content

Commit 78c82fe

Browse files
feat(otel): share instance of Beat processor between pipelines
1 parent 3e72f7f commit 78c82fe

File tree

5 files changed

+284
-3
lines changed

5 files changed

+284
-3
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ require (
255255
go.opentelemetry.io/collector/pipeline v1.53.0
256256
go.opentelemetry.io/collector/processor v1.53.0
257257
go.opentelemetry.io/collector/processor/processorhelper v0.147.0
258+
go.opentelemetry.io/collector/processor/processortest v0.147.0
258259
go.opentelemetry.io/collector/receiver/receivertest v0.147.0
259260
go.opentelemetry.io/collector/service v0.147.0
260261
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0
@@ -490,7 +491,6 @@ require (
490491
go.opentelemetry.io/collector/pdata/testdata v0.147.0 // indirect
491492
go.opentelemetry.io/collector/pdata/xpdata v0.147.0 // indirect
492493
go.opentelemetry.io/collector/pipeline/xpipeline v0.147.0 // indirect
493-
go.opentelemetry.io/collector/processor/processortest v0.147.0 // indirect
494494
go.opentelemetry.io/collector/processor/xprocessor v0.147.0 // indirect
495495
go.opentelemetry.io/collector/receiver/xreceiver v0.147.0 // indirect
496496
go.opentelemetry.io/collector/service/hostcapabilities v0.147.0 // indirect

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2028,3 +2028,78 @@ service:
20282028
col.Shutdown()
20292029
}
20302030
}
2031+
2032+
// TestBeatProcessorSharedAcrossPipelines verifies that when the same beat
2033+
// processor component ID is referenced by multiple OTel pipelines, only a
2034+
// single underlying beatProcessor instance is created. This avoids duplicate
2035+
// initialisation of expensive Beat sub-processors (add_cloud_metadata,
2036+
// add_kubernetes_metadata, etc.).
2037+
//
2038+
// The test configures a full OTel Collector with two log pipelines that both
2039+
// reference the same "beat" processor, then asserts that the "Configured Beat
2040+
// processor" log message appears exactly once — proving a single shared instance.
2041+
func TestBeatProcessorSharedAcrossPipelines(t *testing.T) {
2042+
cfg := `service:
2043+
pipelines:
2044+
logs/1:
2045+
receivers:
2046+
- filebeatreceiver/1
2047+
processors:
2048+
- beat
2049+
exporters:
2050+
- debug
2051+
logs/2:
2052+
receivers:
2053+
- filebeatreceiver/2
2054+
processors:
2055+
- beat
2056+
exporters:
2057+
- debug
2058+
telemetry:
2059+
logs:
2060+
level: debug
2061+
metrics:
2062+
level: none
2063+
receivers:
2064+
filebeatreceiver/1:
2065+
filebeat:
2066+
inputs:
2067+
- type: benchmark
2068+
enabled: true
2069+
message: "first test message"
2070+
count: 1
2071+
queue.mem.flush.timeout: 0s
2072+
filebeatreceiver/2:
2073+
filebeat:
2074+
inputs:
2075+
- type: benchmark
2076+
enabled: true
2077+
message: "second test message"
2078+
count: 1
2079+
queue.mem.flush.timeout: 0s
2080+
processors:
2081+
beat:
2082+
processors:
2083+
- add_fields:
2084+
fields:
2085+
env: "test"
2086+
exporters:
2087+
debug:
2088+
`
2089+
col := oteltestcol.New(t, cfg)
2090+
require.NotNil(t, col)
2091+
2092+
require.Eventually(t, func() bool {
2093+
return col.ObservedLogs().
2094+
FilterMessageSnippet("Publish event").
2095+
FilterMessageSnippet(`"message": "first test message"`).Len() == 1
2096+
}, 30*time.Second, 100*time.Millisecond, "Expected log with first test message not found")
2097+
require.Eventually(t, func() bool {
2098+
return col.ObservedLogs().
2099+
FilterMessageSnippet("Publish event").
2100+
FilterMessageSnippet(`"message": "second test message"`).Len() == 1
2101+
}, 30*time.Second, 100*time.Millisecond, "Expected log with second test message not found")
2102+
2103+
processorInstanceCount := col.ObservedLogs().FilterMessageSnippet("Configured Beat processor").Len()
2104+
assert.Equal(t, 1, processorInstanceCount, "expected beat processor to be configured once (shared instance), but got %d", processorInstanceCount)
2105+
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// Adapted from go.opentelemetry.io/collector/internal/sharedcomponent
5+
// (cannot be imported directly because it is an internal package).
6+
7+
// Package sharedcomponent exposes functionality for components to register
8+
// against a shared key, such as a configuration object, in order to be reused
9+
// across multiple pipelines that reference the same component ID.
10+
// This is particularly useful when the component relies on an expensive shared
11+
// resource (e.g. cloud-metadata HTTP calls, Kubernetes API connections) that
12+
// should not be duplicated.
13+
package sharedcomponent // import "github.com/elastic/beats/v7/x-pack/otel/internal/sharedcomponent"
14+
15+
import (
16+
"container/ring"
17+
"context"
18+
"sync"
19+
20+
"go.opentelemetry.io/collector/component"
21+
"go.opentelemetry.io/collector/component/componentstatus"
22+
)
23+
24+
// NewMap creates a new shared-component map.
25+
func NewMap[K comparable, V component.Component]() *Map[K, V] {
26+
return &Map[K, V]{
27+
components: map[K]*Component[V]{},
28+
}
29+
}
30+
31+
// Map keeps a reference to all created instances for a given shared key such
32+
// as a component configuration pointer.
33+
type Map[K comparable, V component.Component] struct {
34+
lock sync.Mutex
35+
components map[K]*Component[V]
36+
}
37+
38+
// LoadOrStore returns the already-created instance for key if one exists,
39+
// otherwise calls create, stores the result, and returns it.
40+
func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error)) (*Component[V], error) {
41+
m.lock.Lock()
42+
defer m.lock.Unlock()
43+
if c, ok := m.components[key]; ok {
44+
return c, nil
45+
}
46+
comp, err := create()
47+
if err != nil {
48+
return nil, err
49+
}
50+
newComp := &Component[V]{
51+
component: comp,
52+
removeFunc: func() {
53+
m.lock.Lock()
54+
defer m.lock.Unlock()
55+
delete(m.components, key)
56+
},
57+
}
58+
m.components[key] = newComp
59+
return newComp, nil
60+
}
61+
62+
// Len returns the number of components currently held in the map.
63+
func (m *Map[K, V]) Len() int {
64+
m.lock.Lock()
65+
defer m.lock.Unlock()
66+
return len(m.components)
67+
}
68+
69+
// Component ensures that the wrapped component is started and stopped only
70+
// once. When stopped it is removed from the Map.
71+
type Component[V component.Component] struct {
72+
component V
73+
74+
startOnce sync.Once
75+
stopOnce sync.Once
76+
removeFunc func()
77+
78+
hostWrapper *hostWrapper
79+
}
80+
81+
// Unwrap returns the original component.
82+
func (c *Component[V]) Unwrap() V {
83+
return c.component
84+
}
85+
86+
// Start starts the underlying component if it has never been started before.
87+
// Subsequent calls are no-ops for the underlying component but register
88+
// additional status reporters.
89+
func (c *Component[V]) Start(ctx context.Context, host component.Host) error {
90+
if c.hostWrapper == nil {
91+
var err error
92+
c.startOnce.Do(func() {
93+
c.hostWrapper = &hostWrapper{
94+
host: host,
95+
sources: make([]componentstatus.Reporter, 0),
96+
previousEvents: ring.New(5),
97+
}
98+
if statusReporter, ok := host.(componentstatus.Reporter); ok {
99+
c.hostWrapper.addSource(statusReporter)
100+
}
101+
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStarting))
102+
if err = c.component.Start(ctx, c.hostWrapper); err != nil {
103+
c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err))
104+
}
105+
})
106+
return err
107+
}
108+
if statusReporter, ok := host.(componentstatus.Reporter); ok {
109+
c.hostWrapper.addSource(statusReporter)
110+
}
111+
return nil
112+
}
113+
114+
// Shutdown shuts down the underlying component exactly once, then removes it
115+
// from the parent Map so the same configuration can be recreated if needed.
116+
func (c *Component[V]) Shutdown(ctx context.Context) error {
117+
var err error
118+
c.stopOnce.Do(func() {
119+
if c.hostWrapper != nil {
120+
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopping))
121+
}
122+
err = c.component.Shutdown(ctx)
123+
if c.hostWrapper != nil {
124+
if err != nil {
125+
c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err))
126+
} else {
127+
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopped))
128+
}
129+
}
130+
c.removeFunc()
131+
})
132+
return err
133+
}
134+
135+
var (
136+
_ component.Host = (*hostWrapper)(nil)
137+
_ componentstatus.Reporter = (*hostWrapper)(nil)
138+
)
139+
140+
type hostWrapper struct {
141+
host component.Host
142+
sources []componentstatus.Reporter
143+
previousEvents *ring.Ring
144+
lock sync.Mutex
145+
}
146+
147+
func (h *hostWrapper) GetExtensions() map[component.ID]component.Component {
148+
return h.host.GetExtensions()
149+
}
150+
151+
func (h *hostWrapper) Report(e *componentstatus.Event) {
152+
h.lock.Lock()
153+
defer h.lock.Unlock()
154+
if len(h.sources) > 0 {
155+
h.previousEvents.Value = e
156+
h.previousEvents = h.previousEvents.Next()
157+
}
158+
for _, s := range h.sources {
159+
s.Report(e)
160+
}
161+
}
162+
163+
func (h *hostWrapper) addSource(s componentstatus.Reporter) {
164+
h.lock.Lock()
165+
defer h.lock.Unlock()
166+
h.previousEvents.Do(func(a any) {
167+
if e, ok := a.(*componentstatus.Event); ok {
168+
s.Report(e)
169+
}
170+
})
171+
h.sources = append(h.sources, s)
172+
}

x-pack/otel/processor/beatprocessor/factory.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,23 @@ import (
1212
"go.opentelemetry.io/collector/consumer"
1313
"go.opentelemetry.io/collector/processor"
1414
"go.opentelemetry.io/collector/processor/processorhelper"
15+
16+
"github.com/elastic/beats/v7/x-pack/otel/internal/sharedcomponent"
1517
)
1618

1719
const (
1820
Name = "beat"
1921
)
2022

23+
// sharedProcessors is the map of shared beatProcessor instances, keyed by *Config
24+
// pointer. When the same component ID is referenced in multiple pipelines the
25+
// OTel framework passes the same *Config pointer to every createLogsProcessor
26+
// call, so pointer equality is the correct identity here.
27+
//
28+
// This avoids spinning up duplicate expensive Beat sub-processors (e.g.
29+
// add_cloud_metadata, add_kubernetes_metadata) for every pipeline.
30+
var sharedProcessors = sharedcomponent.NewMap[*Config, *beatProcessor]()
31+
2132
func NewFactory() processor.Factory {
2233
return processor.NewFactory(
2334
component.MustNewType(Name),
@@ -40,15 +51,28 @@ func createLogsProcessor(
4051
if !ok {
4152
return nil, fmt.Errorf("failed to cast component config to Beat processor config")
4253
}
43-
beatProcessor, err := newBeatProcessor(set, beatProcessorConfig)
54+
55+
// LoadOrStore creates the beatProcessor only on the first call for a given
56+
// *Config. Subsequent calls for the same config (i.e. a second pipeline)
57+
// return the already-created instance.
58+
shared, err := sharedProcessors.LoadOrStore(beatProcessorConfig, func() (*beatProcessor, error) {
59+
return newBeatProcessor(set, beatProcessorConfig)
60+
})
4461
if err != nil {
4562
return nil, err
4663
}
64+
65+
// Each pipeline gets its own processorhelper wrapper (with its own
66+
// nextConsumer), but all wrappers call the same shared ConsumeLogs.
67+
// Start/Shutdown are delegated to the shared component so the underlying
68+
// beatProcessor is started and stopped exactly once.
4769
return processorhelper.NewLogs(
4870
ctx,
4971
set,
5072
cfg,
5173
nextConsumer,
52-
beatProcessor.ConsumeLogs,
74+
shared.Unwrap().ConsumeLogs,
75+
processorhelper.WithStart(shared.Start),
76+
processorhelper.WithShutdown(shared.Shutdown),
5377
)
5478
}

x-pack/otel/processor/beatprocessor/processor.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"errors"
1010
"fmt"
1111

12+
"go.opentelemetry.io/collector/component"
13+
1214
"github.com/elastic/beats/v7/libbeat/beat"
1315
"github.com/elastic/beats/v7/libbeat/processors/actions/addfields"
1416
"github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
@@ -108,6 +110,14 @@ func createProcessor(processorNameAndConfig map[string]any, logpLogger *logp.Log
108110
return nil, errors.New("malformed processor config")
109111
}
110112

113+
func (p *beatProcessor) Start(_ context.Context, _ component.Host) error {
114+
return nil
115+
}
116+
117+
func (p *beatProcessor) Shutdown(_ context.Context) error {
118+
return nil
119+
}
120+
111121
func (p *beatProcessor) ConsumeLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {
112122
if len(p.processors) == 0 {
113123
return logs, nil

0 commit comments

Comments
 (0)