Skip to content

Commit f2ff4f3

Browse files
committed
Add suppress_health_degradation stream config to prevent optional streams from degrading agent health
Adds a `suppress_health_degradation` config flag at the stream level in the management framework. When set, fetch failures are still logged and tracked but do not mark the agent as Degraded. This lets operators build broad Fleet policies with integrations that may not be running on every enrolled host, without those absent services dragging the entire agent into a degraded state.

Unlike a per-input approach, this change lives in the health aggregation layer (`calcState()` in `unit.go`) so every input type gets the flag for free: CEL, httpjson, metricbeat modules, filebeat inputs, and any future inputs that report per-stream health. The flag is read from the stream's existing `Source` protobuf field — no proto changes or per-input modifications needed.

Behaviour when `suppress_health_degradation: true`:

- Input still retries every `period`
- Errors are logged at ERROR level
- Per-stream status still reports Degraded/Failed in the streams payload
- The stream is excluded from the unit's aggregate health calculation
- On recovery, stream status resets to Running as usual

Tested on a live Elastic Agent 9.3.1 cluster across three input types (CEL, redis/metrics, nginx/metrics) with 9 permutations — all passed.

Supersedes #49492

Fixes: elastic/elastic-agent#12885
1 parent 9dd7c3c commit f2ff4f3

File tree

2 files changed

+203
-8
lines changed

2 files changed

+203
-8
lines changed

