diff --git a/internal/agent/events/k8s_event.go b/internal/agent/events/k8s_event.go new file mode 100644 index 0000000000..dc45bc8df7 --- /dev/null +++ b/internal/agent/events/k8s_event.go @@ -0,0 +1,108 @@ +package events + +import ( + "context" + "encoding/json" + "fmt" + "os" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const ( + RoleChangedReason = "RoleChanged" + InvolvedObjectPod = "Pod" + EventSourceComponent = "redis-role-detector" +) + +// RoleChangeEventData represents the structured data for role change events +type RoleChangeEventData struct { + PodName string `json:"podName"` + Namespace string `json:"namespace"` + PreviousRole string `json:"previousRole"` + CurrentRole string `json:"currentRole"` + Timestamp time.Time `json:"timestamp"` +} + +// K8sEventSender sends Kubernetes events for role changes +type K8sEventSender struct { + clientset *kubernetes.Clientset + podName string + namespace string +} + +// NewK8sEventSender creates a new Kubernetes event sender +func NewK8sEventSender() (*K8sEventSender, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed to get in-cluster config: %w", err) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to create clientset: %w", err) + } + + podName := os.Getenv("POD_NAME") + namespace := os.Getenv("POD_NAMESPACE") + + if podName == "" || namespace == "" { + return nil, fmt.Errorf("POD_NAME and POD_NAMESPACE environment variables must be set") + } + + return &K8sEventSender{ + clientset: clientset, + podName: podName, + namespace: namespace, + }, nil +} + +// SendRoleChangeEvent sends a Kubernetes event for role change +func (s *K8sEventSender) SendRoleChangeEvent(ctx context.Context, previousRole, currentRole string) error { + // Create structured event data + eventData := RoleChangeEventData{ + PodName: s.podName, + Namespace: s.namespace, + PreviousRole: previousRole, + CurrentRole: currentRole, + Timestamp: time.Now().UTC(), + } + + // Marshal to JSON for structured message + messageBytes, err := json.Marshal(eventData) + if err != nil { + return fmt.Errorf("failed to marshal event data: %w", err) + } + + event := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "redis-role-change-", + Namespace: s.namespace, + }, + InvolvedObject: corev1.ObjectReference{ + Kind: InvolvedObjectPod, + Name: s.podName, + Namespace: s.namespace, + }, + Reason: RoleChangedReason, + Message: string(messageBytes), + Type: corev1.EventTypeNormal, + Source: corev1.EventSource{ + Component: EventSourceComponent, + }, + FirstTimestamp: metav1.Now(), + LastTimestamp: metav1.Now(), + Count: 1, + } + + _, err = s.clientset.CoreV1().Events(s.namespace).Create(ctx, event, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create event: %w", err) + } + + return nil +} diff --git a/internal/agent/server/probe/role_detector.go b/internal/agent/server/probe/role_detector.go new file mode 100644 index 0000000000..d44e7ab36e --- /dev/null +++ b/internal/agent/server/probe/role_detector.go @@ -0,0 +1,100 @@ +package probe + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/OT-CONTAINER-KIT/redis-operator/internal/agent/events" + "github.com/redis/go-redis/v9" +) + +// RoleDetector periodically checks the Redis role using INFO replication. +// It prints a log each time the role changes. +// All configuration is supplied through constructor parameters so that the caller +// can decide how to source configuration (for example, via environment variables). +type RoleDetector struct { + client *redis.Client + interval time.Duration + currentRole string + eventSender *events.K8sEventSender +} + +// NewRoleDetector creates a detector that connects to the given Redis instance. +func NewRoleDetector(addr, password string, interval time.Duration) *RoleDetector { + opts := &redis.Options{ + Addr: addr, + Password: password, + } + + // Try to create event sender, but don't fail if it's not available + eventSender, err := events.NewK8sEventSender() + if err != nil { + fmt.Printf("warning: failed to create event sender: %v\n", err) + } + + return &RoleDetector{ + client: redis.NewClient(opts), + interval: interval, + eventSender: eventSender, + } +} + +// Run starts the detection loop and blocks until the context is cancelled. +func (d *RoleDetector) Run(ctx context.Context) error { + // Perform an initial detection immediately. + if err := d.detect(ctx); err != nil { + fmt.Printf("role detection failed: %v\n", err) + } + + ticker := time.NewTicker(d.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := d.detect(ctx); err != nil { + fmt.Printf("role detection failed: %v\n", err) + } + } + } +} + +// detect queries Redis and logs a message if the role has changed. +func (d *RoleDetector) detect(ctx context.Context) error { + res, err := d.client.Info(ctx, "replication").Result() + if err != nil { + return err + } + role := parseRole(res) + if role != d.currentRole { + previousRole := d.currentRole + if previousRole == "" { + previousRole = "unknown" + } + fmt.Printf("detected role change: %s -> %s\n", previousRole, role) + + // Send Kubernetes event if event sender is available + if d.eventSender != nil { + if err := d.eventSender.SendRoleChangeEvent(ctx, previousRole, role); err != nil { + fmt.Printf("warning: failed to send role change event: %v\n", err) + } + } + + d.currentRole = role + } + return nil +} + +// parseRole extracts the role from the output of `INFO replication`. +func parseRole(info string) string { + for _, line := range strings.Split(info, "\n") { + if strings.HasPrefix(line, "role:") { + return strings.TrimSpace(strings.TrimPrefix(line, "role:")) + } + } + return "unknown" +} diff --git a/internal/agent/server/server.go b/internal/agent/server/server.go new file mode 100644 index 0000000000..b12bb014c9 --- /dev/null +++ b/internal/agent/server/server.go @@ -0,0 +1,33 @@ +package server + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/OT-CONTAINER-KIT/redis-operator/internal/agent/server/probe" +) + +// Start launches a role detector that periodically checks the Redis role. +// Configuration is provided via function parameters. +// The call blocks until the context is canceled (SIGINT/SIGTERM) or the +// detector exits. Any error returned by detector.Run is propagated to caller. +func Start(addr, password string, interval time.Duration) error { + detector := probe.NewRoleDetector(addr, password, interval) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go handleSignals(cancel) + + return detector.Run(ctx) +} + +func handleSignals(cancel context.CancelFunc) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + cancel() +} diff --git a/internal/cmd/agent/cmd.go b/internal/cmd/agent/cmd.go index 7b43fc2fb2..512baffe21 100644 --- a/internal/cmd/agent/cmd.go +++ b/internal/cmd/agent/cmd.go @@ -2,6 +2,7 @@ package agent import ( "github.com/OT-CONTAINER-KIT/redis-operator/internal/cmd/agent/bootstrap" + "github.com/OT-CONTAINER-KIT/redis-operator/internal/cmd/agent/server" "github.com/spf13/cobra" ) @@ -12,5 +13,6 @@ func CMD() *cobra.Command { Short: "Agent is a tool which run as a init/sidecar container along with redis/sentinel", } agentCmd.AddCommand(bootstrap.CMD()) + agentCmd.AddCommand(server.CMD()) return agentCmd } diff --git a/internal/cmd/agent/server/cmd.go b/internal/cmd/agent/server/cmd.go new file mode 100644 index 0000000000..c74988bc38 --- /dev/null +++ b/internal/cmd/agent/server/cmd.go @@ -0,0 +1,30 @@ +package server + +import ( + "time" + + "github.com/OT-CONTAINER-KIT/redis-operator/internal/agent/server" + "github.com/spf13/cobra" +) + +func CMD() *cobra.Command { + var ( + redisAddr string + redisPassword string + detectInterval time.Duration + ) + + cmd := &cobra.Command{ + Use: "server", + Short: "Start the Redis role detector agent", + RunE: func(cmd *cobra.Command, args []string) error { + return server.Start(redisAddr, redisPassword, detectInterval) + }, + } + + cmd.Flags().StringVar(&redisAddr, "redis-addr", "127.0.0.1:6379", "Redis address in host:port format") + cmd.Flags().StringVar(&redisPassword, "redis-password", "", "Redis password for authentication (optional)") + cmd.Flags().DurationVar(&detectInterval, "detect-interval", 10*time.Second, "Role detection interval") + + return cmd +} diff --git a/internal/cmd/manager/cmd.go b/internal/cmd/manager/cmd.go index 394708bac2..8475af11b0 100644 --- a/internal/cmd/manager/cmd.go +++ b/internal/cmd/manager/cmd.go @@ -25,6 +25,7 @@ import ( rsvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/redissentinel/v1beta2" "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common/redis" "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common/scheme" + eventcontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/event" rediscontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/redis" redisclustercontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/rediscluster" redisreplicationcontroller "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/redisreplication" @@ -242,6 +243,12 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, dk8sClie setupLog.Error(err, "unable to create controller", "controller", "RedisSentinel") return err } + if err := (&eventcontroller.EventReconciler{ + Client: mgr.GetClient(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Event") + return err + } return nil } diff --git a/internal/controller/event/controller.go b/internal/controller/event/controller.go new file mode 100644 index 0000000000..2058914364 --- /dev/null +++ b/internal/controller/event/controller.go @@ -0,0 +1,184 @@ +package event + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/OT-CONTAINER-KIT/redis-operator/internal/agent/events" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +const ( + // eventHandledAnnotationKey is used to mark the event has been handled + eventHandledAnnotationKey = "redis.opstreelabs.in/event-handled" +) + +// EventReconciler reconciles Kubernetes Events for Redis role changes +type EventReconciler struct { + client.Client +} + +// Reconcile handles role change events and updates Pod labels +func (r *EventReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Fetch the Event + var event corev1.Event + if err := r.Get(ctx, req.NamespacedName, &event); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Check if this event has already been handled + if r.isEventHandled(&event) { + logger.V(1).Info("Event already handled, skipping", + "event", event.Name, + "count", event.Count) + return ctrl.Result{}, nil + } + + logger.Info("Processing role change event", + "pod", event.InvolvedObject.Name, + "namespace", event.InvolvedObject.Namespace, + "count", event.Count) + + // Parse the structured message + var roleData events.RoleChangeEventData + if err := json.Unmarshal([]byte(event.Message), &roleData); err != nil { + logger.Error(err, "Failed to parse role change event message", "message", event.Message) + return ctrl.Result{}, nil + } + + // Update Pod label based on role change + if err := r.updatePodRoleLabel(ctx, roleData); err != nil { + logger.Error(err, "Failed to update Pod role label", + "pod", roleData.PodName, + "namespace", roleData.Namespace, + "role", roleData.CurrentRole) + return ctrl.Result{RequeueAfter: time.Second * 30}, err + } + + // Mark event as handled + if err := r.markEventAsHandled(ctx, &event); err != nil { + logger.Error(err, "Failed to mark event as handled") + return ctrl.Result{RequeueAfter: time.Second * 10}, err + } + + logger.Info("Successfully processed role change event", + "pod", roleData.PodName, + "namespace", roleData.Namespace, + "previousRole", roleData.PreviousRole, + "currentRole", roleData.CurrentRole) + + return ctrl.Result{}, nil +} + +// isEventHandled checks if the event has already been processed +func (r *EventReconciler) isEventHandled(event *corev1.Event) bool { + count := fmt.Sprintf("%d", event.Count) + annotations := event.GetAnnotations() + if annotations != nil && annotations[eventHandledAnnotationKey] == count { + return true + } + return false +} + +// markEventAsHandled marks the event as processed to prevent duplicate processing +func (r *EventReconciler) markEventAsHandled(ctx context.Context, event *corev1.Event) error { + patch := client.MergeFrom(event.DeepCopy()) + if event.Annotations == nil { + event.Annotations = make(map[string]string) + } + event.Annotations[eventHandledAnnotationKey] = fmt.Sprintf("%d", event.Count) + return r.Client.Patch(ctx, event, patch) +} + +// updatePodRoleLabel updates the Pod's role label +func (r *EventReconciler) updatePodRoleLabel(ctx context.Context, roleData events.RoleChangeEventData) error { + // Fetch the Pod + pod := &corev1.Pod{} + podKey := types.NamespacedName{ + Name: roleData.PodName, + Namespace: roleData.Namespace, + } + + if err := r.Get(ctx, podKey, pod); err != nil { + return fmt.Errorf("failed to get pod %s/%s: %w", roleData.Namespace, roleData.PodName, err) + } + + // Update the role label + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + + // Convert Redis role to operator role format + operatorRole := convertRedisRoleToOperatorRole(roleData.CurrentRole) + pod.Labels["redis-role"] = operatorRole + + // Update the Pod + if err := r.Update(ctx, pod); err != nil { + return fmt.Errorf("failed to update pod %s/%s: %w", roleData.Namespace, roleData.PodName, err) + } + + return nil +} + +// convertRedisRoleToOperatorRole converts Redis role to operator role format +func convertRedisRoleToOperatorRole(redisRole string) string { + switch redisRole { + case "master": + return "master" + case "slave": + return "slave" + default: + return "unknown" + } +} + +// roleChangeEventPredicate filters events to only process Redis role change events +func roleChangeEventPredicate() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return isRoleChangeEvent(e.Object) + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return isRoleChangeEvent(e.ObjectNew) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false // We don't care about deleted events + }, + GenericFunc: func(e event.GenericEvent) bool { + return isRoleChangeEvent(e.Object) + }, + } +} + +// isRoleChangeEvent checks if the event is a Redis role change event +func isRoleChangeEvent(obj client.Object) bool { + event, ok := obj.(*corev1.Event) + if !ok { + return false + } + + // Only process role change events from redis-role-detector for Pod objects + return event.Reason == events.RoleChangedReason && + event.Source.Component == events.EventSourceComponent && + event.InvolvedObject.Kind == events.InvolvedObjectPod +} + +// SetupWithManager sets up the controller with the Manager +func (r *EventReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Event{}). + WithOptions(controller.Options{}). + WithEventFilter(roleChangeEventPredicate()). + Complete(r) +} diff --git a/internal/controller/redisreplication/redisreplication_controller.go b/internal/controller/redisreplication/redisreplication_controller.go index 3472270d8b..f2632e85fd 100644 --- a/internal/controller/redisreplication/redisreplication_controller.go +++ b/internal/controller/redisreplication/redisreplication_controller.go @@ -200,10 +200,9 @@ func (r *Reconciler) reconcileStatus(ctx context.Context, instance *rrvb2.RedisR if err = r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil { return intctrlutil.RequeueE(ctx, err, "") } - if err = r.UpdateRedisPodRoleLabel(ctx, instance, realMaster); err != nil { - return intctrlutil.RequeueE(ctx, err, "") - } - + // if err = r.UpdateRedisPodRoleLabel(ctx, instance, realMaster); err != nil { + // return intctrlutil.RequeueE(ctx, err, "") + // } slaveNodes := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "slave") if realMaster != "" { monitoring.RedisReplicationConnectedSlavesTotal.WithLabelValues(instance.Namespace, instance.Name).Set(float64(len(slaveNodes))) diff --git a/internal/k8sutils/redis-agent.go b/internal/k8sutils/redis-agent.go new file mode 100644 index 0000000000..5e2ccdfad6 --- /dev/null +++ b/internal/k8sutils/redis-agent.go @@ -0,0 +1,96 @@ +package k8sutils + +import ( + "fmt" + + common "github.com/OT-CONTAINER-KIT/redis-operator/api/common/v1beta2" + "github.com/OT-CONTAINER-KIT/redis-operator/internal/image" + "github.com/OT-CONTAINER-KIT/redis-operator/internal/util" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +// RedisAgentConfig holds configuration for generating Redis agent sidecar +type RedisAgentConfig struct { + Port *int + ExistingPasswordSecret *common.ExistingPasswordSecret +} + +// generateAgentSidecar creates a Redis agent sidecar configuration +// The agent currently performs role detection and may be extended with additional functionality in the future +func generateAgentSidecar(config RedisAgentConfig) common.Sidecar { + operatorImage, _ := util.CoalesceEnv("OPERATOR_IMAGE", image.GetOperatorImage()) + redisPort := "6379" + if config.Port != nil { + redisPort = fmt.Sprintf("%d", *config.Port) + } + envVars := []corev1.EnvVar{ + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + } + + // Add Redis password environment variable if configured + if config.ExistingPasswordSecret != nil { + envVars = append(envVars, corev1.EnvVar{ + Name: "REDIS_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: *config.ExistingPasswordSecret.Name, + }, + Key: *config.ExistingPasswordSecret.Key, + }, + }, + }) + } + + // Default resource requirements for role detector + resources := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + corev1.ResourceMemory: resource.MustParse("32Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("50m"), + corev1.ResourceMemory: resource.MustParse("64Mi"), + }, + } + + // Build complete command with arguments + command := []string{ + "/operator", + "agent", + "server", + "--redis-addr=127.0.0.1:" + redisPort, + "--detect-interval=3s", + } + + if config.ExistingPasswordSecret != nil { + command = append(command, "--redis-password=$(REDIS_PASSWORD)") + } + + return common.Sidecar{ + Name: "agent", + Image: operatorImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: command, + EnvVars: &envVars, + Resources: resources, + // SecurityContext can be omitted - the container image already runs as non-root + // and inherits pod-level security context if needed + } +} diff --git a/internal/k8sutils/redis-cluster.go b/internal/k8sutils/redis-cluster.go index 56a581a096..d20638b365 100644 --- a/internal/k8sutils/redis-cluster.go +++ b/internal/k8sutils/redis-cluster.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" + common "github.com/OT-CONTAINER-KIT/redis-operator/api/common/v1beta2" rcvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/rediscluster/v1beta2" "github.com/OT-CONTAINER-KIT/redis-operator/internal/util" corev1 "k8s.io/api/core/v1" @@ -283,6 +284,11 @@ func (service RedisClusterSTS) CreateRedisClusterSetup(ctx context.Context, cr * labels := getRedisLabels(stateFulName, cluster, service.RedisStateFulType, cr.ObjectMeta.Labels) annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations) objectMetaInfo := generateObjectMetaInformation(stateFulName, cr.Namespace, labels, annotations) + + sidecars := []common.Sidecar{} + if cr.Spec.Sidecars != nil { + sidecars = append(sidecars, *cr.Spec.Sidecars...) + } err := CreateOrUpdateStateFul( ctx, cl, @@ -292,7 +298,7 @@ func (service RedisClusterSTS) CreateRedisClusterSetup(ctx context.Context, cr * redisClusterAsOwner(cr), generateRedisClusterInitContainerParams(cr), generateRedisClusterContainerParams(ctx, cl, cr, service.SecurityContext, service.ReadinessProbe, service.LivenessProbe, service.RedisStateFulType, service.Resources), - cr.Spec.Sidecars, + &sidecars, ) if err != nil { log.FromContext(ctx).Error(err, "Cannot create statefulset for Redis", "Setup.Type", service.RedisStateFulType) diff --git a/internal/k8sutils/redis-replication.go b/internal/k8sutils/redis-replication.go index da0af4dcd7..7c6a0ad50b 100644 --- a/internal/k8sutils/redis-replication.go +++ b/internal/k8sutils/redis-replication.go @@ -3,6 +3,7 @@ package k8sutils import ( "context" + common "github.com/OT-CONTAINER-KIT/redis-operator/api/common/v1beta2" rrvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/redisreplication/v1beta2" rsvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/redissentinel/v1beta2" "github.com/OT-CONTAINER-KIT/redis-operator/internal/util" @@ -68,6 +69,16 @@ func CreateReplicationRedis(ctx context.Context, cr *rrvb2.RedisReplication, cl annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations) objectMetaInfo := generateObjectMetaInformation(stateFulName, cr.Namespace, labels, annotations) + sidecars := []common.Sidecar{} + if cr.Spec.Sidecars != nil { + sidecars = append(sidecars, *cr.Spec.Sidecars...) + } + redisAgentSidecar := generateAgentSidecar(RedisAgentConfig{ + Port: ptr.To(redisPort), + ExistingPasswordSecret: cr.Spec.KubernetesConfig.ExistingPasswordSecret, + }) + sidecars = append(sidecars, redisAgentSidecar) + err := CreateOrUpdateStateFul( ctx, cl, @@ -77,7 +88,7 @@ func CreateReplicationRedis(ctx context.Context, cr *rrvb2.RedisReplication, cl redisReplicationAsOwner(cr), generateRedisReplicationInitContainerParams(cr), generateRedisReplicationContainerParams(cr), - cr.Spec.Sidecars, + &sidecars, ) if err != nil { log.FromContext(ctx).Error(err, "Cannot create replication statefulset for Redis")