diff --git a/controllers/postgres_controller.go b/controllers/postgres_controller.go
index b6df1fce..b9919dc9 100644
--- a/controllers/postgres_controller.go
+++ b/controllers/postgres_controller.go
@@ -12,11 +12,13 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io/ioutil"
 	"net"
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/go-logr/logr"
 	zalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -65,16 +67,34 @@ type PostgresReconciler struct {
 	PartitionID, Tenant, StorageClass string
 	*operatormanager.OperatorManager
 	*lbmanager.LBManager
-	recorder                    record.EventRecorder
-	PgParamBlockList            map[string]bool
-	StandbyClustersSourceRanges []string
-	PostgresletNamespace        string
-	SidecarsConfigMapName       string
-	EnableNetPol                bool
-	EtcdHost                    string
-	PatroniTTL                  uint32
-	PatroniLoopWait             uint32
-	PatroniRetryTimeout         uint32
+	recorder                         record.EventRecorder
+	PgParamBlockList                 map[string]bool
+	StandbyClustersSourceRanges      []string
+	PostgresletNamespace             string
+	SidecarsConfigMapName            string
+	EnableNetPol                     bool
+	EtcdHost                         string
+	PatroniTTL                       uint32
+	PatroniLoopWait                  uint32
+	PatroniRetryTimeout              uint32
+	ReplicationChangeRequeueDuration time.Duration
+}
+
+type PatroniStandbyCluster struct {
+	CreateReplicaMethods []string `json:"create_replica_methods"`
+	Host                 string   `json:"host"`
+	Port                 int      `json:"port"`
+	ApplicationName      string   `json:"application_name"`
+}
+type PatroniConfig struct {
+	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
+	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
+}
+
+type PatroniPodStatus struct {
+	APIURL string `json:"api_url"`
+	State  string `json:"state"`
+	Role   string `json:"role"`
+}
 
 // Reconcile is the entry point for postgres reconciliation.
@@ -210,17 +230,13 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, fmt.Errorf("error while creating standby secrets: %w", err)
 	}
 
-	if instance.IsReplicationPrimary() {
-		// the field is not empty, which means we were either a regular, standalone database that was promoted to leader
-		// (meaning we are already running) or we are a standby which was promoted to leader (also meaning we are
-		// already running)
-		// That means we should be able to call the patroni api already. this is required, as updating the custom
-		// ressource of a standby db seems to fail (maybe because of the users/databases?)...
-		// anyway, let's get on with it
-		if err := r.updatePatroniConfig(ctx, instance); err != nil {
-			// TODO what to do here? reschedule or ignore?
-			log.Error(err, "failed to update patroni config via REST call")
-		}
+	// check (and update if necessary) the current patroni replication config.
+	immediateRequeue, patroniConfigChangeErr := r.checkAndUpdatePatroniReplicationConfig(ctx, instance)
+	if immediateRequeue {
+		// if a config change was performed that requires a while to settle in, we simply requeue;
+		// on the next reconciliation loop, the config should already be correct and we can continue with the rest
+		log.Info("Requeueing after patroni replication config change")
+		return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, patroniConfigChangeErr
 	}
 
 	// create standby egress rule first, so the standby can actually connect to the primary
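The requeue handling above is the heart of this change: the replication config check runs early in Reconcile and, when it just changed something, requeues with the configured delay; an error it returned is only surfaced at the bottom of Reconcile, after the remaining resources were updated. A minimal standalone sketch of that two-phase pattern follows; the result type and checkConfig helper are illustrative stand-ins for ctrl.Result and checkAndUpdatePatroniReplicationConfig, not code from this repo.

package main

import (
	"fmt"
	"time"
)

// result stands in for ctrl.Result.
type result struct {
	requeue      bool
	requeueAfter time.Duration
}

const settleTime = 10 * time.Second // stand-in for the configured requeue duration

// checkConfig is a stand-in: it reports whether a change was just applied
// (requeue immediately and let it settle) plus any error it encountered.
func checkConfig() (immediateRequeue bool, err error) { return false, nil }

func reconcile() (result, error) {
	immediate, cfgErr := checkConfig()
	if immediate {
		// a change needs time to settle; re-check on the next loop
		return result{requeue: true, requeueAfter: settleTime}, cfgErr
	}
	// ... perform the remaining resource updates here ...
	if cfgErr != nil {
		// surface the config error only now, after the other updates ran
		return result{requeue: true, requeueAfter: settleTime}, cfgErr
	}
	return result{}, nil
}

func main() {
	fmt.Println(reconcile())
}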
@@ -275,9 +291,11 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, fmt.Errorf("unable to create or update ingress ClusterwideNetworkPolicy: %w", err)
 	}
 
-	// this is the call for standbys
-	if err := r.updatePatroniConfig(ctx, instance); err != nil {
-		return requeue, fmt.Errorf("unable to update patroni config: %w", err)
+	// if an error occurred while updating the patroni config, requeue here;
+	// this is done down here to make sure the rest of the resource updates were performed first
+	if patroniConfigChangeErr != nil {
+		log.Info("Requeueing after modifying patroni replication config failed")
+		return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, patroniConfigChangeErr
 	}
 
 	r.recorder.Event(instance, "Normal", "Reconciled", "postgres up to date")
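Both branches of the config check below compare the live Patroni config against one of two target shapes. For reference, here is what the PatroniConfig payloads marshal to; the IP, port, and IDs are invented example values:

package main

import (
	"encoding/json"
	"fmt"
)

type PatroniStandbyCluster struct {
	CreateReplicaMethods []string `json:"create_replica_methods"`
	Host                 string   `json:"host"`
	Port                 int      `json:"port"`
	ApplicationName      string   `json:"application_name"`
}

type PatroniConfig struct {
	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
}

func main() {
	// shape for a replication primary with synchronous replication enabled
	id := "postgres-connected-id"
	primary := PatroniConfig{StandbyCluster: nil, SynchronousNodesAdditional: &id}

	// shape for a standby following a remote primary
	standby := PatroniConfig{
		StandbyCluster: &PatroniStandbyCluster{
			CreateReplicaMethods: []string{"basebackup_fast_xlog"},
			Host:                 "10.0.0.1",
			Port:                 5432,
			ApplicationName:      "postgres-instance-name",
		},
	}

	p, _ := json.Marshal(primary)
	s, _ := json.Marshal(standby)
	fmt.Println(string(p)) // {"standby_cluster":null,"synchronous_nodes_additional":"postgres-connected-id"}
	fmt.Println(string(s))
}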
@@ -695,47 +713,142 @@ func (r *PostgresReconciler) ensureStandbySecrets(ctx context.Context, instance
 
 }
 
-func (r *PostgresReconciler) updatePatroniConfig(ctx context.Context, instance *pg.Postgres) error {
-	// Finally, send a POST to to the database with the correct config
+func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(ctx context.Context, instance *pg.Postgres) (bool, error) {
+
+	const requeueImmediately = true
+	const continueWithReconciliation = false
+
+	// If there is no connected postgres, no need to tinker with patroni directly
 	if instance.Spec.PostgresConnection == nil {
-		return nil
+		return continueWithReconciliation, nil
 	}
 
-	r.Log.Info("Sending REST call to Patroni API")
-	pods := &corev1.PodList{}
+	r.Log.Info("Checking replication config from Patroni API")
 
-	roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
+	// Get the leader pod
+	leaderPods, err := r.findLeaderPods(ctx, instance)
 	if err != nil {
-		r.Log.Info("could not create requirements for label selector to query pods, requeuing")
-		return err
+		r.Log.Info("could not query pods, requeuing")
+		return requeueImmediately, err
 	}
-	leaderSelector := labels.NewSelector()
-	leaderSelector = leaderSelector.Add(*roleReq)
 
-	opts := []client.ListOption{
-		client.InNamespace(instance.ToPeripheralResourceNamespace()),
-		client.MatchingLabelsSelector{Selector: leaderSelector},
+	if len(leaderPods.Items) != 1 {
+		r.Log.Info("expected exactly one leader pod, selecting all spilo pods as a last resort (might be ok if it is still creating)")
+		// To make sure any updates to the Zalando postgresql manifest are written, we do not requeue in this case
+		return continueWithReconciliation, r.updatePatroniReplicationConfigOnAllPods(ctx, instance)
 	}
-	if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
-		r.Log.Info("could not query pods, requeuing")
-		return err
+
+	apiURL, err := r.readAPIURL(&leaderPods.Items[0])
+	if err != nil {
+		// TODO guessAPI or reconcile?
+		// If there is no annotation, there is probably no running patroni API either; as this only runs AFTER selecting by the spilo-role labels, it should not happen anyway.
+		// Keeping the guess as a fallback still seems reasonable, since a failing call to the API would trigger a reconcile anyway.
+		// return continueWithReconciliation, err
+		apiURL = r.guessAPIURL(&leaderPods.Items[0])
 	}
-	if len(pods.Items) == 0 {
-		r.Log.Info("no leader pod found, selecting all spilo pods as a last resort (might be ok if it is still creating)")
-		err = r.updatePatroniConfigOnAllPods(ctx, instance)
-		if err != nil {
-			r.Log.Info("updating patroni config failed, got one or more errors")
-			return err
+	var resp *PatroniConfig
+	resp, err = r.httpGetPatroniConfig(ctx, apiURL)
+	if err != nil {
+		return continueWithReconciliation, err
+	}
+	if resp == nil {
+		return continueWithReconciliation, r.httpPatchPatroni(ctx, instance, apiURL)
+	}
+
+	if instance.IsReplicationPrimary() {
+		if resp.StandbyCluster != nil {
+			r.Log.Info("standby_cluster mismatch, updating and requeuing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+		}
+		if instance.Spec.PostgresConnection.SynchronousReplication {
+			if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
+				r.Log.Info("synchronous_nodes_additional mismatch, updating and requeuing", "response", resp)
+				return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+			}
+		} else {
+			if resp.SynchronousNodesAdditional != nil {
+				r.Log.Info("synchronous_nodes_additional mismatch, updating and requeuing", "response", resp)
+				return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+			}
+		}
+
+	} else {
+		if resp.StandbyCluster == nil {
+			r.Log.Info("standby_cluster mismatch, updating and requeuing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+		}
+		if resp.StandbyCluster.CreateReplicaMethods == nil {
+			r.Log.Info("create_replica_methods mismatch, updating and requeuing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+		}
+		if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
+			r.Log.Info("host mismatch, updating and requeuing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+		}
+		if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
+			r.Log.Info("port mismatch, updating and requeuing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+		}
+		if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
+			r.Log.Info("application_name mismatch, updating and requeuing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
+		}
+
+		if resp.SynchronousNodesAdditional != nil {
+			r.Log.Info("synchronous_nodes_additional mismatch, updating and requeuing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
 		}
-		return nil
 	}
-	podIP := pods.Items[0].Status.PodIP
-	return r.httpPatchPatroni(ctx, instance, podIP)
+	r.Log.Info("replication config from Patroni API up to date")
+	return continueWithReconciliation, nil
 }
 
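The readAPIURL helper below relies on the status annotation that Patroni itself maintains on the pods it manages; the struct fields api_url, state, and role mirror that JSON. A standalone sketch of parsing such an annotation follows; the concrete JSON value is invented, and the extra fields it carries are simply ignored by encoding/json:

package main

import (
	"encoding/json"
	"fmt"
)

type PatroniPodStatus struct {
	APIURL string `json:"api_url"`
	State  string `json:"state"`
	Role   string `json:"role"`
}

func main() {
	// invented example of a Patroni-written pod annotation value
	annotation := `{"conn_url":"postgres://10.244.1.7:5432/postgres","api_url":"http://10.244.1.7:8008/patroni","state":"running","role":"master","timeline":3}`

	var status PatroniPodStatus
	if err := json.Unmarshal([]byte(annotation), &status); err != nil {
		panic(err)
	}
	fmt.Println(status.APIURL, status.Role, status.State)
}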
-func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error {
+func (r *PostgresReconciler) readAPIURL(pod *corev1.Pod) (string, error) {
+	value, ok := pod.Annotations["status"]
+	if !ok {
+		return "", fmt.Errorf("could not find patroni pod status annotation")
+	}
+
+	var status PatroniPodStatus
+	err := json.Unmarshal([]byte(value), &status)
+	if err != nil {
+		r.Log.Error(err, "could not parse patroni pod status annotation")
+		return "", err
+	}
+
+	return status.APIURL, nil
+}
+
+func (r *PostgresReconciler) guessAPIURL(pod *corev1.Pod) string {
+	if pod == nil {
+		return ""
+	}
+
+	if pod.Status.PodIP == "" {
+		return ""
+	}
+
+	return "http://" + pod.Status.PodIP + ":8008/config"
+}
+
+func (r *PostgresReconciler) findLeaderPods(ctx context.Context, instance *pg.Postgres) (*corev1.PodList, error) {
+	leaderPods := &corev1.PodList{}
+	roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
+	if err != nil {
+		r.Log.Info("could not create requirements for label selector to query pods, requeuing")
+		return leaderPods, err
+	}
+	leaderSelector := labels.NewSelector().Add(*roleReq)
+	opts := []client.ListOption{
+		client.InNamespace(instance.ToPeripheralResourceNamespace()),
+		client.MatchingLabelsSelector{Selector: leaderSelector},
+	}
+	return leaderPods, r.SvcClient.List(ctx, leaderPods, opts...)
+}
+
+func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error {
 	pods := &corev1.PodList{}
 	opts := []client.ListOption{
 		client.InNamespace(instance.ToPeripheralResourceNamespace()),
@@ -749,14 +862,16 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, i
 	if len(pods.Items) == 0 {
 		r.Log.Info("no spilo pods found at all, requeueing")
 		return errors.New("no spilo pods found at all")
+	} else if len(pods.Items) < int(instance.Spec.NumberOfInstances) {
+		r.Log.Info("found fewer pods than expected (might be ok if it is still creating)", "expected", instance.Spec.NumberOfInstances, "found", len(pods.Items))
 	}
 
 	// iterate all spilo pods
 	var lastErr error
 	for _, pod := range pods.Items {
 		pod := pod // pin!
-		podIP := pod.Status.PodIP
-		if err := r.httpPatchPatroni(ctx, instance, podIP); err != nil {
+		url := r.guessAPIURL(&pod)
+		if err := r.httpPatchPatroni(ctx, instance, url); err != nil {
 			lastErr = err
 			r.Log.Info("failed to update pod")
 		}
@@ -769,29 +884,15 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, i
 	return nil
 }
 
-func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.Postgres, podIP string) error {
-	if podIP == "" {
-		return errors.New("podIP must not be empty")
-	}
-
-	podPort := "8008"
-	path := "config"
-
-	type PatroniStandbyCluster struct {
-		CreateReplicaMethods []string `json:"create_replica_methods"`
-		Host                 string   `json:"host"`
-		Port                 int      `json:"port"`
-		ApplicationName      string   `json:"application_name"`
-	}
-	type PatroniConfigRequest struct {
-		StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
-		SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
+func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.Postgres, url string) error {
+	if url == "" {
+		return errors.New("url must not be empty")
 	}
 
 	r.Log.Info("Preparing request")
-	var request PatroniConfigRequest
+	var request PatroniConfig
 	if instance.IsReplicationPrimary() {
-		request = PatroniConfigRequest{
+		request = PatroniConfig{
 			StandbyCluster: nil,
 		}
 		if instance.Spec.PostgresConnection.SynchronousReplication {
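For reference, the spilo-role selector built in findLeaderPods above is the programmatic equivalent of kubectl get pods -l 'spilo-role in (master,standby_leader)'. A standalone sketch follows; the label key and values are inlined here as assumptions, while the real code takes them from pg package constants:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	roleReq, err := labels.NewRequirement("spilo-role", selection.In, []string{"master", "standby_leader"})
	if err != nil {
		panic(err)
	}
	leaderSelector := labels.NewSelector().Add(*roleReq)

	fmt.Println(leaderSelector.String()) // spilo-role in (master,standby_leader)

	// Matches evaluates a candidate pod's labels against the selector
	fmt.Println(leaderSelector.Matches(labels.Set{"spilo-role": "master"}))   // true
	fmt.Println(leaderSelector.Matches(labels.Set{"spilo-role": "replica"})) // false
}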
@@ -803,7 +904,7 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 		}
 	} else {
 		// TODO check values first
-		request = PatroniConfigRequest{
+		request = PatroniConfig{
 			StandbyCluster: &PatroniStandbyCluster{
 				CreateReplicaMethods: []string{"basebackup_fast_xlog"},
 				Host:                 instance.Spec.PostgresConnection.ConnectionIP,
@@ -821,7 +922,6 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 	}
 
 	httpClient := &http.Client{}
-	url := "http://" + podIP + ":" + podPort + "/" + path
 
 	req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewBuffer(jsonReq))
 	if err != nil {
@@ -840,6 +940,45 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 	return nil
 }
 
+func (r *PostgresReconciler) httpGetPatroniConfig(ctx context.Context, url string) (*PatroniConfig, error) {
+	if url == "" {
+		return nil, errors.New("url must not be empty")
+	}
+
+	httpClient := &http.Client{}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		r.Log.Error(err, "could not create request")
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		r.Log.Error(err, "could not perform request")
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		r.Log.Info("could not read body")
+		return nil, err
+	}
+	var jsonResp PatroniConfig
+	err = json.Unmarshal(body, &jsonResp)
+	if err != nil {
+		r.Log.Info("could not parse config")
+		return nil, err
+	}
+
+	r.Log.Info("Got config", "response", jsonResp)
+
+	return &jsonResp, nil
+}
+
 func (r *PostgresReconciler) getBackupConfig(ctx context.Context, ns, name string) (*pg.BackupConfig, error) {
 	// fetch secret
 	backupSecret := &corev1.Secret{}
diff --git a/main.go b/main.go
index a58d6ebb..4140aa5a 100644
--- a/main.go
+++ b/main.go
@@ -12,6 +12,7 @@ import (
 	"net"
 	"os"
 	"strings"
+	"time"
 
 	"github.com/metal-stack/v"
 	coreosv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -35,31 +36,32 @@ import (
 
 const (
 	// envPrefix = "pg"
-	metricsAddrSvcMgrFlg           = "metrics-addr-svc-mgr"
-	metricsAddrCtrlMgrFlg          = "metrics-addr-ctrl-mgr"
-	enableLeaderElectionFlg        = "enable-leader-election"
-	partitionIDFlg                 = "partition-id"
-	tenantFlg                      = "tenant"
-	ctrlPlaneKubeConfifgFlg        = "controlplane-kubeconfig"
-	loadBalancerIPFlg              = "load-balancer-ip"
-	portRangeStartFlg              = "port-range-start"
-	portRangeSizeFlg               = "port-range-size"
-	customPSPNameFlg               = "custom-psp-name"
-	storageClassFlg                = "storage-class"
-	postgresImageFlg               = "postgres-image"
-	etcdHostFlg                    = "etcd-host"
-	crdValidationFlg               = "enable-crd-validation"
-	operatorImageFlg               = "operator-image"
-	pgParamBlockListFlg            = "postgres-param-blocklist"
-	majorVersionUpgradeModeFlg     = "major-version-upgrade-mode"
-	standbyClustersSourceRangesFlg = "standby-clusters-source-ranges"
-	postgresletNamespaceFlg        = "postgreslet-namespace"
-	sidecarsCMNameFlg              = "sidecars-configmap-name"
-	enableNetPolFlg                = "enable-netpol"
-	enablePodAntiaffinityFlg       = "enable-pod-antiaffinity"
-	patroniRetryTimeoutFlg         = "patroni-retry-timeout"
-	enableStandbyLeaderSelectorFlg = "enable-standby-leader-selector"
-	ControlPlaneNamespaceFlg       = "control-plane-namespace"
+	metricsAddrSvcMgrFlg            = "metrics-addr-svc-mgr"
+	metricsAddrCtrlMgrFlg           = "metrics-addr-ctrl-mgr"
+	enableLeaderElectionFlg         = "enable-leader-election"
+	partitionIDFlg                  = "partition-id"
+	tenantFlg                       = "tenant"
+	ctrlPlaneKubeConfifgFlg         = "controlplane-kubeconfig"
"controlplane-kubeconfig" + loadBalancerIPFlg = "load-balancer-ip" + portRangeStartFlg = "port-range-start" + portRangeSizeFlg = "port-range-size" + customPSPNameFlg = "custom-psp-name" + storageClassFlg = "storage-class" + postgresImageFlg = "postgres-image" + etcdHostFlg = "etcd-host" + crdValidationFlg = "enable-crd-validation" + operatorImageFlg = "operator-image" + pgParamBlockListFlg = "postgres-param-blocklist" + majorVersionUpgradeModeFlg = "major-version-upgrade-mode" + standbyClustersSourceRangesFlg = "standby-clusters-source-ranges" + postgresletNamespaceFlg = "postgreslet-namespace" + sidecarsCMNameFlg = "sidecars-configmap-name" + enableNetPolFlg = "enable-netpol" + enablePodAntiaffinityFlg = "enable-pod-antiaffinity" + patroniRetryTimeoutFlg = "patroni-retry-timeout" + enableStandbyLeaderSelectorFlg = "enable-standby-leader-selector" + ControlPlaneNamespaceFlg = "control-plane-namespace" + replicationChangeRequeueTimeFlg = "replication-change-requeue-time-in-seconds" ) var ( @@ -101,8 +103,9 @@ func main() { enablePodAntiaffinity bool enableStandbyLeaderSelector bool - portRangeStart int - portRangeSize int + portRangeStart int + portRangeSize int + replicationChangeRequeueTimeInSeconds int patroniTTL uint32 patroniLoopWait uint32 @@ -207,6 +210,10 @@ func main() { viper.SetDefault(ControlPlaneNamespaceFlg, "metal-extension-postgres") controlPlaneNamespace = viper.GetString(ControlPlaneNamespaceFlg) + viper.SetDefault(replicationChangeRequeueTimeFlg, 10) + replicationChangeRequeueTimeInSeconds = viper.GetInt(replicationChangeRequeueTimeFlg) + replicationChangeRequeueDuration := time.Duration(replicationChangeRequeueTimeInSeconds) * time.Second + ctrl.SetLogger(zap.New(zap.UseDevMode(true))) ctrl.Log.Info("flag", @@ -235,6 +242,7 @@ func main() { patroniRetryTimeoutFlg, patroniRetryTimeout, enableStandbyLeaderSelectorFlg, enableStandbyLeaderSelector, ControlPlaneNamespaceFlg, controlPlaneNamespace, + replicationChangeRequeueTimeFlg, replicationChangeRequeueTimeInSeconds, ) svcClusterConf := ctrl.GetConfigOrDie() @@ -292,24 +300,25 @@ func main() { EnableStandbyLeaderSelector: enableStandbyLeaderSelector, } if err = (&controllers.PostgresReconciler{ - CtrlClient: ctrlPlaneClusterMgr.GetClient(), - SvcClient: svcClusterMgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("Postgres"), - Scheme: ctrlPlaneClusterMgr.GetScheme(), - PartitionID: partitionID, - Tenant: tenant, - StorageClass: storageClass, - OperatorManager: opMgr, - LBManager: lbmanager.New(svcClusterMgr.GetClient(), lbMgrOpts), - PgParamBlockList: pgParamBlockList, - StandbyClustersSourceRanges: standbyClusterSourceRanges, - PostgresletNamespace: postgresletNamespace, - SidecarsConfigMapName: sidecarsCMName, - EnableNetPol: enableNetPol, - EtcdHost: etcdHost, - PatroniTTL: patroniTTL, - PatroniLoopWait: patroniLoopWait, - PatroniRetryTimeout: patroniRetryTimeout, + CtrlClient: ctrlPlaneClusterMgr.GetClient(), + SvcClient: svcClusterMgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("Postgres"), + Scheme: ctrlPlaneClusterMgr.GetScheme(), + PartitionID: partitionID, + Tenant: tenant, + StorageClass: storageClass, + OperatorManager: opMgr, + LBManager: lbmanager.New(svcClusterMgr.GetClient(), lbMgrOpts), + PgParamBlockList: pgParamBlockList, + StandbyClustersSourceRanges: standbyClusterSourceRanges, + PostgresletNamespace: postgresletNamespace, + SidecarsConfigMapName: sidecarsCMName, + EnableNetPol: enableNetPol, + EtcdHost: etcdHost, + PatroniTTL: patroniTTL, + PatroniLoopWait: 
+		PatroniRetryTimeout:              patroniRetryTimeout,
+		ReplicationChangeRequeueDuration: replicationChangeRequeueDuration,
 	}).SetupWithManager(ctrlPlaneClusterMgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "Postgres")
 		os.Exit(1)
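On the main.go side, the new requeue time travels as whole seconds and is converted to a time.Duration before being handed to the reconciler. A minimal sketch of that conversion with the same 10-second default; the flag/env binding that viper performs in the real binary is omitted:

package main

import (
	"fmt"
	"time"

	"github.com/spf13/viper"
)

const replicationChangeRequeueTimeFlg = "replication-change-requeue-time-in-seconds"

func main() {
	viper.SetDefault(replicationChangeRequeueTimeFlg, 10)

	// in the real binary, a bound flag or env var would override this default
	seconds := viper.GetInt(replicationChangeRequeueTimeFlg)
	requeueDuration := time.Duration(seconds) * time.Second

	fmt.Println(requeueDuration) // 10s
}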