9 changes: 8 additions & 1 deletion Dockerfile
@@ -3,8 +3,15 @@ FROM golang:1.20 AS builder
 ARG VERSION
 ENV PKG github.com/resmoio/kubernetes-event-exporter/pkg

-ADD . /app
 WORKDIR /app

+# Build deps first to improve local build times when iterating.
+ADD go.mod go.sum ./
+
+RUN go mod download
+
+# Then add the rest of the code.
+ADD . ./
+
 RUN CGO_ENABLED=0 GOOS=linux GO111MODULE=on go build -ldflags="-s -w -X ${PKG}/version.Version=${VERSION}" -a -o /main .

 FROM gcr.io/distroless/static:nonroot
44 changes: 25 additions & 19 deletions pkg/batch/writer.go
@@ -3,15 +3,16 @@ package batch
 import (
 	"context"
 	"time"
-)
+
+	"github.com/rs/zerolog/log"
+)

-// Writer allows to buffer some items and call the Handler function either when the buffer is full or the timeout is
+// BufferWriter allows to buffer some items and call the Handler function either when the buffer is full or the timeout is
 // reached. There will also be support for concurrency for high volume. The handler function is supposed to return an
 // array of booleans to indicate whether the transfer was successful or not. It can be replaced with status codes in
 // the future to differentiate I/O errors, rate limiting, authorization issues.
-type Writer struct {
-	cfg     WriterConfig
+type BufferWriter struct {
+	cfg     BufferWriterConfig
 	Handler Callback
 	buffer  []bufferItem
 	len     int
@@ -27,34 +28,39 @@ type bufferItem struct {

 type Callback func(ctx context.Context, items []interface{}) []bool

-type WriterConfig struct {
-	BatchSize  int
-	MaxRetries int
-	Interval   time.Duration
-	Timeout    time.Duration
+type BufferWriterConfig struct {
+	// Max events queued for a batch before a flush.
+	BatchSizeEvents int `yaml:"batchSizeEvents"`
+	// Max retries for each individual event.
+	MaxRetriesPerEvent int `yaml:"maxRetriesPerEvent"`
+	// Interval, in seconds, at which buffered batches are flushed.
+	BatchIntervalSeconds int `yaml:"batchIntervalSeconds"`
+	// TODO: this doesn't do anything!
+	BatchTimeoutSeconds int `yaml:"batchTimeoutSeconds"`
 }

-func NewWriter(cfg WriterConfig, cb Callback) *Writer {
-	return &Writer{
+func NewBufferWriter(cfg BufferWriterConfig, cb Callback) *BufferWriter {
+	log.Info().Msgf("New Buffer Writer created with config: %+v", cfg)
+	return &BufferWriter{
 		cfg:     cfg,
 		Handler: cb,
-		buffer:  make([]bufferItem, cfg.BatchSize),
+		buffer:  make([]bufferItem, cfg.BatchSizeEvents),
 	}
 }

 // Indicates the start to accept the items.
-func (w *Writer) Start() {
+func (w *BufferWriter) Start() {
 	w.done = make(chan bool)
 	w.items = make(chan interface{})
 	w.stopDone = make(chan bool)
-	ticker := time.NewTicker(w.cfg.Interval)
+	ticker := time.NewTicker(time.Duration(w.cfg.BatchIntervalSeconds) * time.Second)

 	go func() {
 		shouldGoOn := true
 		for shouldGoOn {
 			select {
 			case item := <-w.items:
-				if w.len >= w.cfg.BatchSize {
+				if w.len >= w.cfg.BatchSizeEvents {
 					w.processBuffer(context.Background())
 					w.len = 0
 				}
@@ -73,7 +79,7 @@ func (w *Writer) Start() {
 	}()
 }

-func (w *Writer) processBuffer(ctx context.Context) {
+func (w *BufferWriter) processBuffer(ctx context.Context) {
 	if w.len == 0 {
 		return
 	}
@@ -93,7 +99,7 @@ func (w *Writer) processBuffer(ctx context.Context) {
 	for idx, success := range responses {
 		if !success {
 			item := w.buffer[idx]
-			if item.attempt >= w.cfg.MaxRetries {
+			if item.attempt >= w.cfg.MaxRetriesPerEvent {
 				// It's dropped, sorry you asked for it
 				continue
 			}
@@ -112,13 +118,13 @@ func (w *Writer) processBuffer(ctx context.Context) {
 }

 // Used to signal writer to stop processing items and exit.
-func (w *Writer) Stop() {
+func (w *BufferWriter) Stop() {
 	w.done <- true
 	<-w.stopDone
 }

 // Submit pushes the items to the incoming buffer, and they are placed onto the actual buffer from there.
-func (w *Writer) Submit(items ...interface{}) {
+func (w *BufferWriter) Submit(items ...interface{}) {
 	for _, item := range items {
 		w.items <- item
 	}
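For readers skimming the diff, the renamed API reads roughly as follows. This is a minimal, hypothetical usage sketch based only on the signatures shown above; the sample values and the standalone main package are illustrative assumptions, not part of this PR:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/resmoio/kubernetes-event-exporter/pkg/batch"
)

func main() {
	// Flush when 10 events are buffered or every 2 seconds, whichever
	// comes first; retry each failed event up to 3 times.
	w := batch.NewBufferWriter(batch.BufferWriterConfig{
		BatchSizeEvents:      10,
		MaxRetriesPerEvent:   3,
		BatchIntervalSeconds: 2,
	}, func(ctx context.Context, items []interface{}) []bool {
		ok := make([]bool, len(items))
		for i, item := range items {
			fmt.Println("shipping:", item) // stand-in for a real sink call
			ok[i] = true                   // per-item success flag; false re-queues the item
		}
		return ok
	})

	w.Start()
	w.Submit("event-1", "event-2")
	time.Sleep(3 * time.Second) // allow at least one interval flush
	w.Stop()
}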
71 changes: 36 additions & 35 deletions pkg/batch/writer_test.go
@@ -2,20 +2,21 @@ package batch

 import (
 	"context"
-	"github.com/stretchr/testify/assert"
 	"testing"
 	"time"

+	"github.com/stretchr/testify/assert"
 )

 func TestSimpleWriter(t *testing.T) {
-	cfg := WriterConfig{
-		BatchSize:  10,
-		MaxRetries: 3,
-		Interval:   time.Second * 2,
+	cfg := BufferWriterConfig{
+		BatchSizeEvents:      10,
+		MaxRetriesPerEvent:   3,
+		BatchIntervalSeconds: 2,
 	}

 	var allItems []interface{}
-	w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
+	w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
 		resp := make([]bool, len(items))
 		for idx := range resp {
 			resp[idx] = true
@@ -44,14 +45,14 @@ func TestCorrectnessManyTimes(t *testing.T) {
 }

 func TestLargerThanBatchSize(t *testing.T) {
-	cfg := WriterConfig{
-		BatchSize:  3,
-		MaxRetries: 3,
-		Interval:   time.Second * 2,
+	cfg := BufferWriterConfig{
+		BatchSizeEvents:      3,
+		MaxRetriesPerEvent:   3,
+		BatchIntervalSeconds: 2,
 	}

 	allItems := make([][]interface{}, 0)
-	w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
+	w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
 		resp := make([]bool, len(items))
 		for idx := range resp {
 			resp[idx] = true
@@ -72,14 +73,14 @@ func TestLargerThanBatchSize(t *testing.T) {
 }

 func TestSimpleInterval(t *testing.T) {
-	cfg := WriterConfig{
-		BatchSize:  5,
-		MaxRetries: 3,
-		Interval:   time.Millisecond * 20,
+	cfg := BufferWriterConfig{
+		BatchSizeEvents:      5,
+		MaxRetriesPerEvent:   3,
+		BatchIntervalSeconds: 1,
 	}

 	allItems := make([][]interface{}, 0)
-	w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
+	w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
 		resp := make([]bool, len(items))
 		for idx := range resp {
 			resp[idx] = true
@@ -94,7 +95,7 @@ func TestSimpleInterval(t *testing.T) {
 	time.Sleep(time.Millisecond * 5)
 	assert.Len(t, allItems, 0)

-	time.Sleep(time.Millisecond * 50)
+	time.Sleep(time.Second * 2)
 	assert.Len(t, allItems, 1)
 	assert.Equal(t, allItems[0], []interface{}{1, 2})

@@ -103,14 +104,14 @@ func TestIntervalComplex(t *testing.T) {
 }

 func TestIntervalComplex(t *testing.T) {
-	cfg := WriterConfig{
-		BatchSize:  5,
-		MaxRetries: 3,
-		Interval:   time.Millisecond * 20,
+	cfg := BufferWriterConfig{
+		BatchSizeEvents:      5,
+		MaxRetriesPerEvent:   3,
+		BatchIntervalSeconds: 1,
 	}

 	allItems := make([][]interface{}, 0)
-	w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
+	w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
 		resp := make([]bool, len(items))
 		for idx := range resp {
 			resp[idx] = true
@@ -126,7 +127,7 @@ func TestIntervalComplex(t *testing.T) {
 	w.Submit(3, 4)
 	assert.Len(t, allItems, 0)

-	time.Sleep(time.Millisecond * 50)
+	time.Sleep(time.Second * 2)
 	assert.Len(t, allItems, 1)
 	assert.Equal(t, allItems[0], []interface{}{1, 2, 3, 4})

@@ -135,14 +136,14 @@ func TestIntervalComplexAfterFlush(t *testing.T) {
 }

 func TestIntervalComplexAfterFlush(t *testing.T) {
-	cfg := WriterConfig{
-		BatchSize:  5,
-		MaxRetries: 3,
-		Interval:   time.Millisecond * 20,
+	cfg := BufferWriterConfig{
+		BatchSizeEvents:      5,
+		MaxRetriesPerEvent:   3,
+		BatchIntervalSeconds: 1,
 	}

 	allItems := make([][]interface{}, 0)
-	w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
+	w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
 		resp := make([]bool, len(items))
 		for idx := range resp {
 			resp[idx] = true
@@ -158,7 +159,7 @@ func TestIntervalComplexAfterFlush(t *testing.T) {
 	w.Submit(3, 4)
 	assert.Len(t, allItems, 0)

-	time.Sleep(time.Millisecond * 50)
+	time.Sleep(time.Second * 2)
 	assert.Len(t, allItems, 1)
 	assert.Equal(t, allItems[0], []interface{}{1, 2, 3, 4})

@@ -170,14 +171,14 @@ func TestIntervalComplexAfterFlush(t *testing.T) {
 }

 func TestRetry(t *testing.T) {
-	cfg := WriterConfig{
-		BatchSize:  5,
-		MaxRetries: 3,
-		Interval:   time.Millisecond * 10,
+	cfg := BufferWriterConfig{
+		BatchSizeEvents:      5,
+		MaxRetriesPerEvent:   3,
+		BatchIntervalSeconds: 1,
 	}

 	allItems := make([][]interface{}, 0)
-	w := NewWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
+	w := NewBufferWriter(cfg, func(ctx context.Context, items []interface{}) []bool {
 		resp := make([]bool, len(items))
 		for idx := range resp {
 			resp[idx] = items[idx] != 2
@@ -191,7 +192,7 @@ func TestRetry(t *testing.T) {
 	w.Submit(1, 2, 3)
 	assert.Len(t, allItems, 0)

-	time.Sleep(time.Millisecond * 200)
+	time.Sleep(time.Second * 5)
 	assert.Len(t, allItems, 4)

 	assert.Equal(t, allItems[0], []interface{}{1, 2, 3})
25 changes: 13 additions & 12 deletions pkg/sinks/bigquery.go
@@ -2,19 +2,20 @@ package sinks

 import (
 	"bufio"
-	"cloud.google.com/go/bigquery"
 	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/resmoio/kubernetes-event-exporter/pkg/batch"
-	"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
-	"github.com/rs/zerolog/log"
-	"google.golang.org/api/option"
 	"math/rand"
 	"os"
 	"time"
 	"unicode"

+	"cloud.google.com/go/bigquery"
+	"github.com/resmoio/kubernetes-event-exporter/pkg/batch"
+	"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
+	"github.com/rs/zerolog/log"
+	"google.golang.org/api/option"
 )

 // Returns a map filtering out keys that have nil value assigned.
@@ -206,12 +207,12 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) {
 		log.Error().Msgf("BigQuerySink create dataset failed: %v", err)
 	}

-	batchWriter := batch.NewWriter(
-		batch.WriterConfig{
-			BatchSize:  cfg.BatchSize,
-			MaxRetries: cfg.MaxRetries,
-			Interval:   time.Duration(cfg.IntervalSeconds) * time.Second,
-			Timeout:    time.Duration(cfg.TimeoutSeconds) * time.Second,
+	batchWriter := batch.NewBufferWriter(
+		batch.BufferWriterConfig{
+			BatchSizeEvents:      cfg.BatchSize,
+			MaxRetriesPerEvent:   cfg.MaxRetries,
+			BatchIntervalSeconds: cfg.IntervalSeconds,
+			BatchTimeoutSeconds:  cfg.TimeoutSeconds,
 		},
 		handleBatch,
 	)
@@ -221,7 +222,7 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) {
 }

 type BigQuerySink struct {
-	batchWriter *batch.Writer
+	batchWriter *batch.BufferWriter
 }

 func (e *BigQuerySink) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
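The same wiring pattern generalizes to other sinks: construct a BufferWriter with the sink's batch settings, hand it a callback that ships one batch and reports per-item success, and forward incoming events to Submit. The sketch below is hypothetical; PrintSink, its constructor, and the sample config values are invented for illustration, and only the batch and kube APIs come from this repository:

package sinks

import (
	"context"
	"fmt"

	"github.com/resmoio/kubernetes-event-exporter/pkg/batch"
	"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
)

// PrintSink is a hypothetical sink that batches events and prints them.
type PrintSink struct {
	batchWriter *batch.BufferWriter
}

func NewPrintSink() *PrintSink {
	w := batch.NewBufferWriter(
		batch.BufferWriterConfig{
			BatchSizeEvents:      100,
			MaxRetriesPerEvent:   2,
			BatchIntervalSeconds: 5,
		},
		func(ctx context.Context, items []interface{}) []bool {
			ok := make([]bool, len(items))
			for i, item := range items {
				fmt.Printf("event: %v\n", item)
				ok[i] = true // false would re-queue the item for another attempt
			}
			return ok
		},
	)
	w.Start()
	return &PrintSink{batchWriter: w}
}

func (s *PrintSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
	s.batchWriter.Submit(ev)
	return nil
}

func (s *PrintSink) Close() {
	s.batchWriter.Stop()
}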