diff --git a/Dockerfile b/Dockerfile index 5d2d1001..16f3820e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,8 +3,15 @@ FROM golang:1.20 AS builder ARG VERSION ENV PKG github.com/resmoio/kubernetes-event-exporter/pkg -ADD . /app WORKDIR /app + +# Build deps first to improve local build times when iterating. +ADD go.mod go.sum ./ + +RUN go mod download + +# Then add the rest of the code. +ADD . ./ RUN CGO_ENABLED=0 GOOS=linux GO11MODULE=on go build -ldflags="-s -w -X ${PKG}/version.Version=${VERSION}" -a -o /main . FROM gcr.io/distroless/static:nonroot diff --git a/pkg/batch/writer.go b/pkg/batch/writer.go index e484d332..35124790 100644 --- a/pkg/batch/writer.go +++ b/pkg/batch/writer.go @@ -3,15 +3,16 @@ package batch import ( "context" "time" -) + "github.com/rs/zerolog/log" +) -// Writer allows to buffer some items and call the Handler function either when the buffer is full or the timeout is +// BufferWriter allows to buffer some items and call the Handler function either when the buffer is full or the timeout is // reached. There will also be support for concurrency for high volume. The handler function is supposed to return an // array of booleans to indicate whether the transfer was successful or not. It can be replaced with status codes in // the future to differentiate I/O errors, rate limiting, authorization issues. -type Writer struct { - cfg WriterConfig +type BufferWriter struct { + cfg BufferWriterConfig Handler Callback buffer []bufferItem len int @@ -27,34 +28,39 @@ type bufferItem struct { type Callback func(ctx context.Context, items []interface{}) []bool -type WriterConfig struct { - BatchSize int - MaxRetries int - Interval time.Duration - Timeout time.Duration +type BufferWriterConfig struct { + // Max events queued for a batch before a flush. + BatchSizeEvents int `yaml:"batchSizeEvents"` + // Max retries for each individual event. 
+ MaxRetriesPerEvent int `yaml:"maxRetriesPerEvent"` + // Batches are processed on a ticker with this period, in seconds; must be > 0 because time.NewTicker panics otherwise. + BatchIntervalSeconds int `yaml:"batchIntervalSeconds"` + // TODO: this doesn't do anything! + BatchTimeoutSeconds int `yaml:"batchTimeoutSeconds"` } -func NewWriter(cfg WriterConfig, cb Callback) *Writer { - return &Writer{ +func NewBufferWriter(cfg BufferWriterConfig, cb Callback) *BufferWriter { + log.Info().Msgf("New Buffer Writer created with config: %+v", cfg) + return &BufferWriter{ cfg: cfg, Handler: cb, - buffer: make([]bufferItem, cfg.BatchSize), + buffer: make([]bufferItem, cfg.BatchSizeEvents), } } // Indicates the start to accept the -func (w *Writer) Start() { +func (w *BufferWriter) Start() { w.done = make(chan bool) w.items = make(chan interface{}) w.stopDone = make(chan bool) - ticker := time.NewTicker(w.cfg.Interval) + ticker := time.NewTicker(time.Duration(w.cfg.BatchIntervalSeconds) * time.Second) go func() { shouldGoOn := true for shouldGoOn { select { case item := <-w.items: - if w.len >= w.cfg.BatchSize { + if w.len >= w.cfg.BatchSizeEvents { w.processBuffer(context.Background()) w.len = 0 } @@ -73,7 +79,7 @@ func (w *Writer) Start() { }() } -func (w *Writer) processBuffer(ctx context.Context) { +func (w *BufferWriter) processBuffer(ctx context.Context) { if w.len == 0 { return } @@ -93,7 +99,7 @@ func (w *Writer) processBuffer(ctx context.Context) { for idx, success := range responses { if !success { item := w.buffer[idx] - if item.attempt >= w.cfg.MaxRetries { + if item.attempt >= w.cfg.MaxRetriesPerEvent { // It's dropped, sorry you asked for it continue } @@ -112,13 +118,13 @@ func (w *Writer) processBuffer(ctx context.Context) { } // Used to signal writer to stop processing items and exit. -func (w *Writer) Stop() { +func (w *BufferWriter) Stop() { w.done <- true <-w.stopDone } // Submit pushes the items to the income buffer and they are placed onto the actual buffer from there. 
-func (w *Writer) Submit(items ...interface{}) { +func (w *BufferWriter) Submit(items ...interface{}) { for _, item := range items { w.items <- item } diff --git a/pkg/batch/writer_test.go b/pkg/batch/writer_test.go index 6edfb75b..d6e3c327 100644 --- a/pkg/batch/writer_test.go +++ b/pkg/batch/writer_test.go @@ -2,20 +2,21 @@ package batch import ( "context" - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestSimpleWriter(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 10, - MaxRetries: 3, - Interval: time.Second * 2, + cfg := BufferWriterConfig{ + BatchSizeEvents: 10, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 2, } var allItems []interface{} - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -44,14 +45,14 @@ func TestCorrectnessManyTimes(t *testing.T) { } func TestLargerThanBatchSize(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 3, - MaxRetries: 3, - Interval: time.Second * 2, + cfg := BufferWriterConfig{ + BatchSizeEvents: 3, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 2, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -72,14 +73,14 @@ func TestLargerThanBatchSize(t *testing.T) { } func TestSimpleInterval(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 20, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx 
context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -94,7 +95,7 @@ func TestSimpleInterval(t *testing.T) { time.Sleep(time.Millisecond * 5) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Second * 2) assert.Len(t, allItems, 1) assert.Equal(t, allItems[0], []interface{}{1, 2}) @@ -103,14 +104,14 @@ func TestSimpleInterval(t *testing.T) { } func TestIntervalComplex(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 20, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -126,7 +127,7 @@ func TestIntervalComplex(t *testing.T) { w.Submit(3, 4) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Second * 2) assert.Len(t, allItems, 1) assert.Equal(t, allItems[0], []interface{}{1, 2, 3, 4}) @@ -135,14 +136,14 @@ func TestIntervalComplex(t *testing.T) { } func TestIntervalComplexAfterFlush(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 20, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = true @@ -158,7 +159,7 @@ func TestIntervalComplexAfterFlush(t *testing.T) { w.Submit(3, 4) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Second * 2) assert.Len(t, allItems, 1) 
assert.Equal(t, allItems[0], []interface{}{1, 2, 3, 4}) @@ -170,14 +171,14 @@ func TestIntervalComplexAfterFlush(t *testing.T) { } func TestRetry(t *testing.T) { - cfg := WriterConfig{ - BatchSize: 5, - MaxRetries: 3, - Interval: time.Millisecond * 10, + cfg := BufferWriterConfig{ + BatchSizeEvents: 5, + MaxRetriesPerEvent: 3, + BatchIntervalSeconds: 1, } allItems := make([][]interface{}, 0) - w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool { + w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool { resp := make([]bool, len(items)) for idx := range resp { resp[idx] = items[idx] != 2 @@ -191,7 +192,7 @@ func TestRetry(t *testing.T) { w.Submit(1, 2, 3) assert.Len(t, allItems, 0) - time.Sleep(time.Millisecond * 200) + time.Sleep(time.Second * 5) assert.Len(t, allItems, 4) assert.Equal(t, allItems[0], []interface{}{1, 2, 3}) diff --git a/pkg/sinks/bigquery.go b/pkg/sinks/bigquery.go index 7bc09f31..6cd6c6f6 100644 --- a/pkg/sinks/bigquery.go +++ b/pkg/sinks/bigquery.go @@ -2,19 +2,20 @@ package sinks import ( "bufio" - "cloud.google.com/go/bigquery" "context" "encoding/json" "errors" "fmt" - "github.com/resmoio/kubernetes-event-exporter/pkg/batch" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" - "github.com/rs/zerolog/log" - "google.golang.org/api/option" "math/rand" "os" "time" "unicode" + + "cloud.google.com/go/bigquery" + "github.com/resmoio/kubernetes-event-exporter/pkg/batch" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/rs/zerolog/log" + "google.golang.org/api/option" ) // Returns a map filtering out keys that have nil value assigned. 
@@ -206,12 +207,12 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) { log.Error().Msgf("BigQuerySink create dataset failed: %v", err) } - batchWriter := batch.NewWriter( - batch.WriterConfig{ - BatchSize: cfg.BatchSize, - MaxRetries: cfg.MaxRetries, - Interval: time.Duration(cfg.IntervalSeconds) * time.Second, - Timeout: time.Duration(cfg.TimeoutSeconds) * time.Second, + batchWriter := batch.NewBufferWriter( + batch.BufferWriterConfig{ + BatchSizeEvents: cfg.BatchSize, + MaxRetriesPerEvent: cfg.MaxRetries, + BatchIntervalSeconds: cfg.IntervalSeconds, + BatchTimeoutSeconds: cfg.TimeoutSeconds, }, handleBatch, ) @@ -221,7 +222,7 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) { } type BigQuerySink struct { - batchWriter *batch.Writer + batchWriter *batch.BufferWriter } func (e *BigQuerySink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { diff --git a/pkg/sinks/opensearch.go b/pkg/sinks/opensearch.go index 6f58507d..3207bc3c 100644 --- a/pkg/sinks/opensearch.go +++ b/pkg/sinks/opensearch.go @@ -13,6 +13,7 @@ import ( opensearch "github.com/opensearch-project/opensearch-go" opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" + "github.com/resmoio/kubernetes-event-exporter/pkg/batch" "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/rs/zerolog/log" ) @@ -28,9 +29,14 @@ type OpenSearchConfig struct { DeDot bool `yaml:"deDot"` Index string `yaml:"index"` IndexFormat string `yaml:"indexFormat"` - Type string `yaml:"type"` TLS TLS `yaml:"tls"` Layout map[string]interface{} `yaml:"layout"` + // The key to place the greater of eventTime and lastTimestamp into. Useful for datastreams. + CombineTimestampTo string `yaml:"combineTimestampTo"` + + // Batching options. If this block is not present, events are sent individually. + // Note that opensearch calls this "bulk". 
+ Batch *batch.BufferWriterConfig `yaml:"batch"` } func NewOpenSearch(cfg *OpenSearchConfig) (*OpenSearch, error) { @@ -52,15 +58,30 @@ func NewOpenSearch(cfg *OpenSearchConfig) (*OpenSearch, error) { return nil, err } - return &OpenSearch{ + // Init the object first + ptr := &OpenSearch{ client: client, cfg: cfg, - }, nil + batch: nil, + } + + // Initialize the batch writer if needed. + var bufferWriter *batch.BufferWriter + if cfg.Batch != nil { + bufferWriter = batch.NewBufferWriter(*cfg.Batch, ptr.SendBatch) + bufferWriter.Start() + } + + // Update the object with the writer. + ptr.batch = bufferWriter + + return ptr, nil } type OpenSearch struct { client *opensearch.Client cfg *OpenSearchConfig + batch *batch.BufferWriter } var osRegex = regexp.MustCompile(`(?s){(.*)}`) @@ -83,45 +104,93 @@ func osFormatIndexName(pattern string, when time.Time) string { return builder.String() } -func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error { +func (e *OpenSearch) getIndex() string { + var index string + if len(e.cfg.IndexFormat) > 0 { + now := time.Now() + index = osFormatIndexName(e.cfg.IndexFormat, now) + } else { + index = e.cfg.Index + } + return index +} + +func (e *OpenSearch) prepareEvent(ev *kube.EnhancedEvent) ([]byte, error) { var toSend []byte + // DeDot the event if needed if e.cfg.DeDot { de := ev.DeDot() ev = &de } + // Handle layout if set if e.cfg.Layout != nil { res, err := convertLayoutTemplate(e.cfg.Layout, ev) if err != nil { - return err + return nil, err } toSend, err = json.Marshal(res) if err != nil { - return err + return nil, err } } else { toSend = ev.ToJSON() + if e.cfg.CombineTimestampTo != "" { + // Janky approach but avoids modifying the underlying functions or struct, for a fairly + // sink-specific need. + var jsonEvent map[string]interface{} + // We just marshalled, an error here would be unbelievable. 
+ json.Unmarshal(toSend, &jsonEvent) + if _, ok := jsonEvent[e.cfg.CombineTimestampTo]; ok { + // We don't want to overwrite the existing value. + return nil, fmt.Errorf("key '%s' already exists in event", e.cfg.CombineTimestampTo) + } + // Can't use .GetTimestamp* since that's *first* timestamp. + var timestamp string + if !ev.LastTimestamp.Time.IsZero() { + timestamp = ev.LastTimestamp.Time.Format(time.RFC3339) + } else { + timestamp = ev.EventTime.Time.Format(time.RFC3339) + } + jsonEvent[e.cfg.CombineTimestampTo] = timestamp + var err error + toSend, err = json.Marshal(jsonEvent) + if err != nil { + // Most likely a strange value in the key. + log.Error().Msgf("Failed to marshal event with timestamp '%s' in key '%s': %s", ev.GetTimestampISO8601(), e.cfg.CombineTimestampTo, err) + return nil, err + } + } } - var index string - if len(e.cfg.IndexFormat) > 0 { - now := time.Now() - index = osFormatIndexName(e.cfg.IndexFormat, now) + return toSend, nil +} + +func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + if e.batch != nil { + log.Debug().Msgf("Event %s submitted to batch", ev.Message) + e.batch.Submit(ev) + return nil } else { - index = e.cfg.Index + log.Debug().Msgf("Sending event %s individually", ev.Message) + return e.SendSingle(ctx, ev) + } +} + +func (e *OpenSearch) SendSingle(ctx context.Context, ev *kube.EnhancedEvent) error { + toSend, err := e.prepareEvent(ev) + if err != nil { + return err } + index := e.getIndex() + req := opensearchapi.IndexRequest{ Body: bytes.NewBuffer(toSend), Index: index, } - // This should not be used for clusters with ES8.0+. 
- if len(e.cfg.Type) > 0 { - req.DocumentType = e.cfg.Type - } - if e.cfg.UseEventID { req.DocumentID = string(ev.UID) } @@ -142,6 +211,124 @@ func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error { return nil } +func (e *OpenSearch) PrepareBatchItem(item interface{}) (string, error) { + toSend, err := e.prepareEvent(item.(*kube.EnhancedEvent)) + if err != nil { + return "", err + } + + var firstline string + var secondline string + if e.cfg.UseEventID { + // Must upsert the event + firstline = fmt.Sprintf(`{"update": {"_index": "%s", "_id": "%s"}}`, e.getIndex(), item.(*kube.EnhancedEvent).UID) + secondline = fmt.Sprintf(`{"doc": %s, "doc_as_upsert": true}`, string(toSend)) + } else { + // Every event is unique, so they should all be creation. + firstline = fmt.Sprintf(`{"create": {"_index": "%s"}}`, e.getIndex()) + secondline = string(toSend) + } + + return fmt.Sprintf("%s\n%s", firstline, secondline), nil +} + +// Unfortunately the opensearch client does not define non-generic response types. +type BulkResponseItem struct { + Id string `json:"_id"` + Index string `json:"_index"` + Result string `json:"result"` + Status int `json:"status"` + Error struct { + Type string `json:"type"` + Reason string `json:"reason"` + } `json:"error"` +} +type BulkResponseItemWrapper struct { + // Only one of these will be present. 
+ Update BulkResponseItem `json:"update"` + Create BulkResponseItem `json:"create"` +} +type BulkResponse struct { + Errors bool `json:"errors"` + Items []BulkResponseItemWrapper `json:"items"` +} + +func (e *OpenSearch) SendBatch(ctx context.Context, items []interface{}) []bool { + res := make([]bool, len(items)) + for i := range items { + res[i] = true + } + + bulkItems := make([]string, len(items)) + failed := 0 + for i, item := range items { + prepared, err := e.PrepareBatchItem(item) + if err != nil { + log.Error().Msgf("Could not prepare batch item: %s", err) + failed++ + continue + } + // Skip any failed items so empty items are left at the end. + bulkItems[i-failed] = prepared + } + + if failed > 0 { + log.Error().Msgf("Failed to prepare %d batch items", failed) + } + // Remove all empty items. + bulkItems = bulkItems[:len(items)-failed] + // Make sure there's a list left. + if len(bulkItems) == 0 { + log.Error().Msg("No items to send!") + return []bool{} + } + + bulk := opensearchapi.BulkRequest{ + // For some reason, bulk requests must have a trailing newline. + Body: bytes.NewBuffer([]byte(strings.Join(bulkItems, "\n") + "\n")), + } + + // Actually send the request. + resp, err := bulk.Do(ctx, e.client) + // Request failures are likely related to connection issues. + if err != nil { + log.Error().Msgf("Could not send bulk request: %s", err) + // Mark all items as failed + for i := range items { + res[i] = false + } + return res + } + + log.Debug().Msgf("Bulk request of %d items was processed.", len(items)) + + defer resp.Body.Close() + + // Deeper error inspection -- it could be that the request succeeded but some items failed. 
+ var bulkResponse BulkResponse + json.NewDecoder(resp.Body).Decode(&bulkResponse) + + if bulkResponse.Errors { + for i, item := range bulkResponse.Items { + var inner BulkResponseItem + if item.Update != (BulkResponseItem{}) { + inner = item.Update + } else { + inner = item.Create + } + if inner.Error.Type != "" { + res[i] = false + log.Error().Msgf("Document %s failed to index in index %s: %s (status %d)", inner.Id, inner.Index, inner.Error.Reason, inner.Status) + } + } + } + + return res +} + func (e *OpenSearch) Close() { - // No-op + if e.batch != nil { + e.batch.Stop() + } + // No-op if there is no batch writer. }