x-pack/libbeat/management/unit.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import (
1515

1616
// unitState is the current state of a unit
1717
type unitState struct {
18-
state status.Status
19-
msg string
18+
state status.Status
19+
msg string
20+
suppressHealthDegradation bool // when true, this stream's degraded/failed states do not affect the unit's aggregate health
2021
}
2122

2223
type clientUnit interface {
@@ -101,9 +102,20 @@ func getStreamStates(expected client.Expected) (map[string]unitState, []string)
101102
streamIDs := make([]string, len(expectedCfg.Streams))
102103

103104
for idx, stream := range expectedCfg.Streams {
105+
// Check if stream is marked as suppressHealthDegradation in its source config.
106+
// Streams with this flag set still collect data and report per-stream status,
107+
// but their degraded/failed states do not drag the overall unit health down.
108+
suppressHealth := false
109+
if src := stream.GetSource(); src != nil {
110+
if v, ok := src.GetFields()["suppress_health_degradation"]; ok {
111+
suppressHealth = v.GetBoolValue()
112+
}
113+
}
114+
104115
streamState := unitState{
105-
state: status.Unknown,
106-
msg: "",
116+
state: status.Unknown,
117+
msg: "",
118+
suppressHealthDegradation: suppressHealth,
107119
}
108120

109121
if id := stream.GetId(); id != "" {
@@ -216,10 +228,16 @@ func (u *agentUnit) calcState() (status.Status, string) {
216228
return u.inputLevelState.state, u.inputLevelState.msg
217229
}
218230

219-
// inputLevelState state is marked as running, check the stream states
231+
// inputLevelState state is marked as running, check the stream states.
232+
// Streams marked as suppressHealthDegradation are excluded from the aggregate health
233+
// calculation — they still report per-stream status but do not cause
234+
// the unit to be reported as degraded or failed.
220235
reportedStatus := status.Running
221236
reportedMsg := "Healthy"
222237
for _, streamState := range u.streamStates {
238+
if streamState.suppressHealthDegradation {
239+
continue
240+
}
223241
switch streamState.state {
224242
case status.Degraded:
225243
if reportedStatus != status.Degraded {
@@ -307,8 +325,9 @@ func (u *agentUnit) updateStateForStream(streamID string, state status.Status, m
307325
}
308326

309327
u.streamStates[streamID] = unitState{
310-
state: state,
311-
msg: msg,
328+
state: state,
329+
msg: msg,
330+
suppressHealthDegradation: u.streamStates[streamID].suppressHealthDegradation,
312331
}
313332

314333
state, msg = u.calcState()
@@ -348,7 +367,11 @@ func (u *agentUnit) update(cu *client.Unit) {
348367
newStreamStates, newStreamIDs := getStreamStates(cu.Expected())
349368

350369
for key, state := range newStreamStates {
351-
if _, exists := u.streamStates[key]; exists {
370+
if existing, exists := u.streamStates[key]; exists {
371+
// Preserve current health state but update the suppressHealthDegradation flag
372+
// in case the stream config changed.
373+
existing.suppressHealthDegradation = state.suppressHealthDegradation
374+
u.streamStates[key] = existing
352375
continue
353376
}
354377

x-pack/libbeat/management/unit_test.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1111
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
12+
"google.golang.org/protobuf/types/known/structpb"
1213

1314
"github.com/elastic/beats/v7/libbeat/management/status"
1415
)
@@ -181,6 +182,177 @@ func TestUnitUpdate(t *testing.T) {
181182
}
182183
}
183184

185+
func TestUnitUpdateSuppressHealthDegradation(t *testing.T) {
186+
187+
type StatusUpdate struct {
188+
status status.Status
189+
msg string
190+
}
191+
192+
const (
193+
Healthy = "Healthy"
194+
Failed = "Failed"
195+
Degraded = "Degraded"
196+
)
197+
198+
suppressedSource, _ := structpb.NewStruct(map[string]interface{}{
199+
"suppress_health_degradation": true,
200+
})
201+
202+
notSuppressedSource, _ := structpb.NewStruct(map[string]interface{}{
203+
"suppress_health_degradation": false,
204+
})
205+
206+
newUnit := func(streams []*proto.Stream) *mockClientUnit {
207+
return &mockClientUnit{
208+
expected: client.Expected{
209+
Config: &proto.UnitExpectedConfig{
210+
Id: "input-1",
211+
Streams: streams,
212+
},
213+
},
214+
}
215+
}
216+
217+
cases := []struct {
218+
name string
219+
unit *mockClientUnit
220+
inputLevelStatus StatusUpdate
221+
streamStates map[string]StatusUpdate
222+
expectedUnitStatus client.UnitState
223+
expectedUnitMsg string
224+
}{
225+
{
226+
name: "suppressed stream degraded does not affect unit health",
227+
unit: newUnit([]*proto.Stream{
228+
{Id: "stream-required"},
229+
{Id: "stream-suppressed", Source: suppressedSource},
230+
}),
231+
inputLevelStatus: StatusUpdate{status.Running, Healthy},
232+
streamStates: map[string]StatusUpdate{
233+
"stream-required": {status.Running, Healthy},
234+
"stream-suppressed": {status.Degraded, Degraded},
235+
},
236+
expectedUnitStatus: client.UnitStateHealthy,
237+
expectedUnitMsg: Healthy,
238+
},
239+
{
240+
name: "suppressed stream failed does not affect unit health",
241+
unit: newUnit([]*proto.Stream{
242+
{Id: "stream-required"},
243+
{Id: "stream-suppressed", Source: suppressedSource},
244+
}),
245+
inputLevelStatus: StatusUpdate{status.Running, Healthy},
246+
streamStates: map[string]StatusUpdate{
247+
"stream-required": {status.Running, Healthy},
248+
"stream-suppressed": {status.Failed, Failed},
249+
},
250+
expectedUnitStatus: client.UnitStateHealthy,
251+
expectedUnitMsg: Healthy,
252+
},
253+
{
254+
name: "required stream degraded still affects unit health even with suppressed streams",
255+
unit: newUnit([]*proto.Stream{
256+
{Id: "stream-required"},
257+
{Id: "stream-suppressed", Source: suppressedSource},
258+
}),
259+
inputLevelStatus: StatusUpdate{status.Running, Healthy},
260+
streamStates: map[string]StatusUpdate{
261+
"stream-required": {status.Degraded, Degraded},
262+
"stream-suppressed": {status.Degraded, Degraded},
263+
},
264+
expectedUnitStatus: client.UnitStateDegraded,
265+
expectedUnitMsg: Degraded,
266+
},
267+
{
268+
name: "required stream failed still affects unit health even with suppressed streams",
269+
unit: newUnit([]*proto.Stream{
270+
{Id: "stream-required"},
271+
{Id: "stream-suppressed", Source: suppressedSource},
272+
}),
273+
inputLevelStatus: StatusUpdate{status.Running, Healthy},
274+
streamStates: map[string]StatusUpdate{
275+
"stream-required": {status.Failed, Failed},
276+
"stream-suppressed": {status.Running, Healthy},
277+
},
278+
expectedUnitStatus: client.UnitStateFailed,
279+
expectedUnitMsg: Failed,
280+
},
281+
{
282+
name: "all suppressed streams degraded and failed keeps unit healthy",
283+
unit: newUnit([]*proto.Stream{
284+
{Id: "stream-suppressed-1", Source: suppressedSource},
285+
{Id: "stream-suppressed-2", Source: suppressedSource},
286+
}),
287+
inputLevelStatus: StatusUpdate{status.Running, Healthy},
288+
streamStates: map[string]StatusUpdate{
289+
"stream-suppressed-1": {status.Degraded, Degraded},
290+
"stream-suppressed-2": {status.Failed, Failed},
291+
},
292+
expectedUnitStatus: client.UnitStateHealthy,
293+
expectedUnitMsg: Healthy,
294+
},
295+
{
296+
name: "suppress false behaves same as not set",
297+
unit: newUnit([]*proto.Stream{
298+
{Id: "stream-explicit-false", Source: notSuppressedSource},
299+
}),
300+
inputLevelStatus: StatusUpdate{status.Running, Healthy},
301+
streamStates: map[string]StatusUpdate{
302+
"stream-explicit-false": {status.Degraded, Degraded},
303+
},
304+
expectedUnitStatus: client.UnitStateDegraded,
305+
expectedUnitMsg: Degraded,
306+
},
307+
{
308+
name: "input level degraded is not affected by suppress flag",
309+
unit: newUnit([]*proto.Stream{
310+
{Id: "stream-suppressed", Source: suppressedSource},
311+
}),
312+
inputLevelStatus: StatusUpdate{status.Degraded, Degraded},
313+
streamStates: map[string]StatusUpdate{
314+
"stream-suppressed": {status.Running, Healthy},
315+
},
316+
expectedUnitStatus: client.UnitStateDegraded,
317+
expectedUnitMsg: Degraded,
318+
},
319+
{
320+
name: "input level failed is not affected by suppress flag",
321+
unit: newUnit([]*proto.Stream{
322+
{Id: "stream-suppressed", Source: suppressedSource},
323+
}),
324+
inputLevelStatus: StatusUpdate{status.Failed, Failed},
325+
streamStates: map[string]StatusUpdate{
326+
"stream-suppressed": {status.Running, Healthy},
327+
},
328+
expectedUnitStatus: client.UnitStateFailed,
329+
expectedUnitMsg: Failed,
330+
},
331+
}
332+
333+
for _, c := range cases {
334+
t.Run(c.name, func(t *testing.T) {
335+
aUnit := newAgentUnit(c.unit, nil)
336+
err := aUnit.UpdateState(c.inputLevelStatus.status, c.inputLevelStatus.msg, nil)
337+
if err != nil {
338+
t.Fatal(err)
339+
}
340+
341+
for id, state := range c.streamStates {
342+
aUnit.updateStateForStream(id, state.status, state.msg)
343+
}
344+
345+
if c.unit.reportedState != c.expectedUnitStatus {
346+
t.Errorf("expected unit status %s, got %s", c.expectedUnitStatus, c.unit.reportedState)
347+
}
348+
349+
if c.unit.reportedMsg != c.expectedUnitMsg {
350+
t.Errorf("expected unit msg %q, got %q", c.expectedUnitMsg, c.unit.reportedMsg)
351+
}
352+
})
353+
}
354+
}
355+
184356
type mockClientUnit struct {
185357
expected client.Expected
186358
reportedState client.UnitState

0 commit comments

Comments
 (0)