diff --git a/README.md b/README.md index 98cc951d..4c052bc6 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,24 @@ receivers: * A route can have many sub-routes, forming a tree. * Routing starts from the root route. +## sent or ignore event update +if an event occur many times, It will only update the event fields like ```count``` and ```lastTimestamp```. +By default ```kubernetes-event-exporter``` will ignore the eventUpdate and will not sent to the recivers. +If you Don't want to miss every event,you can use trigger ```sentUpdateEvent``` configurated in each sink to controll whether sent the event to the reciver. +```azure +sentUpdateEvent true|false (default false) +true: sent every matching event to the reciver including when the event updated +false: ignore the event updted +``` + +for example: +```azure +receivers: + - name: "dump" + stdout: + sentUpdateEvent: true +``` + ## Using Secrets In your config file, you can refer to environment variables as `${API_KEY}` therefore you can use ConfigMap or Secrets diff --git a/pkg/batch/writer.go b/pkg/batch/writer.go index e484d332..4bdc1ee4 100644 --- a/pkg/batch/writer.go +++ b/pkg/batch/writer.go @@ -5,7 +5,6 @@ import ( "time" ) - // Writer allows to buffer some items and call the Handler function either when the buffer is full or the timeout is // reached. There will also be support for concurrency for high volume. The handler function is supposed to return an // array of booleans to indicate whether the transfer was successful or not. It can be replaced with status codes in diff --git a/pkg/batch/writer_test.go b/pkg/batch/writer_test.go index 6edfb75b..345b0f57 100644 --- a/pkg/batch/writer_test.go +++ b/pkg/batch/writer_test.go @@ -2,9 +2,10 @@ package batch import ( "context" - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestSimpleWriter(t *testing.T) { diff --git a/pkg/exporter/channel_registry.go b/pkg/exporter/channel_registry.go index 39db655a..95d29601 100644 --- a/pkg/exporter/channel_registry.go +++ b/pkg/exporter/channel_registry.go @@ -16,9 +16,9 @@ import ( // and we might need a mechanism to drop the vents // On closing, the registry sends a signal on all exit channels, and then waits for all to complete. type ChannelBasedReceiverRegistry struct { - ch map[string]chan kube.EnhancedEvent - exitCh map[string]chan interface{} - wg *sync.WaitGroup + ch map[string]chan kube.EnhancedEvent + exitCh map[string]chan interface{} + wg *sync.WaitGroup MetricsStore *metrics.Store } diff --git a/pkg/exporter/engine_test.go b/pkg/exporter/engine_test.go index 380ece71..f570d622 100644 --- a/pkg/exporter/engine_test.go +++ b/pkg/exporter/engine_test.go @@ -1,10 +1,11 @@ package exporter import ( + "testing" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/resmoio/kubernetes-event-exporter/pkg/sinks" "github.com/stretchr/testify/assert" - "testing" ) func TestEngineNoRoutes(t *testing.T) { diff --git a/pkg/exporter/route_test.go b/pkg/exporter/route_test.go index a8b514c5..8f06d42a 100644 --- a/pkg/exporter/route_test.go +++ b/pkg/exporter/route_test.go @@ -1,10 +1,11 @@ package exporter import ( + "testing" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/resmoio/kubernetes-event-exporter/pkg/sinks" "github.com/stretchr/testify/assert" - "testing" ) // testReceiverRegistry just records the events to the registry so that tests can validate routing behavior diff --git a/pkg/exporter/rule_test.go b/pkg/exporter/rule_test.go index d95afeaf..f1c67282 100644 --- a/pkg/exporter/rule_test.go +++ b/pkg/exporter/rule_test.go @@ -1,9 +1,10 @@ package exporter import ( + "testing" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/stretchr/testify/assert" - "testing" ) func TestEmptyRule(t *testing.T) { diff --git a/pkg/exporter/sync_registry.go b/pkg/exporter/sync_registry.go index 8c6294ec..92a5f0f9 100644 --- a/pkg/exporter/sync_registry.go +++ b/pkg/exporter/sync_registry.go @@ -2,6 +2,7 @@ package exporter import ( "context" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/resmoio/kubernetes-event-exporter/pkg/sinks" "github.com/rs/zerolog/log" diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 5ca43ba7..012ded2a 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -1,10 +1,11 @@ package kube import ( + "os" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "os" ) // GetKubernetesClient returns the client if it's possible in cluster, otherwise tries to read HOME diff --git a/pkg/kube/event.go b/pkg/kube/event.go index e7db644f..6584f417 100644 --- a/pkg/kube/event.go +++ b/pkg/kube/event.go @@ -11,6 +11,7 @@ import ( type EnhancedEvent struct { corev1.Event `json:",inline"` + IsUpdateEvent bool `json:"isUpdateEvent"` ClusterName string `json:"clusterName"` InvolvedObject EnhancedObjectReference `json:"involvedObject"` } diff --git a/pkg/kube/event_test.go b/pkg/kube/event_test.go index 306675c8..08656af5 100644 --- a/pkg/kube/event_test.go +++ b/pkg/kube/event_test.go @@ -1,10 +1,11 @@ package kube import ( + "testing" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" ) func TestEnhancedEvent_DeDot(t *testing.T) { diff --git a/pkg/kube/watcher.go b/pkg/kube/watcher.go index ca3e17a7..23f8ca4a 100644 --- a/pkg/kube/watcher.go +++ b/pkg/kube/watcher.go @@ -57,11 +57,13 @@ func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds i func (e *EventWatcher) OnAdd(obj interface{}) { event := obj.(*corev1.Event) - e.onEvent(event) + e.onEvent(event, false) } func (e *EventWatcher) OnUpdate(oldObj, newObj interface{}) { // Ignore updates + event := newObj.(*corev1.Event) + e.onEvent(event, true) } // Ignore events older than the maxEventAgeSeconds @@ -87,7 +89,7 @@ func (e *EventWatcher) isEventDiscarded(event *corev1.Event) bool { return false } -func (e *EventWatcher) onEvent(event *corev1.Event) { +func (e *EventWatcher) onEvent(event *corev1.Event, IsUpdateEvent bool) { if e.isEventDiscarded(event) { return } @@ -102,7 +104,8 @@ func (e *EventWatcher) onEvent(event *corev1.Event) { e.metricsStore.EventsProcessed.Inc() ev := &EnhancedEvent{ - Event: *event.DeepCopy(), + Event: *event.DeepCopy(), + IsUpdateEvent: IsUpdateEvent, } ev.Event.ManagedFields = nil diff --git a/pkg/sinks/bigquery.go b/pkg/sinks/bigquery.go index 7bc09f31..36a3bd64 100644 --- a/pkg/sinks/bigquery.go +++ b/pkg/sinks/bigquery.go @@ -2,19 +2,20 @@ package sinks import ( "bufio" - "cloud.google.com/go/bigquery" "context" "encoding/json" "errors" "fmt" - "github.com/resmoio/kubernetes-event-exporter/pkg/batch" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" - "github.com/rs/zerolog/log" - "google.golang.org/api/option" "math/rand" "os" "time" "unicode" + + "cloud.google.com/go/bigquery" + "github.com/resmoio/kubernetes-event-exporter/pkg/batch" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/rs/zerolog/log" + "google.golang.org/api/option" ) // Returns a map filtering out keys that have nil value assigned. @@ -138,6 +139,7 @@ func bigQueryImportJsonFromFile(path string, cfg *BigQueryConfig) error { } type BigQueryConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` // BigQuery table config Location string `yaml:"location"` Project string `yaml:"project"` @@ -217,14 +219,19 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) { ) batchWriter.Start() - return &BigQuerySink{batchWriter: batchWriter}, nil + return &BigQuerySink{batchWriter: batchWriter, config: cfg}, nil } type BigQuerySink struct { batchWriter *batch.Writer + config *BigQueryConfig } func (e *BigQuerySink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !e.config.SentUpdateEvent { + return nil + } e.batchWriter.Submit(ev) return nil } diff --git a/pkg/sinks/elasticsearch.go b/pkg/sinks/elasticsearch.go index 780566f7..a1f41079 100644 --- a/pkg/sinks/elasticsearch.go +++ b/pkg/sinks/elasticsearch.go @@ -18,6 +18,7 @@ import ( ) type ElasticsearchConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` // Connection specific Hosts []string `yaml:"hosts"` Username string `yaml:"username"` @@ -97,6 +98,10 @@ func formatIndexName(pattern string, when time.Time) string { } func (e *Elasticsearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !e.cfg.SentUpdateEvent { + return nil + } var toSend []byte if e.cfg.DeDot { diff --git a/pkg/sinks/eventbridge.go b/pkg/sinks/eventbridge.go index 918ee472..cbb88696 100644 --- a/pkg/sinks/eventbridge.go +++ b/pkg/sinks/eventbridge.go @@ -3,21 +3,23 @@ package sinks import ( "context" "encoding/json" + "time" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/eventbridge" "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/rs/zerolog/log" - "time" ) type EventBridgeConfig struct { - DetailType string `yaml:"detailType"` - Details map[string]interface{} `yaml:"details"` - Source string `yaml:"source"` - EventBusName string `yaml:"eventBusName"` - Region string `yaml:"region"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + DetailType string `yaml:"detailType"` + Details map[string]interface{} `yaml:"details"` + Source string `yaml:"source"` + EventBusName string `yaml:"eventBusName"` + Region string `yaml:"region"` } type EventBridgeSink struct { @@ -49,6 +51,10 @@ func NewEventBridgeSink(cfg *EventBridgeConfig) (Sink, error) { } func (s *EventBridgeSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !s.cfg.SentUpdateEvent { + return nil + } log.Info().Msg("Sending event to EventBridge ") var toSend string if s.cfg.Details != nil { diff --git a/pkg/sinks/file.go b/pkg/sinks/file.go index ecd8df77..361d66eb 100644 --- a/pkg/sinks/file.go +++ b/pkg/sinks/file.go @@ -10,12 +10,13 @@ import ( ) type FileConfig struct { - Path string `yaml:"path"` - Layout map[string]interface{} `yaml:"layout"` - MaxSize int `yaml:"maxsize"` - MaxAge int `yaml:"maxage"` - MaxBackups int `yaml:"maxbackups"` - DeDot bool `yaml:"deDot"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Path string `yaml:"path"` + Layout map[string]interface{} `yaml:"layout"` + MaxSize int `yaml:"maxsize"` + MaxAge int `yaml:"maxage"` + MaxBackups int `yaml:"maxbackups"` + DeDot bool `yaml:"deDot"` } func (f *FileConfig) Validate() error { @@ -27,6 +28,7 @@ type File struct { encoder *json.Encoder layout map[string]interface{} DeDot bool + config *FileConfig } func NewFileSink(config *FileConfig) (*File, error) { @@ -42,6 +44,7 @@ func NewFileSink(config *FileConfig) (*File, error) { encoder: json.NewEncoder(writer), layout: config.Layout, DeDot: config.DeDot, + config: config, }, nil } @@ -50,6 +53,10 @@ func (f *File) Close() { } func (f *File) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !f.config.SentUpdateEvent { + return nil + } if f.DeDot { de := ev.DeDot() ev = &de diff --git a/pkg/sinks/firehose.go b/pkg/sinks/firehose.go index 6b707f5e..58d12f0b 100644 --- a/pkg/sinks/firehose.go +++ b/pkg/sinks/firehose.go @@ -11,6 +11,7 @@ import ( ) type FirehoseConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` DeliveryStreamName string `yaml:"deliveryStreamName"` Region string `yaml:"region"` Layout map[string]interface{} `yaml:"layout"` @@ -38,6 +39,10 @@ func NewFirehoseSink(cfg *FirehoseConfig) (Sink, error) { } func (f *FirehoseSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !f.cfg.SentUpdateEvent { + return nil + } var toSend []byte if f.cfg.DeDot { diff --git a/pkg/sinks/inmemory.go b/pkg/sinks/inmemory.go index c74a1199..6591da01 100644 --- a/pkg/sinks/inmemory.go +++ b/pkg/sinks/inmemory.go @@ -2,11 +2,13 @@ package sinks import ( "context" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" ) type InMemoryConfig struct { - Ref *InMemory + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Ref *InMemory } type InMemory struct { @@ -15,6 +17,10 @@ type InMemory struct { } func (i *InMemory) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !i.Config.SentUpdateEvent { + return nil + } i.Events = append(i.Events, ev) return nil } @@ -22,5 +28,3 @@ func (i *InMemory) Send(ctx context.Context, ev *kube.EnhancedEvent) error { func (i *InMemory) Close() { // No-op } - - diff --git a/pkg/sinks/kafka.go b/pkg/sinks/kafka.go index cb7907fe..3a520c2a 100644 --- a/pkg/sinks/kafka.go +++ b/pkg/sinks/kafka.go @@ -14,6 +14,7 @@ import ( // KafkaConfig is the Kafka producer configuration type KafkaConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` Topic string `yaml:"topic"` Brokers []string `yaml:"brokers"` Layout map[string]interface{} `yaml:"layout"` @@ -82,6 +83,10 @@ func NewKafkaSink(cfg *KafkaConfig) (Sink, error) { // Send an event to Kafka synchronously func (k *KafkaSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !k.cfg.SentUpdateEvent { + return nil + } var toSend []byte if k.cfg.Layout != nil { diff --git a/pkg/sinks/kinesis.go b/pkg/sinks/kinesis.go index 9c11067a..29f65f8a 100644 --- a/pkg/sinks/kinesis.go +++ b/pkg/sinks/kinesis.go @@ -3,6 +3,7 @@ package sinks import ( "context" "encoding/json" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" @@ -10,9 +11,10 @@ import ( ) type KinesisConfig struct { - StreamName string `yaml:"streamName"` - Region string `yaml:"region"` - Layout map[string]interface{} `yaml:"layout"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + StreamName string `yaml:"streamName"` + Region string `yaml:"region"` + Layout map[string]interface{} `yaml:"layout"` } type KinesisSink struct { @@ -35,6 +37,10 @@ func NewKinesisSink(cfg *KinesisConfig) (Sink, error) { } func (k *KinesisSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !k.cfg.SentUpdateEvent { + return nil + } var toSend []byte if k.cfg.Layout != nil { diff --git a/pkg/sinks/loki.go b/pkg/sinks/loki.go index 4d0a0d93..f4e32be5 100644 --- a/pkg/sinks/loki.go +++ b/pkg/sinks/loki.go @@ -6,11 +6,12 @@ import ( "encoding/json" "errors" "fmt" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "io/ioutil" "net/http" "strconv" "time" + + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/rs/zerolog/log" ) @@ -24,11 +25,12 @@ type LokiMsg struct { } type LokiConfig struct { - Layout map[string]interface{} `yaml:"layout"` - StreamLabels map[string]string `yaml:"streamLabels"` - TLS TLS `yaml:"tls"` - URL string `yaml:"url"` - Headers map[string]string `yaml:"headers"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Layout map[string]interface{} `yaml:"layout"` + StreamLabels map[string]string `yaml:"streamLabels"` + TLS TLS `yaml:"tls"` + URL string `yaml:"url"` + Headers map[string]string `yaml:"headers"` } type Loki struct { @@ -52,6 +54,10 @@ func generateTimestamp() string { } func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !l.cfg.SentUpdateEvent { + return nil + } eventBody, err := serializeEventWithLayout(l.cfg.Layout, ev) if err != nil { return err diff --git a/pkg/sinks/opensearch.go b/pkg/sinks/opensearch.go index 6f58507d..454aa9be 100644 --- a/pkg/sinks/opensearch.go +++ b/pkg/sinks/opensearch.go @@ -18,6 +18,7 @@ import ( ) type OpenSearchConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` // Connection specific Hosts []string `yaml:"hosts"` Username string `yaml:"username"` @@ -84,6 +85,10 @@ func osFormatIndexName(pattern string, when time.Time) string { } func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !e.cfg.SentUpdateEvent { + return nil + } var toSend []byte if e.cfg.DeDot { diff --git a/pkg/sinks/opscenter.go b/pkg/sinks/opscenter.go index 04f7c058..2450c7cb 100644 --- a/pkg/sinks/opscenter.go +++ b/pkg/sinks/opscenter.go @@ -14,6 +14,7 @@ import ( // OpsCenterConfig is the configuration of the Sink. type OpsCenterConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` Category string `yaml:"category"` Description string `yaml:"description"` Notifications []string `yaml:"notifications"` @@ -51,6 +52,10 @@ func NewOpsCenterSink(cfg *OpsCenterConfig) (Sink, error) { // Send ... func (s *OpsCenterSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !s.cfg.SentUpdateEvent { + return nil + } oi := ssm.CreateOpsItemInput{} t, err := GetString(ev, s.cfg.Title) if err != nil { diff --git a/pkg/sinks/opsgenie.go b/pkg/sinks/opsgenie.go index 6976c819..4328cea8 100644 --- a/pkg/sinks/opsgenie.go +++ b/pkg/sinks/opsgenie.go @@ -2,20 +2,22 @@ package sinks import ( "context" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/opsgenie/opsgenie-go-sdk-v2/alert" "github.com/opsgenie/opsgenie-go-sdk-v2/client" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" ) type OpsgenieConfig struct { - ApiKey string `yaml:"apiKey"` - URL client.ApiUrl `yaml:"URL"` - Priority string `yaml:"priority"` - Message string `yaml:"message"` - Alias string `yaml:"alias"` - Description string `yaml:"description"` - Tags []string `yaml:"tags"` - Details map[string]string `yaml:"details"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + ApiKey string `yaml:"apiKey"` + URL client.ApiUrl `yaml:"URL"` + Priority string `yaml:"priority"` + Message string `yaml:"message"` + Alias string `yaml:"alias"` + Description string `yaml:"description"` + Tags []string `yaml:"tags"` + Details map[string]string `yaml:"details"` } type OpsgenieSink struct { @@ -48,6 +50,10 @@ func NewOpsgenieSink(config *OpsgenieConfig) (Sink, error) { } func (o *OpsgenieSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !o.cfg.SentUpdateEvent { + return nil + } request := alert.CreateAlertRequest{ Priority: alert.Priority(o.cfg.Priority), } diff --git a/pkg/sinks/pipe.go b/pkg/sinks/pipe.go index 8fe4abc8..efe1eda1 100644 --- a/pkg/sinks/pipe.go +++ b/pkg/sinks/pipe.go @@ -10,9 +10,10 @@ import ( ) type PipeConfig struct { - Path string `yaml:"path"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Path string `yaml:"path"` // DeDot all labels and annotations in the event. For both the event and the involvedObject - DeDot bool `yaml:"deDot"` + DeDot bool `yaml:"deDot"` Layout map[string]interface{} `yaml:"layout"` } @@ -44,6 +45,10 @@ func (f *Pipe) Close() { } func (f *Pipe) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !f.cfg.SentUpdateEvent { + return nil + } if f.cfg.DeDot { de := ev.DeDot() ev = &de diff --git a/pkg/sinks/pubsub.go b/pkg/sinks/pubsub.go index 444c4880..bbfab9dd 100644 --- a/pkg/sinks/pubsub.go +++ b/pkg/sinks/pubsub.go @@ -9,6 +9,7 @@ import ( ) type PubsubConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` GcloudProjectId string `yaml:"gcloud_project_id"` Topic string `yaml:"topic"` CreateTopic bool `yaml:"create_topic"` @@ -46,6 +47,10 @@ func NewPubsubSink(cfg *PubsubConfig) (Sink, error) { } func (ps *PubsubSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !ps.cfg.SentUpdateEvent { + return nil + } msg := &pubsub.Message{ Data: ev.ToJSON(), } diff --git a/pkg/sinks/slack.go b/pkg/sinks/slack.go index c1c45393..5961021b 100644 --- a/pkg/sinks/slack.go +++ b/pkg/sinks/slack.go @@ -10,14 +10,15 @@ import ( ) type SlackConfig struct { - Token string `yaml:"token"` - Channel string `yaml:"channel"` - Message string `yaml:"message"` - Color string `yaml:"color"` - Footer string `yaml:"footer"` - Title string `yaml:"title"` - AuthorName string `yaml:"author_name"` - Fields map[string]string `yaml:"fields"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Token string `yaml:"token"` + Channel string `yaml:"channel"` + Message string `yaml:"message"` + Color string `yaml:"color"` + Footer string `yaml:"footer"` + Title string `yaml:"title"` + AuthorName string `yaml:"author_name"` + Fields map[string]string `yaml:"fields"` } type SlackSink struct { @@ -33,6 +34,10 @@ func NewSlackSink(cfg *SlackConfig) (Sink, error) { } func (s *SlackSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !s.cfg.SentUpdateEvent { + return nil + } channel, err := GetString(ev, s.cfg.Channel) if err != nil { return err diff --git a/pkg/sinks/sns.go b/pkg/sinks/sns.go index 8fc87b3d..6dabd721 100644 --- a/pkg/sinks/sns.go +++ b/pkg/sinks/sns.go @@ -2,6 +2,7 @@ package sinks import ( "context" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" @@ -9,9 +10,10 @@ import ( ) type SNSConfig struct { - TopicARN string `yaml:"topicARN"` - Region string `yaml:"region"` - Layout map[string]interface{} `yaml:"layout"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + TopicARN string `yaml:"topicARN"` + Region string `yaml:"region"` + Layout map[string]interface{} `yaml:"layout"` } type SNSSink struct { diff --git a/pkg/sinks/sqs.go b/pkg/sinks/sqs.go index c5032c93..158bfa63 100644 --- a/pkg/sinks/sqs.go +++ b/pkg/sinks/sqs.go @@ -2,6 +2,7 @@ package sinks import ( "context" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" @@ -9,9 +10,10 @@ import ( ) type SQSConfig struct { - QueueName string `yaml:"queueName"` - Region string `yaml:"region"` - Layout map[string]interface{} `yaml:"layout"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + QueueName string `yaml:"queueName"` + Region string `yaml:"region"` + Layout map[string]interface{} `yaml:"layout"` } type SQSSink struct { @@ -45,6 +47,10 @@ func NewSQSSink(cfg *SQSConfig) (Sink, error) { } func (s *SQSSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !s.cfg.SentUpdateEvent { + return nil + } toSend, e := serializeEventWithLayout(s.cfg.Layout, ev) if e != nil { return e diff --git a/pkg/sinks/stdout.go b/pkg/sinks/stdout.go index 47d50fab..025f806a 100644 --- a/pkg/sinks/stdout.go +++ b/pkg/sinks/stdout.go @@ -11,6 +11,7 @@ import ( ) type StdoutConfig struct { + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` // DeDot all labels and annotations in the event. For both the event and the involvedObject DeDot bool `yaml:"deDot"` Layout map[string]interface{} `yaml:"layout"` @@ -41,6 +42,10 @@ func (f *Stdout) Close() { } func (f *Stdout) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !f.cfg.SentUpdateEvent { + return nil + } if f.cfg.DeDot { de := ev.DeDot() ev = &de diff --git a/pkg/sinks/syslog.go b/pkg/sinks/syslog.go index 57221f8d..7d973994 100644 --- a/pkg/sinks/syslog.go +++ b/pkg/sinks/syslog.go @@ -3,18 +3,21 @@ package sinks import ( "context" "encoding/json" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "log/syslog" + + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" ) type SyslogConfig struct { - Network string `yaml:"network"` - Address string `yaml:"address"` - Tag string `yaml:"tag"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Network string `yaml:"network"` + Address string `yaml:"address"` + Tag string `yaml:"tag"` } type SyslogSink struct { - sw *syslog.Writer + sw *syslog.Writer + cfg *SyslogConfig } func NewSyslogSink(config *SyslogConfig) (Sink, error) { @@ -22,7 +25,7 @@ func NewSyslogSink(config *SyslogConfig) (Sink, error) { if err != nil { return nil, err } - return &SyslogSink{sw: w}, nil + return &SyslogSink{sw: w, cfg: config}, nil } func (w *SyslogSink) Close() { @@ -30,7 +33,10 @@ func (w *SyslogSink) Close() { } func (w *SyslogSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error { - + // skip update event + if ev.IsUpdateEvent && !w.cfg.SentUpdateEvent { + return nil + } if b, err := json.Marshal(ev); err == nil { _, writeErr := w.sw.Write(b) diff --git a/pkg/sinks/teams.go b/pkg/sinks/teams.go index 40e6be73..f253c477 100644 --- a/pkg/sinks/teams.go +++ b/pkg/sinks/teams.go @@ -13,9 +13,10 @@ import ( ) type TeamsConfig struct { - Endpoint string `yaml:"endpoint"` - Layout map[string]interface{} `yaml:"layout"` - Headers map[string]string `yaml:"headers"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Endpoint string `yaml:"endpoint"` + Layout map[string]interface{} `yaml:"layout"` + Headers map[string]string `yaml:"headers"` } func NewTeamsSink(cfg *TeamsConfig) (Sink, error) { @@ -31,6 +32,11 @@ func (w *Teams) Close() { } func (w *Teams) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !w.cfg.SentUpdateEvent { + return nil + } + event, err := serializeEventWithLayout(w.cfg.Layout, ev) if err != nil { return err diff --git a/pkg/sinks/tmpl.go b/pkg/sinks/tmpl.go index 36f1f353..2a88db79 100644 --- a/pkg/sinks/tmpl.go +++ b/pkg/sinks/tmpl.go @@ -5,8 +5,8 @@ import ( "encoding/json" "text/template" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/Masterminds/sprig/v3" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" ) func GetString(event *kube.EnhancedEvent, text string) (string, error) { diff --git a/pkg/sinks/tmpl_test.go b/pkg/sinks/tmpl_test.go index cd1bc5e3..cfb64216 100644 --- a/pkg/sinks/tmpl_test.go +++ b/pkg/sinks/tmpl_test.go @@ -1,11 +1,12 @@ package sinks import ( + "testing" + "time" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" - "time" ) func TestLayoutConvert(t *testing.T) { diff --git a/pkg/sinks/webhook.go b/pkg/sinks/webhook.go index c4f40a1f..346c25e9 100644 --- a/pkg/sinks/webhook.go +++ b/pkg/sinks/webhook.go @@ -13,10 +13,11 @@ import ( ) type WebhookConfig struct { - Endpoint string `yaml:"endpoint"` - TLS TLS `yaml:"tls"` - Layout map[string]interface{} `yaml:"layout"` - Headers map[string]string `yaml:"headers"` + SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"` + Endpoint string `yaml:"endpoint"` + TLS TLS `yaml:"tls"` + Layout map[string]interface{} `yaml:"layout"` + Headers map[string]string `yaml:"headers"` } func NewWebhook(cfg *WebhookConfig) (Sink, error) { @@ -40,6 +41,11 @@ func (w *Webhook) Close() { } func (w *Webhook) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // skip update event + if ev.IsUpdateEvent && !w.cfg.SentUpdateEvent { + return nil + } + reqBody, err := serializeEventWithLayout(w.cfg.Layout, ev) if err != nil { return err diff --git a/pkg/version/version.go b/pkg/version/version.go index 300ed82d..0cb1c123 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -15,12 +15,12 @@ var ( func Revision() string { bi, ok := debug.ReadBuildInfo() - - if ok { + + if ok { for _, kv := range bi.Settings { switch kv.Key { - case "vcs.revision": - return kv.Value + case "vcs.revision": + return kv.Value } } }