Skip to content

Sync mode reloaded #572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3a21b29
Check patronic config and only update if neccessary
eberlep Aug 31, 2022
4e7a999
Refactoring
eberlep Aug 31, 2022
6c599cc
...
eberlep Aug 31, 2022
f5b3adc
Refactoring
eberlep Aug 31, 2022
395937e
Fix linter warnings
eberlep Aug 31, 2022
759147b
Fix linter warning
eberlep Aug 31, 2022
a3408fd
Delay requeue by 10 secs
eberlep Aug 31, 2022
bef00f1
Check before updating standby configs as well
eberlep Aug 31, 2022
293ae9c
Improve logging
eberlep Aug 31, 2022
64354f5
Improve logging
eberlep Aug 31, 2022
7ac265c
Improve logging
eberlep Aug 31, 2022
3f289f2
Fix check for SynchronousNodesAdditional
eberlep Aug 31, 2022
898bdf2
Fix comparison, improve logging
eberlep Aug 31, 2022
a17cf11
Additional nil check...
eberlep Aug 31, 2022
3f57b1c
Test different execution order for primaries and standbies
eberlep Aug 31, 2022
298bf05
Revert "Test different execution order for primaries and standbies"
eberlep Aug 31, 2022
15680d4
Logging
eberlep Aug 31, 2022
f03bb68
Refactoring
eberlep Sep 1, 2022
ce32f84
Merge remote-tracking branch 'origin/main' into 397-patronic-check-co…
eberlep Sep 1, 2022
41a85ed
Make requeue duration configurable
eberlep Sep 1, 2022
06cc9bb
Rename variable
eberlep Sep 1, 2022
febb2a9
Add additional check (but only log the result for now)
eberlep Sep 1, 2022
6422829
Merge remote-tracking branch 'origin/main' into 397-patronic-check-co…
eberlep Nov 23, 2022
4e8042e
Only requeue when REST call was successful
eberlep Nov 24, 2022
2b99dbf
Fix linter errors
eberlep Nov 24, 2022
fb7df8f
Merge remote-tracking branch 'origin' into 397-patronic-check-config-…
eberlep Oct 16, 2023
be390d8
Merge remote-tracking branch 'origin' into 397-patronic-check-config-…
eberlep Oct 16, 2023
cc65fff
Set to nil when not needed (so it will actually be removed from the CR)
eberlep Jun 26, 2024
ae24045
Merge remote-tracking branch 'origin' into sync_mode_reloaded
eberlep Jun 26, 2024
27ae3b5
Only set the params required for sync replication and leave the rest …
eberlep Jun 26, 2024
00cc28f
Remove unneccessary code
eberlep Jun 26, 2024
9f40ac0
logging
eberlep Jun 26, 2024
7e525ec
Remove unused code
eberlep Jun 26, 2024
9e66ff2
Merge branch 'major_update' into sync_mode_reloaded
eberlep Jun 26, 2024
f75c296
Fix logic
eberlep Jun 26, 2024
d78c90c
Update logic
eberlep Jun 27, 2024
ae565af
Revert "Remove unused code"
eberlep Jun 27, 2024
ee7de79
Revert "Remove unneccessary code"
eberlep Jun 27, 2024
56fe5ac
Update previously reverted code
eberlep Jun 27, 2024
63b71ae
Set all values when paching
eberlep Jun 27, 2024
2b14aa6
Logging
eberlep Jun 27, 2024
08ff9d5
Back to status quo: set the whole config
eberlep Jun 27, 2024
14a4c13
typo
eberlep Jun 27, 2024
a8e4ded
Logging
eberlep Jun 27, 2024
af4f37e
Remove TODOs after review
eberlep Jun 28, 2024
e867142
Simplify
eberlep Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 229 additions & 53 deletions controllers/postgres_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
package controllers

