Skip to content

Commit 3633f86

Browse files
fbreceiver: stabilize TestConsumeContract startup timing (#49238) (#49354)
Implement a readness probe for the log generator that waits until the Filestream input is publishing metrics. This fixes some flakness due to the assertions starting before the Filestream input was actually running, thus failing the test. GenAI-Assisted: Yes Human-Reviewed: Yes Tool: Cursor-CLI, Model: GPT-5.3 Codex Extra High Fast (cherry picked from commit 43c4d7d) Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
1 parent a624a66 commit 3633f86

File tree

1 file changed

+81
-2
lines changed

1 file changed

+81
-2
lines changed

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
22
// or more contributor license agreements. Licensed under the Elastic License;
33
// you may not use this file except in compliance with the Elastic License.
4+
// This file was contributed to by generative AI
45

56
package fbreceiver
67

@@ -21,6 +22,7 @@ import (
2122
"strings"
2223
"sync/atomic"
2324
"testing"
25+
"time"
2426

2527
"github.com/gofrs/uuid/v5"
2628
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -501,12 +503,73 @@ func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoin
501503
return true
502504
}
503505

506+
func hasInputMetricsFromUnixSocket(t *testing.T, socketPath string, inputID string) error {
507+
// skip windows for now
508+
if runtime.GOOS == "windows" {
509+
return nil
510+
}
511+
512+
client := http.Client{
513+
Transport: &http.Transport{
514+
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
515+
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
516+
},
517+
},
518+
}
519+
defer client.CloseIdleConnections()
520+
521+
inputsURL, err := url.JoinPath("http://unix", "inputs")
522+
if err != nil {
523+
return fmt.Errorf("JoinPath failed: %w", err)
524+
}
525+
526+
req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, inputsURL, nil)
527+
if err != nil {
528+
return fmt.Errorf("error creating request: %w", err)
529+
}
530+
531+
resp, err := client.Do(req)
532+
if err != nil {
533+
return fmt.Errorf("client.Get failed: %w", err)
534+
}
535+
defer resp.Body.Close()
536+
537+
body, err := io.ReadAll(resp.Body)
538+
if err != nil {
539+
return fmt.Errorf("io.ReadAll of body failed: %w", err)
540+
}
541+
542+
if len(body) <= 0 {
543+
return errors.New("body too short")
544+
}
545+
546+
var inputs []map[string]any
547+
if err := json.Unmarshal(body, &inputs); err != nil {
548+
return fmt.Errorf("json unmarshal of body failed: %w (body=%q)", err, body)
549+
}
550+
551+
if len(inputs) <= 0 {
552+
return errors.New("json array didn't have any entries")
553+
}
554+
555+
for _, input := range inputs {
556+
id, _ := input["id"].(string)
557+
if id != inputID {
558+
continue
559+
}
560+
return nil
561+
}
562+
563+
return fmt.Errorf("input %q not found in /inputs payload", inputID)
564+
}
565+
504566
type logGenerator struct {
505567
t *testing.T
506568
tmpDir string
507569
f *os.File
508570
sequenceNum int64
509571
currentFile string
572+
waitReady func()
510573
}
511574

512575
func newLogGenerator(t *testing.T, tmpDir string) *logGenerator {
@@ -528,6 +591,9 @@ func (g *logGenerator) Start() {
528591
g.f = f
529592
g.currentFile = filePath
530593
atomic.StoreInt64(&g.sequenceNum, 0)
594+
if g.waitReady != nil {
595+
g.waitReady()
596+
}
531597
}
532598

533599
func (g *logGenerator) Stop() {
@@ -562,20 +628,31 @@ func TestConsumeContract(t *testing.T) {
562628
defer oteltest.VerifyNoLeaks(t)
563629

564630
tmpDir := t.TempDir()
565-
const logsPerTest = 100
631+
monitorSocket := genSocketPath(t)
632+
const (
633+
logsPerTest = 100
634+
filestreamInputID = "filestream-test"
635+
)
566636

567637
gen := newLogGenerator(t, tmpDir)
638+
gen.waitReady = func() {
639+
require.EventuallyWithT(t, func(c *assert.CollectT) {
640+
err := hasInputMetricsFromUnixSocket(t, monitorSocket, filestreamInputID)
641+
assert.NoError(c, err, "receiver input metrics are not ready")
642+
}, 30*time.Second, 100*time.Millisecond)
643+
}
568644

569645
t.Setenv("OTELCONSUMER_RECEIVERTEST", "1")
570646

571647
cfg := &Config{
572648
Beatconfig: map[string]any{
573649
"queue.mem.flush.timeout": "0s",
650+
"processors": []map[string]any{},
574651
"filebeat": map[string]any{
575652
"inputs": []map[string]any{
576653
{
577654
"type": "filestream",
578-
"id": "filestream-test",
655+
"id": filestreamInputID,
579656
"enabled": true,
580657
"paths": []string{
581658
filepath.Join(tmpDir, "input.log"),
@@ -597,6 +674,8 @@ func TestConsumeContract(t *testing.T) {
597674
},
598675
},
599676
},
677+
"http.enabled": true,
678+
"http.host": hostFromSocket(monitorSocket),
600679
"logging": map[string]any{
601680
"level": "debug",
602681
"selectors": []string{

0 commit comments

Comments
 (0)