Skip to content

Commit abeb91b

Browse files
committed
Fix double newline in NDJSON bulk body when using RawEncoding
When events are pre-encoded in the queue (via event_encoder.go), the encoded bytes include a trailing newline from the Marshal/AddRaw call. When the bulk body assembler later writes these bytes via AddRaw(RawEncoding{...}), it unconditionally appends another newline, producing an empty line (\n\n) in the NDJSON bulk body. While Elasticsearch tolerates empty lines in bulk requests, ES-compatible endpoints like Axiom and OpenSearch reject them with: 400 Bad Request: invalid event at index 1: ReadObject: expect { or , or } or n, but found \u0000 The fix checks whether RawEncoding bytes already end with a newline and skips the additional one if so. This preserves backward compatibility: RawEncoding bytes without a trailing newline (e.g. from json.Marshal) still get the newline appended as before. Applied to both jsonEncoder and gzipEncoder paths.
1 parent 1c08869 commit abeb91b

File tree

3 files changed

+14
-43
lines changed

3 files changed

+14
-43
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: bug-fix
2+
3+
summary: Fix double newline in NDJSON bulk body when using pre-encoded events (RawEncoding)
4+
5+
component: libbeat
6+
7+
pr: https://github.com/elastic/beats/pull/49557

libbeat/esleg/eslegclient/enc.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,7 @@ func (b *jsonEncoder) AddRaw(obj interface{}) error {
158158
err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
159159
case RawEncoding:
160160
_, err = b.buf.Write(v.Encoding)
161-
// If the pre-encoded bytes already end with a newline (as produced
162-
// by Marshal/AddRaw), skip the trailing WriteByte('\n') below to
163-
// avoid a double newline in the NDJSON bulk body. An empty line
164-
// between bulk items causes non-Elasticsearch endpoints (Axiom,
165-
// OpenSearch) to reject the request.
161+
// Skip trailing newline if already present to avoid an empty line in NDJSON bulk body.
166162
if err == nil && len(v.Encoding) > 0 && v.Encoding[len(v.Encoding)-1] == '\n' {
167163
return nil
168164
}

libbeat/esleg/eslegclient/enc_test.go

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"encoding/json"
2222
"net/http"
2323
"strconv"
24-
"strings"
2524
"testing"
2625
"time"
2726

@@ -67,12 +66,7 @@ func TestJSONEncoderMarshalMonitoringEvent(t *testing.T) {
6766
"Unexpected marshaled format of report.Event")
6867
}
6968

70-
// TestRawEncodingNoDoubleNewline verifies that writing a RawEncoding whose
71-
// bytes already end with '\n' does not produce a double newline in the output
72-
// buffer. A double newline in an NDJSON bulk body creates an empty line that
73-
// Elasticsearch-compatible endpoints (Axiom, OpenSearch, etc.) reject.
7469
func TestRawEncodingNoDoubleNewline(t *testing.T) {
75-
// Pre-encode an event via Marshal, which appends a trailing '\n'.
7670
encoder := NewJSONEncoder(nil, false)
7771
event := beat.Event{
7872
Timestamp: time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC),
@@ -82,42 +76,16 @@ func TestRawEncodingNoDoubleNewline(t *testing.T) {
8276
require.NoError(t, err)
8377
preEncoded := make([]byte, encoder.buf.Len())
8478
copy(preEncoded, encoder.buf.Bytes())
79+
require.Equal(t, byte('\n'), preEncoded[len(preEncoded)-1], "pre-encoded event should end with newline")
8580

86-
// Verify the pre-encoded bytes end with exactly one newline.
87-
require.True(t, len(preEncoded) > 0 && preEncoded[len(preEncoded)-1] == '\n',
88-
"pre-encoded event should end with newline")
89-
90-
// Now simulate a bulk body: meta line + RawEncoding document.
81+
// Simulate a bulk body: meta + pre-encoded document.
9182
encoder.Reset()
92-
meta := map[string]interface{}{
93-
"index": map[string]interface{}{"_index": "test"},
94-
}
95-
err = encoder.AddRaw(meta)
96-
require.NoError(t, err)
97-
err = encoder.AddRaw(RawEncoding{Encoding: preEncoded})
98-
require.NoError(t, err)
83+
meta := map[string]any{"index": map[string]any{"_index": "test"}}
84+
require.NoError(t, encoder.AddRaw(meta))
85+
require.NoError(t, encoder.AddRaw(RawEncoding{Encoding: preEncoded}))
9986

10087
body := encoder.buf.String()
101-
102-
// The body must not contain "\n\n" (double newline / empty line).
103-
assert.NotContains(t, body, "\n\n",
104-
"bulk body must not contain an empty line from double newline; got:\n%s", body)
105-
106-
// The body should be exactly: meta\ndocument\n
107-
lines := splitNDJSON(body)
108-
assert.Equal(t, 2, len(lines),
109-
"bulk body should have exactly 2 NDJSON lines (meta + document); got %d:\n%s", len(lines), body)
110-
}
111-
112-
// splitNDJSON splits an NDJSON string into non-empty lines.
113-
func splitNDJSON(s string) []string {
114-
var lines []string
115-
for _, line := range strings.Split(s, "\n") {
116-
if line != "" {
117-
lines = append(lines, line)
118-
}
119-
}
120-
return lines
88+
assert.NotContains(t, body, "\n\n", "bulk body must not contain empty lines; got:\n%s", body)
12189
}
12290

12391
func TestEncoderHeaders(t *testing.T) {

0 commit comments

Comments
 (0)