Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions internal/agent/events/k8s_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package events

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

const (
RoleChangedReason = "RoleChanged"
InvolvedObjectPod = "Pod"
EventSourceComponent = "redis-role-detector"
)

// RoleChangeEventData represents the structured data for role change events
type RoleChangeEventData struct {
PodName string `json:"podName"`
Namespace string `json:"namespace"`
PreviousRole string `json:"previousRole"`
CurrentRole string `json:"currentRole"`
Timestamp time.Time `json:"timestamp"`
}

// K8sEventSender sends Kubernetes events for role changes
type K8sEventSender struct {
clientset *kubernetes.Clientset
podName string
namespace string
}

// NewK8sEventSender creates a new Kubernetes event sender
func NewK8sEventSender() (*K8sEventSender, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
}

Check warning on line 43 in internal/agent/events/k8s_event.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/events/k8s_event.go#L39-L43

Added lines #L39 - L43 were not covered by tests

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create clientset: %w", err)
}

Check warning on line 48 in internal/agent/events/k8s_event.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/events/k8s_event.go#L45-L48

Added lines #L45 - L48 were not covered by tests

podName := os.Getenv("POD_NAME")
namespace := os.Getenv("POD_NAMESPACE")

if podName == "" || namespace == "" {
return nil, fmt.Errorf("POD_NAME and POD_NAMESPACE environment variables must be set")
}

Check warning on line 55 in internal/agent/events/k8s_event.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/events/k8s_event.go#L50-L55

Added lines #L50 - L55 were not covered by tests

return &K8sEventSender{
clientset: clientset,
podName: podName,
namespace: namespace,
}, nil

Check warning on line 61 in internal/agent/events/k8s_event.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/events/k8s_event.go#L57-L61

Added lines #L57 - L61 were not covered by tests
}

// SendRoleChangeEvent sends a Kubernetes event for role change
func (s *K8sEventSender) SendRoleChangeEvent(ctx context.Context, previousRole, currentRole string) error {
// Create structured event data
eventData := RoleChangeEventData{
PodName: s.podName,
Namespace: s.namespace,
PreviousRole: previousRole,
CurrentRole: currentRole,
Timestamp: time.Now().UTC(),
}

// Marshal to JSON for structured message
messageBytes, err := json.Marshal(eventData)
if err != nil {
return fmt.Errorf("failed to marshal event data: %w", err)
}

Check warning on line 79 in internal/agent/events/k8s_event.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/events/k8s_event.go#L65-L79

Added lines #L65 - L79 were not covered by tests

event := &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "redis-role-change-",
Namespace: s.namespace,
},
InvolvedObject: corev1.ObjectReference{
Kind: InvolvedObjectPod,
Name: s.podName,
Namespace: s.namespace,
},
Reason: RoleChangedReason,
Message: string(messageBytes),
Type: corev1.EventTypeNormal,
Source: corev1.EventSource{
Component: EventSourceComponent,
},
FirstTimestamp: metav1.Now(),
LastTimestamp: metav1.Now(),
Count: 1,
}

_, err = s.clientset.CoreV1().Events(s.namespace).Create(ctx, event, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create event: %w", err)
}

Check warning on line 105 in internal/agent/events/k8s_event.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/events/k8s_event.go#L81-L105

Added lines #L81 - L105 were not covered by tests

return nil

Check warning on line 107 in internal/agent/events/k8s_event.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/events/k8s_event.go#L107

Added line #L107 was not covered by tests
}
100 changes: 100 additions & 0 deletions internal/agent/server/probe/role_detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package probe

import (
"context"
"fmt"
"strings"
"time"

"github.com/OT-CONTAINER-KIT/redis-operator/internal/agent/events"
"github.com/redis/go-redis/v9"
)

// RoleDetector periodically checks the Redis role using INFO replication.
// It prints a log each time the role changes.
// All configuration is supplied through constructor parameters so that the caller
// can decide how to source configuration (for example, via environment variables).
type RoleDetector struct {
client *redis.Client
interval time.Duration
currentRole string
eventSender *events.K8sEventSender
}

// NewRoleDetector creates a detector that connects to the given Redis instance.
func NewRoleDetector(addr, password string, interval time.Duration) *RoleDetector {
opts := &redis.Options{
Addr: addr,
Password: password,
}

// Try to create event sender, but don't fail if it's not available
eventSender, err := events.NewK8sEventSender()
if err != nil {
fmt.Printf("warning: failed to create event sender: %v\n", err)
}

Check warning on line 35 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L25-L35

Added lines #L25 - L35 were not covered by tests

return &RoleDetector{
client: redis.NewClient(opts),
interval: interval,
eventSender: eventSender,
}

Check warning on line 41 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L37-L41

Added lines #L37 - L41 were not covered by tests
}

// Run starts the detection loop and blocks until the context is cancelled.
func (d *RoleDetector) Run(ctx context.Context) error {
// Perform an initial detection immediately.
if err := d.detect(ctx); err != nil {
fmt.Printf("role detection failed: %v\n", err)
}

Check warning on line 49 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L45-L49

Added lines #L45 - L49 were not covered by tests

ticker := time.NewTicker(d.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := d.detect(ctx); err != nil {
fmt.Printf("role detection failed: %v\n", err)
}

Check warning on line 61 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L51-L61

Added lines #L51 - L61 were not covered by tests
}
}
}

