From 5dc81925d5ffd763e3e871bd1cf3324b9433b1df Mon Sep 17 00:00:00 2001
From: Elias Pol
Date: Mon, 30 Jun 2025 09:34:30 -0400
Subject: [PATCH] Update batch writer, add to opensearch sink.

Not using the batch API for opensearch (and likely for elasticsearch as
well) is very bad for performance, easily causing event backups in
moderately busy clusters.

This commit updates the batch writer to make its naming and
configuration a little clearer (while also updating the one existing
user, bigquery, to keep its current interface).

This also adds the ability to force the opensearch sink to output
events with their most recent timestamp in a consistent field, which is
very useful for datastreams. An illustrative configuration sketch is
included after the diff.
---
 Dockerfile               |   9 +-
 pkg/batch/writer.go      |  44 ++++----
 pkg/batch/writer_test.go |  71 ++++++-------
 pkg/sinks/bigquery.go    |  25 ++---
 pkg/sinks/opensearch.go  | 225 +++++++++++++++++++++++++++++++++++---
 5 files changed, 290 insertions(+), 84 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 5d2d1001..16f3820e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,8 +3,15 @@ FROM golang:1.20 AS builder
 ARG VERSION
 ENV PKG github.com/resmoio/kubernetes-event-exporter/pkg
 
-ADD . /app
 WORKDIR /app
+
+# Build deps first to improve local build times when iterating.
+ADD go.mod go.sum ./
+
+RUN go mod download
+
+# Then add the rest of the code.
+ADD . ./
 RUN CGO_ENABLED=0 GOOS=linux GO11MODULE=on go build -ldflags="-s -w -X ${PKG}/version.Version=${VERSION}" -a -o /main .
 
 FROM gcr.io/distroless/static:nonroot
diff --git a/pkg/batch/writer.go b/pkg/batch/writer.go
index e484d332..35124790 100644
--- a/pkg/batch/writer.go
+++ b/pkg/batch/writer.go
@@ -3,15 +3,16 @@ package batch
 import (
 	"context"
 	"time"
-)
+	"github.com/rs/zerolog/log"
+)
 
-// Writer allows to buffer some items and call the Handler function either when the buffer is full or the timeout is
+// BufferWriter allows to buffer some items and call the Handler function either when the buffer is full or the timeout is
 // reached. There will also be support for concurrency for high volume. The handler function is supposed to return an
 // array of booleans to indicate whether the transfer was successful or not. It can be replaced with status codes in
 // the future to differentiate I/O errors, rate limiting, authorization issues.
-type Writer struct {
-	cfg     WriterConfig
+type BufferWriter struct {
+	cfg     BufferWriterConfig
 	Handler Callback
 	buffer  []bufferItem
 	len     int
@@ -27,34 +28,39 @@ type bufferItem struct {
 
 type Callback func(ctx context.Context, items []interface{}) []bool
 
-type WriterConfig struct {
-	BatchSize  int
-	MaxRetries int
-	Interval   time.Duration
-	Timeout    time.Duration
+type BufferWriterConfig struct {
+	// Max events queued for a batch before a flush.
+	BatchSizeEvents int `yaml:"batchSizeEvents"`
+	// Max retries for each individual event.
+	MaxRetriesPerEvent int `yaml:"maxRetriesPerEvent"`
+	// Batches are flushed at this interval, in seconds.
+	BatchIntervalSeconds int `yaml:"batchIntervalSeconds"`
+	// TODO: this doesn't do anything!
+ BatchTimeoutSeconds int `yaml:"batchTimeoutSeconds"` } -func NewWriter(cfg WriterConfig, cb Callback) *Writer { - return &Writer{ +func NewBufferWriter(cfg BufferWriterConfig, cb Callback) *BufferWriter { + log.Info().Msgf("New Buffer Writer created with config: %+v", cfg) + return &BufferWriter{ cfg: cfg, Handler: cb, - buffer: make([]bufferItem, cfg.BatchSize), + buffer: make([]bufferItem, cfg.BatchSizeEvents), } } // Indicates the start to accept the -func (w *Writer) Start() { +func (w *BufferWriter) Start() { w.done = make(chan bool) w.items = make(chan interface{}) w.stopDone = make(chan bool) - ticker := time.NewTicker(w.cfg.Interval) + ticker := time.NewTicker(time.Duration(w.cfg.BatchIntervalSeconds) * time.Second) go func() { shouldGoOn := true for shouldGoOn { select { case item := <-w.items: - if w.len >= w.cfg.BatchSize { + if w.len >= w.cfg.BatchSizeEvents { w.processBuffer(context.Background()) w.len = 0 } @@ -73,7 +79,7 @@ func (w *Writer) Start() { }() } -func (w *Writer) processBuffer(ctx context.Context) { +func (w *BufferWriter) processBuffer(ctx context.Context) { if w.len == 0 { return } @@ -93,7 +99,7 @@ func (w *Writer) processBuffer(ctx context.Context) { for idx, success := range responses { if !success { item := w.buffer[idx] - if item.attempt >= w.cfg.MaxRetries { + if item.attempt >= w.cfg.MaxRetriesPerEvent { // It's dropped, sorry you asked for it continue } @@ -112,13 +118,13 @@ func (w *Writer) processBuffer(ctx context.Context) { } // Used to signal writer to stop processing items and exit. -func (w *Writer) Stop() { +func (w *BufferWriter) Stop() { w.done <- true <-w.stopDone } // Submit pushes the items to the income buffer and they are placed onto the actual buffer from there. -func (w *Writer) Submit(items ...interface{}) { +func (w *BufferWriter) Submit(items ...interface{}) { for _, item := range items { w.items <- item } diff --git a/pkg/batch/writer_test.go b/pkg/batch/writer_test.go index 6edfb75b..d6e3c327 100644 --- a/pkg/batch/writer_test.go +++ b/pkg/batch/writer_test.go @@ -2,20 +2,21 @@ package batch import ( "context" - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestSimpleWriter(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 10, - MaxRetries: 3, - Interval: time.Second * 2, + cfg := BufferWriterConfig{ + BatchSizeEvents: 10, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 2, } var allItems []interface{} - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -44,14 +45,14 @@ func TestCorrectnessManyTimes(t *testing.T) { } func TestLargerThanBatchSize(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 3, - MaxRetries: 3, - Interval: time.Second * 2, + cfg := BufferWriterConfig{ + BatchSizeEvents: 3, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 2, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -72,14 +73,14 @@ func TestLargerThanBatchSize(t *testing.T) { } func TestSimpleInterval(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 20, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + 
BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -94,7 +95,7 @@ func TestSimpleInterval(t *testing.T) { time.Sleep(time.Millisecond * 5) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Second * 2) assert.Len(t, allItems, 1) assert.Equal(t, allItems[0], []interface{}{1, 2}) @@ -103,14 +104,14 @@ func TestSimpleInterval(t *testing.T) { } func TestIntervalComplex(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 20, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -126,7 +127,7 @@ func TestIntervalComplex(t *testing.T) { w.Submit(3, 4) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Second * 2) assert.Len(t, allItems, 1) assert.Equal(t, allItems[0], []interface{}{1, 2, 3, 4}) @@ -135,14 +136,14 @@ func TestIntervalComplex(t *testing.T) { } func TestIntervalComplexAfterFlush(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 20, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -158,7 +159,7 @@ func TestIntervalComplexAfterFlush(t *testing.T) { w.Submit(3, 4) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Second * 2) assert.Len(t, allItems, 1) assert.Equal(t, allItems[0], []interface{}{1, 2, 3, 4}) @@ -170,14 +171,14 @@ func TestIntervalComplexAfterFlush(t *testing.T) { } func TestRetry(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 10, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = items[idx] != 2 @@ -191,7 +192,7 @@ func TestRetry(t *testing.T) { w.Submit(1, 2, 3) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 200) + time.Sleep(time.Second * 5) assert.Len(t, allItems, 4) assert.Equal(t, allItems[0], []interface{}{1, 2, 3}) diff --git a/pkg/sinks/bigquery.go b/pkg/sinks/bigquery.go index 7bc09f31..6cd6c6f6 100644 --- a/pkg/sinks/bigquery.go +++ b/pkg/sinks/bigquery.go @@ -2,19 +2,20 @@ package sinks import ( "bufio" - "cloud.google.com/go/bigquery" "context" "encoding/json" "errors" "fmt" - "github.com/resmoio/kubernetes-event-exporter/pkg/batch" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" - "github.com/rs/zerolog/log" - "google.golang.org/api/option" "math/rand" "os" "time" "unicode" + + 
"cloud.google.com/go/bigquery" + "github.com/resmoio/kubernetes-event-exporter/pkg/batch" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/rs/zerolog/log" + "google.golang.org/api/option" ) // Returns a map filtering out keys that have nil value assigned. @@ -206,12 +207,12 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) { log.Error().Msgf("BigQuerySink create dataset failed: %v", err) } - batchWriter := batch.NewWriter( - batch.WriterConfig{ - BatchSize: cfg.BatchSize, - MaxRetries: cfg.MaxRetries, - Interval: time.Duration(cfg.IntervalSeconds) * time.Second, - Timeout: time.Duration(cfg.TimeoutSeconds) * time.Second, + batchWriter := batch.NewBufferWriter( + batch.BufferWriterConfig{ + BatchSizeEvents: cfg.BatchSize, + MaxRetriesPerEvent: cfg.MaxRetries, + BatchIntervalSeconds: cfg.IntervalSeconds, + BatchTimeoutSeconds: cfg.TimeoutSeconds, }, handleBatch, ) @@ -221,7 +222,7 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) { } type BigQuerySink struct { - batchWriter *batch.Writer + batchWriter *batch.BufferWriter } func (e *BigQuerySink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { diff --git a/pkg/sinks/opensearch.go b/pkg/sinks/opensearch.go index 6f58507d..3207bc3c 100644 --- a/pkg/sinks/opensearch.go +++ b/pkg/sinks/opensearch.go @@ -13,6 +13,7 @@ import ( opensearch "github.com/opensearch-project/opensearch-go" opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" + "github.com/resmoio/kubernetes-event-exporter/pkg/batch" "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/rs/zerolog/log" ) @@ -28,9 +29,14 @@ type OpenSearchConfig struct { DeDot bool `yaml:"deDot"` Index string `yaml:"index"` IndexFormat string `yaml:"indexFormat"` - Type string `yaml:"type"` TLS TLS `yaml:"tls"` Layout map[string]interface{} `yaml:"layout"` + // The key to place the greater of eventTime and lastTimestamp into. Useful for datastreams. + CombineTimestampTo string `yaml:"combineTimestampTo"` + + // Batching options. If this block is not present, events are sent individually. + // Note that opensearch calls this "bulk". + Batch *batch.BufferWriterConfig `yaml:"batch"` } func NewOpenSearch(cfg *OpenSearchConfig) (*OpenSearch, error) { @@ -52,15 +58,30 @@ func NewOpenSearch(cfg *OpenSearchConfig) (*OpenSearch, error) { return nil, err } - return &OpenSearch{ + // Init the object first + ptr := &OpenSearch{ client: client, cfg: cfg, - }, nil + batch: nil, + } + + // Initialize the batch writer if needed. + var bufferWriter *batch.BufferWriter + if cfg.Batch != nil { + bufferWriter = batch.NewBufferWriter(*cfg.Batch, ptr.SendBatch) + bufferWriter.Start() + } + + // Update the object with the writer. 
+	ptr.batch = bufferWriter
+
+	return ptr, nil
 }
 
 type OpenSearch struct {
 	client *opensearch.Client
 	cfg    *OpenSearchConfig
+	batch  *batch.BufferWriter
 }
 
 var osRegex = regexp.MustCompile(`(?s){(.*)}`)
@@ -83,45 +104,93 @@ func osFormatIndexName(pattern string, when time.Time) string {
 	return builder.String()
 }
 
-func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
+func (e *OpenSearch) getIndex() string {
+	var index string
+	if len(e.cfg.IndexFormat) > 0 {
+		now := time.Now()
+		index = osFormatIndexName(e.cfg.IndexFormat, now)
+	} else {
+		index = e.cfg.Index
+	}
+	return index
+}
+
+func (e *OpenSearch) prepareEvent(ev *kube.EnhancedEvent) ([]byte, error) {
 	var toSend []byte
 
+	// DeDot the event if needed
 	if e.cfg.DeDot {
 		de := ev.DeDot()
 		ev = &de
 	}
+
+	// Handle layout if set
 	if e.cfg.Layout != nil {
 		res, err := convertLayoutTemplate(e.cfg.Layout, ev)
 		if err != nil {
-			return err
+			return nil, err
 		}
 
		toSend, err = json.Marshal(res)
 		if err != nil {
-			return err
+			return nil, err
 		}
 	} else {
 		toSend = ev.ToJSON()
+		if e.cfg.CombineTimestampTo != "" {
+			// Janky approach but avoids modifying the underlying functions or struct, for a fairly
+			// sink-specific need.
+			var jsonEvent map[string]interface{}
+			// We just marshalled, an error here would be unbelievable.
+			json.Unmarshal(toSend, &jsonEvent)
+			if _, ok := jsonEvent[e.cfg.CombineTimestampTo]; ok {
+				// We don't want to overwrite the existing value.
+				return nil, fmt.Errorf("key '%s' already exists in event", e.cfg.CombineTimestampTo)
+			}
+			// Can't use .GetTimestamp* since that's the *first* timestamp.
+			var timestamp string
+			if !ev.LastTimestamp.Time.IsZero() {
+				timestamp = ev.LastTimestamp.Time.Format(time.RFC3339)
+			} else {
+				timestamp = ev.EventTime.Time.Format(time.RFC3339)
+			}
+			jsonEvent[e.cfg.CombineTimestampTo] = timestamp
+			var err error
+			toSend, err = json.Marshal(jsonEvent)
+			if err != nil {
+				// Most likely a strange value in the key.
+				log.Error().Msgf("Failed to marshal event with timestamp '%s' in key '%s': %s", timestamp, e.cfg.CombineTimestampTo, err)
+				return nil, err
+			}
+		}
 	}
 
-	var index string
-	if len(e.cfg.IndexFormat) > 0 {
-		now := time.Now()
-		index = osFormatIndexName(e.cfg.IndexFormat, now)
+	return toSend, nil
+}
+
+func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
+	if e.batch != nil {
+		log.Debug().Msgf("Event %s submitted to batch", ev.Message)
+		e.batch.Submit(ev)
+		return nil
 	} else {
-		index = e.cfg.Index
+		log.Debug().Msgf("Sending event %s individually", ev.Message)
+		return e.SendSingle(ctx, ev)
+	}
+}
+
+func (e *OpenSearch) SendSingle(ctx context.Context, ev *kube.EnhancedEvent) error {
+	toSend, err := e.prepareEvent(ev)
+	if err != nil {
+		return err
 	}
 
+	index := e.getIndex()
+
 	req := opensearchapi.IndexRequest{
 		Body:  bytes.NewBuffer(toSend),
 		Index: index,
 	}
 
-	// This should not be used for clusters with ES8.0+.
-	if len(e.cfg.Type) > 0 {
-		req.DocumentType = e.cfg.Type
-	}
-
 	if e.cfg.UseEventID {
 		req.DocumentID = string(ev.UID)
 	}
@@ -142,6 +211,128 @@ func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
 	return nil
 }
 
+func (e *OpenSearch) PrepareBatchItem(item interface{}) (string, error) {
+	toSend, err := e.prepareEvent(item.(*kube.EnhancedEvent))
+	if err != nil {
+		return "", err
+	}
+
+	var firstline string
+	var secondline string
+	if e.cfg.UseEventID {
+		// Must upsert the event
+		firstline = fmt.Sprintf(`{"update": {"_index": "%s", "_id": "%s"}}`, e.getIndex(), item.(*kube.EnhancedEvent).UID)
+		secondline = fmt.Sprintf(`{"doc": %s, "doc_as_upsert": true}`, string(toSend))
+	} else {
+		// Every event is unique, so they should all be create actions.
+		firstline = fmt.Sprintf(`{"create": {"_index": "%s"}}`, e.getIndex())
+		secondline = string(toSend)
+	}
+
+	return fmt.Sprintf("%s\n%s", firstline, secondline), nil
+}
+
+// Unfortunately the opensearch client does not define non-generic response types.
+type BulkResponseItem struct {
+	Id     string `json:"_id"`
+	Index  string `json:"_index"`
+	Result string `json:"result"`
+	Status int    `json:"status"`
+	Error  struct {
+		Type   string `json:"type"`
+		Reason string `json:"reason"`
+	} `json:"error"`
+}
+type BulkResponseItemWrapper struct {
+	// Only one of these will be present.
+	Update BulkResponseItem `json:"update"`
+	Create BulkResponseItem `json:"create"`
+}
+type BulkResponse struct {
+	Errors bool                      `json:"errors"`
+	Items  []BulkResponseItemWrapper `json:"items"`
+}
+
+func (e *OpenSearch) SendBatch(ctx context.Context, items []interface{}) []bool {
+	res := make([]bool, len(items))
+	for i := range items {
+		res[i] = true
+	}
+
+	bulkItems := make([]string, 0, len(items))
+	// Track each entry's original index so bulk response statuses map back to the right item.
+	okIndex := make([]int, 0, len(items))
+	failed := 0
+	for i, item := range items {
+		prepared, err := e.PrepareBatchItem(item)
+		if err != nil {
+			log.Error().Msgf("Could not prepare batch item: %s", err)
+			failed++
+			continue
+		}
+		bulkItems = append(bulkItems, prepared)
+		okIndex = append(okIndex, i)
+	}
+
+	if failed > 0 {
+		log.Error().Msgf("Failed to prepare %d batch items", failed)
+	}
+	// Make sure there's something left to send.
+	if len(bulkItems) == 0 {
+		log.Error().Msg("No items to send!")
+		return []bool{}
+	}
+
+	bulk := opensearchapi.BulkRequest{
+		// The bulk API's NDJSON body must end with a trailing newline.
+		Body: bytes.NewBuffer([]byte(strings.Join(bulkItems, "\n") + "\n")),
+	}
+
+	// Actually send the request.
+	resp, err := bulk.Do(ctx, e.client)
+	// Request failures are likely related to connection issues.
+	if err != nil {
+		log.Error().Msgf("Could not send bulk request: %s", err)
+		// Mark all items as failed so they are retried.
+		for i := range items {
+			res[i] = false
+		}
+		return res
+	}
+
+	log.Debug().Msgf("Bulk request of %d items was processed.", len(items))
+
+	defer resp.Body.Close()
+
+	// Deeper error inspection -- it could be that the request succeeded but some items failed.
+	var bulkResponse BulkResponse
+	if err := json.NewDecoder(resp.Body).Decode(&bulkResponse); err != nil {
+		log.Error().Msgf("Could not decode bulk response: %s", err)
+		return res
+	}
+
+	if bulkResponse.Errors {
+		for i, item := range bulkResponse.Items {
+			var inner BulkResponseItem
+			if item.Update != (BulkResponseItem{}) {
+				inner = item.Update
+			} else {
+				inner = item.Create
+			}
+			if inner.Error.Type != "" {
+				// Map the response position back to the original item index.
+				res[okIndex[i]] = false
+				log.Error().Msgf("Document %s failed to index in index %s: %s (status %d)", inner.Id, inner.Index, inner.Error.Reason, inner.Status)
+			}
+		}
+	}
+
+	return res
+}
+
 func (e *OpenSearch) Close() {
-	// No-op
+	if e.batch != nil {
+		e.batch.Stop()
+	}
+	// No-op if there is no batch writer.
 }
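
-- 
Reviewer note: a minimal sketch of the new sink options, for convenience.
The field names (index, combineTimestampTo, batch, and the batch writer
keys) come from this patch; the values and the surrounding exporter
config (receivers, connection settings, TLS) are illustrative assumptions
only. batchTimeoutSeconds is omitted because, per the TODO above, it is
not wired up yet.

    opensearch:
      index: kube-events
      # Copy the event's most recent timestamp (lastTimestamp if set,
      # otherwise eventTime) into this key; useful when the target index
      # is a datastream that expects a single timestamp field.
      combineTimestampTo: "@timestamp"
      # Enable bulk indexing; omit this block to keep sending events
      # one at a time.
      batch:
        batchSizeEvents: 1000
        maxRetriesPerEvent: 3
        batchIntervalSeconds: 10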