import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
zalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
Expand Down Expand Up @@ -70,13 +74,13 @@ var requeue = ctrl.Result{

// PostgresReconciler reconciles a Postgres object
type PostgresReconciler struct {
CtrlClient client.Client
SvcClient client.Client
Log logr.Logger
Scheme *runtime.Scheme
PartitionID, Tenant, StorageClass string
OperatorManager *operatormanager.OperatorManager
LBManager *lbmanager.LBManager
CtrlClient client.Client
SvcClient client.Client
Log logr.Logger
Scheme *runtime.Scheme
PartitionID, Tenant, StorageClass string
*operatormanager.OperatorManager
*lbmanager.LBManager
recorder record.EventRecorder
PgParamBlockList map[string]bool
StandbyClustersSourceRanges []string
Expand All @@ -87,6 +91,7 @@ type PostgresReconciler struct {
PatroniTTL uint32
PatroniLoopWait uint32
PatroniRetryTimeout uint32
ReplicationChangeRequeueDuration time.Duration
EnableRandomStorageEncryptionSecret bool
EnableWalGEncryption bool
PostgresletFullname string
Expand All @@ -99,6 +104,17 @@ type PostgresReconciler struct {
TLSSubDomain string
}

// PatroniStandbyCluster mirrors the standby_cluster section of the Patroni
// /config REST endpoint.
type PatroniStandbyCluster struct {
	CreateReplicaMethods []string `json:"create_replica_methods"`
	Host                 string   `json:"host"`
	Port                 int      `json:"port"`
	ApplicationName      string   `json:"application_name"`
}

// PatroniConfig mirrors the replication-related subset of the Patroni /config
// REST endpoint. The pointer fields deliberately omit `omitempty`: a nil
// pointer marshals to JSON null, so a PATCH can explicitly clear a setting.
type PatroniConfig struct {
	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
}

// Reconcile is the entry point for postgres reconciliation.
// +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres/status,verbs=get;update;patch
Expand Down Expand Up @@ -253,18 +269,8 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, fmt.Errorf("error while creating postgres secrets: %w", err)
}

if instance.IsReplicationPrimary() {
// the field is not empty, which means we were either a regular, standalone database that was promoted to leader
// (meaning we are already running) or we are a standby which was promoted to leader (also meaning we are
// already running)
// That means we should be able to call the patroni api already. this is required, as updating the custom
// resource of a standby db seems to fail (maybe because of the users/databases?)...
// anyway, let's get on with it
if err := r.updatePatroniConfig(log, ctx, instance); err != nil {
// TODO what to do here? reschedule or ignore?
log.Error(err, "failed to update patroni config via REST call")
}
}
// check (and update if neccessary) the current patroni replication config.
requeueAfterReconcile, patroniConfigChangeErr := r.checkAndUpdatePatroniReplicationConfig(log, ctx, instance)

// create standby egress rule first, so the standby can actually connect to the primary
if err := r.createOrUpdateEgressCWNP(ctx, instance); err != nil {
Expand Down Expand Up @@ -318,7 +324,7 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// Check if socket port is ready
port := instance.Status.Socket.Port
if port == 0 {
r.recorder.Event(instance, "Warning", "Self-Reconcilation", "socket port not ready")
r.recorder.Event(instance, "Warning", "Self-Reconciliation", "socket port not ready")
log.Info("socket port not ready, requeueing")
return requeue, nil
}
Expand All @@ -329,9 +335,17 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, fmt.Errorf("unable to create or update ingress ClusterwideNetworkPolicy: %w", err)
}

// this is the call for standbys
if err := r.updatePatroniConfig(log, ctx, instance); err != nil {
return requeue, fmt.Errorf("unable to update patroni config: %w", err)
// when an error occurred while updating the patroni config, requeue here
// we try again in the next loop, hoping things will settle
if patroniConfigChangeErr != nil {
log.Info("Requeueing after getting/setting patroni replication config failed")
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, patroniConfigChangeErr
}
// if the config isn't in the expected state yet (we only add values to an existing config, we do not perform the actual switch), we simply requeue.
// on the next reconciliation loop, postgres-operator shoud have catched up and the config should hopefully be correct already so we can continue with adding our values.
if requeueAfterReconcile {
log.Info("Requeueing after patroni replication hasn't returned the expected state (yet)")
return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, nil
}

log.Info("postgres reconciled")
Expand Down Expand Up @@ -1003,60 +1017,124 @@ func (r *PostgresReconciler) copySecrets(log logr.Logger, ctx context.Context, s
return nil
}

func (r *PostgresReconciler) updatePatroniConfig(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
// Finally, send a POST to to the database with the correct config
func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Logger, ctx context.Context, instance *pg.Postgres) (bool, error) {

log = log.WithValues("label", "patroni")

const requeueAfterReconcile = true
const allDone = false

// If there is no connected postgres, no need to tinker with patroni directly
if instance.Spec.PostgresConnection == nil {
return nil
return allDone, nil
}

log.V(debugLogLevel).Info("Sending REST call to Patroni API")
pods := &corev1.PodList{}
log.V(debugLogLevel).Info("Checking replication config from Patroni API")

roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
// Get the leader pod
leaderPods, err := r.findLeaderPods(log, ctx, instance)
if err != nil {
log.Info("could not create requirements for label selector to query pods, requeuing")
return err
log.V(debugLogLevel).Info("could not query pods, requeuing")
return requeueAfterReconcile, err
}
leaderSelector := labels.NewSelector()
leaderSelector = leaderSelector.Add(*roleReq)

opts := []client.ListOption{
client.InNamespace(instance.ToPeripheralResourceNamespace()),
client.MatchingLabelsSelector{Selector: leaderSelector},
if len(leaderPods.Items) != 1 {
log.V(debugLogLevel).Info("expected exactly one leader pod, selecting all spilo pods as a last resort (might be ok if it is still creating)")
// To make sure any updates to the Zalando postgresql manifest are written, we do not requeue in this case
return allDone, r.updatePatroniReplicationConfigOnAllPods(log, ctx, instance)
}
if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
log.Info("could not query pods, requeuing")
return err
leaderIP := leaderPods.Items[0].Status.PodIP

var resp *PatroniConfig
resp, err = r.httpGetPatroniConfig(log, ctx, leaderIP)
if err != nil {
log.V(debugLogLevel).Info("could not query patroni, requeuing")
return requeueAfterReconcile, err
}
if resp == nil {
log.V(debugLogLevel).Info("got nil response from patroni, requeuing")
return requeueAfterReconcile, nil
}
if len(pods.Items) == 0 {
log.Info("no leader pod found, selecting all spilo pods as a last resort (might be ok if it is still creating)")

err = r.updatePatroniConfigOnAllPods(log, ctx, instance)
if err != nil {
log.Info("updating patroni config failed, got one or more errors")
return err
if instance.IsReplicationPrimary() {
if resp.StandbyCluster != nil {
log.V(debugLogLevel).Info("standby_cluster mistmatch, requeing", "response", resp)
return requeueAfterReconcile, nil
}
if instance.Spec.PostgresConnection.SynchronousReplication {
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
} else {
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
}

} else {
if resp.StandbyCluster == nil {
log.V(debugLogLevel).Info("standby_cluster mismatch, requeing", "response", resp)
return requeueAfterReconcile, nil
}
if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
log.V(debugLogLevel).Info("application_name mismatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.StandbyCluster.CreateReplicaMethods == nil {
log.V(debugLogLevel).Info("create_replica_methods mismatch, updating", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
log.V(debugLogLevel).Info("host mismatch, requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
log.V(debugLogLevel).Info("port mismatch, requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
return nil
}
podIP := pods.Items[0].Status.PodIP

return r.httpPatchPatroni(log, ctx, instance, podIP)
log.V(debugLogLevel).Info("replication config from Patroni API up to date")
return allDone, nil
}

// findLeaderPods returns the spilo pods of the given instance that currently
// hold a leader role (master or standby leader), selected by the spilo role label.
func (r *PostgresReconciler) findLeaderPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) (*corev1.PodList, error) {
	pods := &corev1.PodList{}

	// match pods whose role label is either "master" or "standby_leader"
	leaderRoles := []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader}
	roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, leaderRoles)
	if err != nil {
		log.V(debugLogLevel).Info("could not create requirements for label selector to query pods, requeuing")
		return pods, err
	}

	listOptions := []client.ListOption{
		client.InNamespace(instance.ToPeripheralResourceNamespace()),
		client.MatchingLabelsSelector{Selector: labels.NewSelector().Add(*roleReq)},
	}
	err = r.SvcClient.List(ctx, pods, listOptions...)
	return pods, err
}

func (r *PostgresReconciler) updatePatroniConfigOnAllPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
pods := &corev1.PodList{}
opts := []client.ListOption{
client.InNamespace(instance.ToPeripheralResourceNamespace()),
client.MatchingLabels{pg.ApplicationLabelName: pg.ApplicationLabelValue},
}
if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
log.Info("could not query pods, requeuing")
log.V(debugLogLevel).Info("could not query pods, requeuing")
return err
}

if len(pods.Items) == 0 {
log.Info("no spilo pods found at all, requeueing")
log.V(debugLogLevel).Info("no spilo pods found at all, requeueing")
return errors.New("no spilo pods found at all")
} else if len(pods.Items) < int(instance.Spec.NumberOfInstances) {
log.V(debugLogLevel).Info("expected %d pods, but only found %d (might be ok if it is still creating)", instance.Spec.NumberOfInstances, len(pods.Items))
}

// iterate all spilo pods
Expand All @@ -1070,18 +1148,116 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(log logr.Logger, ctx c
}
}
if lastErr != nil {
log.Info("updating patroni config failed, got one or more errors")
log.V(debugLogLevel).Info("updating patroni config failed, got one or more errors")
return lastErr
}
log.V(debugLogLevel).Info("updating patroni config succeeded")
return nil
}

// httpPatchPatroni sends the desired replication config for instance to the
// Patroni REST API (PATCH /config) of the pod at podIP.
// For a replication primary, standby_cluster is cleared and
// synchronous_nodes_additional is set or cleared depending on whether
// synchronous replication is requested. For a standby, standby_cluster is
// pointed at the connected primary and synchronous_nodes_additional is cleared.
func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string) error {
	if podIP == "" {
		return errors.New("podIP must not be empty")
	}

	podPort := "8008"
	path := "config"

	log.V(debugLogLevel).Info("Preparing request")
	var request PatroniConfig
	if instance.IsReplicationPrimary() {
		request = PatroniConfig{
			StandbyCluster: nil,
		}
		if instance.Spec.PostgresConnection.SynchronousReplication {
			// enable sync replication
			request.SynchronousNodesAdditional = pointer.String(instance.Spec.PostgresConnection.ConnectedPostgresID)
		} else {
			// disable sync replication
			request.SynchronousNodesAdditional = nil
		}
	} else {
		request = PatroniConfig{
			StandbyCluster: &PatroniStandbyCluster{
				CreateReplicaMethods: []string{"basebackup_fast_xlog"},
				Host:                 instance.Spec.PostgresConnection.ConnectionIP,
				Port:                 int(instance.Spec.PostgresConnection.ConnectionPort),
				ApplicationName:      instance.ObjectMeta.Name,
			},
			SynchronousNodesAdditional: nil,
		}
	}
	log.V(debugLogLevel).Info("Prepared request", "request", request)
	jsonReq, err := json.Marshal(request)
	if err != nil {
		log.V(debugLogLevel).Info("could not create config")
		return err
	}

	httpClient := &http.Client{}
	url := "http://" + podIP + ":" + podPort + "/" + path

	req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewBuffer(jsonReq))
	if err != nil {
		log.Error(err, "could not create PATCH request")
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := httpClient.Do(req)
	if err != nil {
		log.Error(err, "could not perform PATCH request")
		return err
	}
	defer resp.Body.Close()

	// a transport-level success does not mean patroni accepted the config
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("patroni returned unexpected status %s while patching config", resp.Status)
	}

	return nil
}

// httpGetPatroniConfig fetches the current config (GET /config) from the
// Patroni REST API of the pod at podIP and unmarshals the replication-related
// fields into a PatroniConfig.
func (r *PostgresReconciler) httpGetPatroniConfig(log logr.Logger, ctx context.Context, podIP string) (*PatroniConfig, error) {
	if podIP == "" {
		return nil, errors.New("podIP must not be empty")
	}

	podPort := "8008"
	path := "config"

	httpClient := &http.Client{}
	url := "http://" + podIP + ":" + podPort + "/" + path

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		log.Error(err, "could not create GET request")
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := httpClient.Do(req)
	if err != nil {
		log.Error(err, "could not perform GET request")
		return nil, err
	}
	defer resp.Body.Close()

	// an error page would not unmarshal into PatroniConfig in any useful way
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("patroni returned unexpected status %s while fetching config", resp.Status)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.V(debugLogLevel).Info("could not read body")
		return nil, err
	}

	var jsonResp PatroniConfig
	if err := json.Unmarshal(body, &jsonResp); err != nil {
		log.V(debugLogLevel).Info("could not parse config response")
		return nil, err
	}

	log.V(debugLogLevel).Info("Got config", "response", jsonResp)

	return &jsonResp, nil
}

func (r *PostgresReconciler) getBackupConfig(ctx context.Context, ns, name string) (*pg.BackupConfig, error) {
// fetch secret
backupSecret := &corev1.Secret{}
Expand Down
Loading