// detect queries Redis and logs a message if the role has changed.
func (d *RoleDetector) detect(ctx context.Context) error {
res, err := d.client.Info(ctx, "replication").Result()
if err != nil {
return err
}
role := parseRole(res)
if role != d.currentRole {
previousRole := d.currentRole
if previousRole == "" {
previousRole = "unknown"
}
fmt.Printf("detected role change: %s -> %s\n", previousRole, role)

// Send Kubernetes event if event sender is available
if d.eventSender != nil {
if err := d.eventSender.SendRoleChangeEvent(ctx, previousRole, role); err != nil {
fmt.Printf("warning: failed to send role change event: %v\n", err)
}

Check warning on line 84 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L67-L84

Added lines #L67 - L84 were not covered by tests
}

d.currentRole = role

Check warning on line 87 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L87

Added line #L87 was not covered by tests
}
return nil

Check warning on line 89 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L89

Added line #L89 was not covered by tests
}

// parseRole extracts the role from the output of `INFO replication`.
func parseRole(info string) string {
for _, line := range strings.Split(info, "\n") {
if strings.HasPrefix(line, "role:") {
return strings.TrimSpace(strings.TrimPrefix(line, "role:"))
}

Check warning on line 97 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L93-L97

Added lines #L93 - L97 were not covered by tests
}
return "unknown"

Check warning on line 99 in internal/agent/server/probe/role_detector.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/probe/role_detector.go#L99

Added line #L99 was not covered by tests
}
33 changes: 33 additions & 0 deletions internal/agent/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package server

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/OT-CONTAINER-KIT/redis-operator/internal/agent/server/probe"
)

// Start launches a role detector that periodically checks the Redis role.
// Configuration is provided via function parameters.
// The call blocks until the context is canceled (SIGINT/SIGTERM) or the
// detector exits. Any error returned by detector.Run is propagated to caller.
func Start(addr, password string, interval time.Duration) error {
detector := probe.NewRoleDetector(addr, password, interval)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go handleSignals(cancel)

return detector.Run(ctx)

Check warning on line 25 in internal/agent/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/server.go#L17-L25

Added lines #L17 - L25 were not covered by tests
}

func handleSignals(cancel context.CancelFunc) {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
cancel()

Check warning on line 32 in internal/agent/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/agent/server/server.go#L28-L32

Added lines #L28 - L32 were not covered by tests
}
2 changes: 2 additions & 0 deletions internal/cmd/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"github.com/OT-CONTAINER-KIT/redis-operator/internal/cmd/agent/bootstrap"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/cmd/agent/server"
"github.com/spf13/cobra"
)

Expand All @@ -12,5 +13,6 @@
Short: "Agent is a tool which run as a init/sidecar container along with redis/sentinel",
}
agentCmd.AddCommand(bootstrap.CMD())
agentCmd.AddCommand(server.CMD())

Check warning on line 16 in internal/cmd/agent/cmd.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/agent/cmd.go#L16

Added line #L16 was not covered by tests
return agentCmd
}
30 changes: 30 additions & 0 deletions internal/cmd/agent/server/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package server

import (
"time"

"github.com/OT-CONTAINER-KIT/redis-operator/internal/agent/server"
"github.com/spf13/cobra"
)

func CMD() *cobra.Command {
var (
redisAddr string
redisPassword string
detectInterval time.Duration
)

cmd := &cobra.Command{
Use: "server",
Short: "Start the Redis role detector agent",
RunE: func(cmd *cobra.Command, args []string) error {
return server.Start(redisAddr, redisPassword, detectInterval)
},

Check warning on line 22 in internal/cmd/agent/server/cmd.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/agent/server/cmd.go#L10-L22

Added lines #L10 - L22 were not covered by tests
}

cmd.Flags().StringVar(&redisAddr, "redis-addr", "127.0.0.1:6379", "Redis address in host:port format")
cmd.Flags().StringVar(&redisPassword, "redis-password", "", "Redis password for authentication (optional)")
cmd.Flags().DurationVar(&detectInterval, "detect-interval", 10*time.Second, "Role detection interval")

return cmd

Check warning on line 29 in internal/cmd/agent/server/cmd.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/agent/server/cmd.go#L25-L29

Added lines #L25 - L29 were not covered by tests
}
7 changes: 7 additions & 0 deletions internal/cmd/manager/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
rsvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/redissentinel/v1beta2"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common/redis"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common/scheme"
eventcontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/event"
rediscontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/redis"
redisclustercontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/rediscluster"
redisreplicationcontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/redisreplication"
Expand Down Expand Up @@ -242,6 +243,12 @@
setupLog.Error(err, "unable to create controller", "controller", "RedisSentinel")
return err
}
if err := (&eventcontroller.EventReconciler{
Client: mgr.GetClient(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Event")
return err
}

Check warning on line 251 in internal/cmd/manager/cmd.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/manager/cmd.go#L246-L251

Added lines #L246 - L251 were not covered by tests

return nil
}
Expand Down
Loading
Loading