diff --git a/controllers/postgres_controller.go b/controllers/postgres_controller.go
index 44423791..612f1ecc 100644
--- a/controllers/postgres_controller.go
+++ b/controllers/postgres_controller.go
@@ -7,16 +7,20 @@ package controllers

 import (
+	"bytes"
 	"context"
 	"crypto/rand"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"math/big"
 	"net"
+	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
+	"time"

 	"github.com/go-logr/logr"
 	zalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -70,13 +74,13 @@ var requeue = ctrl.Result{

 // PostgresReconciler reconciles a Postgres object
 type PostgresReconciler struct {
-	CtrlClient                          client.Client
-	SvcClient                           client.Client
-	Log                                 logr.Logger
-	Scheme                              *runtime.Scheme
-	PartitionID, Tenant, StorageClass   string
-	OperatorManager                     *operatormanager.OperatorManager
-	LBManager                           *lbmanager.LBManager
+	CtrlClient                          client.Client
+	SvcClient                           client.Client
+	Log                                 logr.Logger
+	Scheme                              *runtime.Scheme
+	PartitionID, Tenant, StorageClass   string
+	*operatormanager.OperatorManager
+	*lbmanager.LBManager
 	recorder                            record.EventRecorder
 	PgParamBlockList                    map[string]bool
 	StandbyClustersSourceRanges         []string
@@ -87,6 +91,7 @@ type PostgresReconciler struct {
 	PatroniTTL                          uint32
 	PatroniLoopWait                     uint32
 	PatroniRetryTimeout                 uint32
+	ReplicationChangeRequeueDuration    time.Duration
 	EnableRandomStorageEncryptionSecret bool
 	EnableWalGEncryption                bool
 	PostgresletFullname                 string
@@ -99,6 +104,17 @@ type PostgresReconciler struct {
 	TLSSubDomain                        string
 }

+type PatroniStandbyCluster struct {
+	CreateReplicaMethods []string `json:"create_replica_methods"`
+	Host                 string   `json:"host"`
+	Port                 int      `json:"port"`
+	ApplicationName      string   `json:"application_name"`
+}
+type PatroniConfig struct {
+	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
+	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
+}
+
 // Reconcile is the entry point for postgres reconciliation.
 // +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres/status,verbs=get;update;patch
@@ -253,18 +269,8 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, fmt.Errorf("error while creating postgres secrets: %w", err)
 	}

-	if instance.IsReplicationPrimary() {
-		// the field is not empty, which means we were either a regular, standalone database that was promoted to leader
-		// (meaning we are already running) or we are a standby which was promoted to leader (also meaning we are
-		// already running)
-		// That means we should be able to call the patroni api already. this is required, as updating the custom
-		// resource of a standby db seems to fail (maybe because of the users/databases?)...
-		// anyway, let's get on with it
-		if err := r.updatePatroniConfig(log, ctx, instance); err != nil {
-			// TODO what to do here? reschedule or ignore?
-			log.Error(err, "failed to update patroni config via REST call")
-		}
-	}
+	// check (and update if necessary) the current patroni replication config.
+	requeueAfterReconcile, patroniConfigChangeErr := r.checkAndUpdatePatroniReplicationConfig(log, ctx, instance)

 	// create standby egress rule first, so the standby can actually connect to the primary
 	if err := r.createOrUpdateEgressCWNP(ctx, instance); err != nil {
@@ -318,7 +324,7 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	// Check if socket port is ready
 	port := instance.Status.Socket.Port
 	if port == 0 {
-		r.recorder.Event(instance, "Warning", "Self-Reconcilation", "socket port not ready")
+		r.recorder.Event(instance, "Warning", "Self-Reconciliation", "socket port not ready")
 		log.Info("socket port not ready, requeueing")
 		return requeue, nil
 	}
@@ -329,9 +335,17 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, fmt.Errorf("unable to create or update ingress ClusterwideNetworkPolicy: %w", err)
 	}

-	// this is the call for standbys
-	if err := r.updatePatroniConfig(log, ctx, instance); err != nil {
-		return requeue, fmt.Errorf("unable to update patroni config: %w", err)
+	// if an error occurred while updating the patroni config, requeue here
+	// we try again in the next loop, hoping things will settle
+	if patroniConfigChangeErr != nil {
+		log.Info("Requeueing after getting/setting patroni replication config failed")
+		return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, patroniConfigChangeErr
+	}
+	// if the config isn't in the expected state yet (we only add values to an existing config, we do not perform the actual switch), we simply requeue.
+	// on the next reconciliation loop, postgres-operator should have caught up and the config should hopefully be correct already so we can continue with adding our values.
+	if requeueAfterReconcile {
+		log.Info("Requeueing as the patroni replication config is not in the expected state (yet)")
+		return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, nil
 	}

 	log.Info("postgres reconciled")
@@ -1003,60 +1017,124 @@ func (r *PostgresReconciler) copySecrets(log logr.Logger, ctx context.Context, s
 	return nil
 }

-func (r *PostgresReconciler) updatePatroniConfig(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
-	// Finally, send a POST to to the database with the correct config
+func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Logger, ctx context.Context, instance *pg.Postgres) (bool, error) {
+
+	log = log.WithValues("label", "patroni")
+
+	const requeueAfterReconcile = true
+	const allDone = false
+
+	// If there is no connected postgres, no need to tinker with patroni directly
 	if instance.Spec.PostgresConnection == nil {
-		return nil
+		return allDone, nil
 	}

-	log.V(debugLogLevel).Info("Sending REST call to Patroni API")
-	pods := &corev1.PodList{}
+	log.V(debugLogLevel).Info("Checking replication config from Patroni API")

-	roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
+	// Get the leader pod
+	leaderPods, err := r.findLeaderPods(log, ctx, instance)
 	if err != nil {
-		log.Info("could not create requirements for label selector to query pods, requeuing")
-		return err
+		log.V(debugLogLevel).Info("could not query pods, requeuing")
+		return requeueAfterReconcile, err
 	}
-	leaderSelector := labels.NewSelector()
-	leaderSelector = leaderSelector.Add(*roleReq)
-	opts := []client.ListOption{
-		client.InNamespace(instance.ToPeripheralResourceNamespace()),
-		client.MatchingLabelsSelector{Selector: leaderSelector},
+	if len(leaderPods.Items) != 1 {
+		log.V(debugLogLevel).Info("expected exactly one leader pod, selecting all spilo pods as a last resort (might be ok if it is still creating)")
+		// To make sure any updates to the Zalando postgresql manifest are written, we do not requeue in this case
+		return allDone, r.updatePatroniReplicationConfigOnAllPods(log, ctx, instance)
 	}
-	if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
-		log.Info("could not query pods, requeuing")
-		return err
+	leaderIP := leaderPods.Items[0].Status.PodIP
+
+	var resp *PatroniConfig
+	resp, err = r.httpGetPatroniConfig(log, ctx, leaderIP)
+	if err != nil {
+		log.V(debugLogLevel).Info("could not query patroni, requeuing")
+		return requeueAfterReconcile, err
+	}
+	if resp == nil {
+		log.V(debugLogLevel).Info("got nil response from patroni, requeuing")
+		return requeueAfterReconcile, nil
 	}

-	if len(pods.Items) == 0 {
-		log.Info("no leader pod found, selecting all spilo pods as a last resort (might be ok if it is still creating)")
-		err = r.updatePatroniConfigOnAllPods(log, ctx, instance)
-		if err != nil {
-			log.Info("updating patroni config failed, got one or more errors")
-			return err
+	if instance.IsReplicationPrimary() {
+		if resp.StandbyCluster != nil {
+			log.V(debugLogLevel).Info("standby_cluster mismatch, requeueing", "response", resp)
+			return requeueAfterReconcile, nil
+		}
+		if instance.Spec.PostgresConnection.SynchronousReplication {
+			if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
+				log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating", "response", resp)
+				return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
+			}
+		} else {
+			if resp.SynchronousNodesAdditional != nil {
+				log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating", "response", resp)
+				return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
+			}
+		}
+
+	} else {
+		if resp.StandbyCluster == nil {
+			log.V(debugLogLevel).Info("standby_cluster mismatch, requeueing", "response", resp)
+			return requeueAfterReconcile, nil
+		}
+		if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
+			log.V(debugLogLevel).Info("application_name mismatch, updating", "response", resp)
+			return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
+		}
+		if resp.SynchronousNodesAdditional != nil {
+			log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating", "response", resp)
+			return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
+		}
+		if resp.StandbyCluster.CreateReplicaMethods == nil {
+			log.V(debugLogLevel).Info("create_replica_methods mismatch, updating", "response", resp)
+			return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
+		}
+		if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
+			log.V(debugLogLevel).Info("host mismatch, requeueing", "response", resp)
+			return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
+		}
+		if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
+			log.V(debugLogLevel).Info("port mismatch, requeueing", "response", resp)
+			return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
 		}
-		return nil
 	}
-	podIP := pods.Items[0].Status.PodIP
-	return r.httpPatchPatroni(log, ctx, instance, podIP)
+	log.V(debugLogLevel).Info("replication config from Patroni API up to date")
+	return allDone, nil
+}
+
+func (r *PostgresReconciler) findLeaderPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) (*corev1.PodList, error) {
+	leaderPods := &corev1.PodList{}
+	roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
+	if err != nil {
+		log.V(debugLogLevel).Info("could not create requirements for label selector to query pods, requeuing")
+		return leaderPods, err
+	}
+	leaderSelector := labels.NewSelector().Add(*roleReq)
+	opts := []client.ListOption{
+		client.InNamespace(instance.ToPeripheralResourceNamespace()),
+		client.MatchingLabelsSelector{Selector: leaderSelector},
+	}
+	return leaderPods, r.SvcClient.List(ctx, leaderPods, opts...)
 }

-func (r *PostgresReconciler) updatePatroniConfigOnAllPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
+func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
 	pods := &corev1.PodList{}
 	opts := []client.ListOption{
 		client.InNamespace(instance.ToPeripheralResourceNamespace()),
 		client.MatchingLabels{pg.ApplicationLabelName: pg.ApplicationLabelValue},
 	}
 	if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
-		log.Info("could not query pods, requeuing")
+		log.V(debugLogLevel).Info("could not query pods, requeuing")
 		return err
 	}

 	if len(pods.Items) == 0 {
-		log.Info("no spilo pods found at all, requeueing")
+		log.V(debugLogLevel).Info("no spilo pods found at all, requeueing")
 		return errors.New("no spilo pods found at all")
+	} else if len(pods.Items) < int(instance.Spec.NumberOfInstances) {
+		log.V(debugLogLevel).Info("found fewer spilo pods than expected (might be ok if it is still creating)", "expected", instance.Spec.NumberOfInstances, "found", len(pods.Items))
 	}

 	// iterate all spilo pods
@@ -1070,7 +1148,7 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(log logr.Logger, ctx c
 		}
 	}
 	if lastErr != nil {
-		log.Info("updating patroni config failed, got one or more errors")
+		log.V(debugLogLevel).Info("updating patroni config failed, got one or more errors")
 		return lastErr
 	}
 	log.V(debugLogLevel).Info("updating patroni config succeeded")
@@ -1078,10 +1156,108 @@
 }

 func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string) error {
-	log.Info("Skipping call to patroni")
+	if podIP == "" {
+		return errors.New("podIP must not be empty")
+	}
+
+	podPort := "8008"
+	path := "config"
+
+	log.V(debugLogLevel).Info("Preparing request")
+	var request PatroniConfig
+	if instance.IsReplicationPrimary() {
+		request = PatroniConfig{
+			StandbyCluster: nil,
+		}
+		if instance.Spec.PostgresConnection.SynchronousReplication {
+			// enable sync replication
+			request.SynchronousNodesAdditional = pointer.String(instance.Spec.PostgresConnection.ConnectedPostgresID)
+		} else {
+			// disable sync replication
+			request.SynchronousNodesAdditional = nil
+		}
+	} else {
+		// TODO check values first
+		request = PatroniConfig{
+			StandbyCluster: &PatroniStandbyCluster{
+				CreateReplicaMethods: []string{"basebackup_fast_xlog"},
+				Host:                 instance.Spec.PostgresConnection.ConnectionIP,
+				Port:                 int(instance.Spec.PostgresConnection.ConnectionPort),
+				ApplicationName:      instance.ObjectMeta.Name,
+			},
+			SynchronousNodesAdditional: nil,
+		}
+	}
+	log.V(debugLogLevel).Info("Prepared request", "request", request)
+	jsonReq, err := json.Marshal(request)
+	if err != nil {
+		log.V(debugLogLevel).Info("could not marshal request")
+		return err
+	}
+
+	httpClient := &http.Client{}
+	url := "http://" + podIP + ":" + podPort + "/" + path
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewBuffer(jsonReq))
+	if err != nil {
+		log.Error(err, "could not create PATCH request")
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		log.Error(err, "could not perform PATCH request")
+		return err
+	}
+	defer resp.Body.Close()
+
 	return nil
 }

+func (r *PostgresReconciler) httpGetPatroniConfig(log logr.Logger, ctx context.Context, podIP string) (*PatroniConfig, error) {
+	if podIP == "" {
+		return nil, errors.New("podIP must not be empty")
+	}
+
+	podPort := "8008"
+	path := "config"
+
+	httpClient := &http.Client{}
+	url := "http://" + podIP + ":" + podPort + "/" + path
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		log.Error(err, "could not create GET request")
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		log.Error(err, "could not perform GET request")
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		log.V(debugLogLevel).Info("could not read body")
+		return nil, err
+	}
+	var jsonResp PatroniConfig
+	err = json.Unmarshal(body, &jsonResp)
+	if err != nil {
+		log.V(debugLogLevel).Info("could not parse config response")
+		return nil, err
+	}
+
+	log.V(debugLogLevel).Info("Got config", "response", jsonResp)
+
+	return &jsonResp, nil
+}
+
 func (r *PostgresReconciler) getBackupConfig(ctx context.Context, ns, name string) (*pg.BackupConfig, error) {
 	// fetch secret
 	backupSecret := &corev1.Secret{}
diff --git a/main.go b/main.go
index 398fc30f..b646788b 100644
--- a/main.go
+++ b/main.go
@@ -12,6 +12,7 @@ import (
 	"net"
 	"os"
 	"strings"
+	"time"

 	"github.com/metal-stack/v"
 	coreosv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -39,6 +40,7 @@ const (
 	// envPrefix = "pg"
+
 	metricsAddrSvcMgrFlg    = "metrics-addr-svc-mgr"
 	metricsAddrCtrlMgrFlg   = "metrics-addr-ctrl-mgr"
 	enableLeaderElectionFlg = "enable-leader-election"
@@ -70,6 +72,7 @@ const (
 	etcdBackupSidecarImageFlg              = "etcd-backup-sidecar-image"
 	etcdBackupSecretNameFlg                = "etcd-backup-secret-name" // nolint
 	etcdPSPNameFlg                         = "etcd-psp-name"
+	replicationChangeRequeueTimeFlg        = "replication-change-requeue-time-in-seconds"
 	postgresletFullnameFlg                 = "postgreslet-fullname"
 	enableLBSourceRangesFlg                = "enable-lb-source-ranges"
 	enableRandomStorageEncryptionSecretFlg = "enable-random-storage-encryption-secret"
@@ -141,8 +144,9 @@ func main() {
 		enableSuperUserForDBO     bool
 		enablePatroniFailsafeMode bool

-		portRangeStart int
-		portRangeSize  int
+		portRangeStart                        int
+		portRangeSize                         int
+		replicationChangeRequeueTimeInSeconds int

 		patroniTTL         uint32
 		patroniLoopWait    uint32
@@ -266,6 +270,10 @@ func main() {
 	viper.SetDefault(postgresletFullnameFlg, partitionID) // fall back to partition id
 	postgresletFullname = viper.GetString(postgresletFullnameFlg)

+	viper.SetDefault(replicationChangeRequeueTimeFlg, 10)
+	replicationChangeRequeueTimeInSeconds = viper.GetInt(replicationChangeRequeueTimeFlg)
+	replicationChangeRequeueDuration := time.Duration(replicationChangeRequeueTimeInSeconds) * time.Second
+
 	viper.SetDefault(enableLBSourceRangesFlg, true)
 	enableLBSourceRanges = viper.GetBool(enableLBSourceRangesFlg)

@@ -332,6 +340,7 @@ func main() {
 		enableLBSourceRangesFlg, enableLBSourceRanges,
 		enableRandomStorageEncryptionSecretFlg, enableRandomStorageEncryptionSecret,
 		postgresletFullnameFlg, postgresletFullname,
+		replicationChangeRequeueTimeFlg, replicationChangeRequeueTimeInSeconds,
 		enableWalGEncryptionFlg, enableWalGEncryption,
 		enableForceSharedIPFlg, enableForceSharedIP,
 		initDBJobCMNameFlg, initDBJobCMName,
@@ -447,6 +456,7 @@ func main() {
 		PatroniTTL:                          patroniTTL,
 		PatroniLoopWait:                     patroniLoopWait,
 		PatroniRetryTimeout:                 patroniRetryTimeout,
+		ReplicationChangeRequeueDuration:    replicationChangeRequeueDuration,
 		EnableRandomStorageEncryptionSecret: enableRandomStorageEncryptionSecret,
 		EnableWalGEncryption:                enableWalGEncryption,
 		PostgresletFullname:                 postgresletFullname,
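For context on the Patroni interaction this diff introduces: `httpGetPatroniConfig` and `httpPatchPatroni` talk to Patroni's REST API on port 8008, reading the dynamic cluster configuration with `GET /config` and writing back only the two replication-related fields (`standby_cluster` and `synchronous_nodes_additional`) with `PATCH /config`, which Patroni applies as a partial update. The sketch below shows that round trip in isolation, using the same JSON shape as the `PatroniConfig` types added above; the pod IP, primary host, and application name are illustrative assumptions, not values from this diff.

```go
// Minimal standalone sketch of the GET/PATCH round trip against Patroni's
// /config endpoint, mirroring checkAndUpdatePatroniReplicationConfig.
// The pod IP and standby values below are illustrative assumptions.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// Same JSON shape as the types introduced in this diff.
type PatroniStandbyCluster struct {
	CreateReplicaMethods []string `json:"create_replica_methods"`
	Host                 string   `json:"host"`
	Port                 int      `json:"port"`
	ApplicationName      string   `json:"application_name"`
}

type PatroniConfig struct {
	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
}

// getConfig fetches Patroni's dynamic configuration via GET /config.
func getConfig(ctx context.Context, base string) (*PatroniConfig, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/config", nil)
	if err != nil {
		return nil, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	var cfg PatroniConfig
	return &cfg, json.Unmarshal(body, &cfg)
}

// patchConfig submits a partial configuration update via PATCH /config.
func patchConfig(ctx context.Context, base string, cfg PatroniConfig) error {
	payload, err := json.Marshal(cfg)
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPatch, base+"/config", bytes.NewBuffer(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// a non-2xx status means Patroni rejected the update
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("patroni rejected config patch: %s", resp.Status)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	base := "http://10.0.0.42:8008" // assumed leader pod IP; port as in the diff

	cfg, err := getConfig(ctx, base)
	if err != nil {
		panic(err)
	}

	// Demote to a standby of an assumed primary if not already configured,
	// the same payload shape httpPatchPatroni sends for non-primary instances.
	if cfg.StandbyCluster == nil {
		standby := PatroniConfig{StandbyCluster: &PatroniStandbyCluster{
			CreateReplicaMethods: []string{"basebackup_fast_xlog"},
			Host:                 "10.0.0.1",    // assumed primary connection IP
			Port:                 5432,          // assumed primary connection port
			ApplicationName:      "my-postgres", // assumed instance name
		}}
		if err := patchConfig(ctx, base, standby); err != nil {
			panic(err)
		}
	}
}
```

Because Patroni merges the PATCH body into the existing dynamic configuration and removes keys that are set to null, the reconciler can flip a cluster between primary and standby by sending just these two keys (a nil `StandbyCluster` pointer marshals to `"standby_cluster": null`, which promotes), leaving all other Patroni settings untouched; this is also why the `PatroniConfig` fields deliberately omit `omitempty`.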