Skip to content

Commit effa9e4

Browse files
authored
Sync mode reloaded (#572)
* Check patronic config and only update if neccessary * Refactoring * ... * Refactoring * Fix linter warnings * Fix linter warning * Delay requeue by 10 secs * Check before updating standby configs as well * Improve logging * Improve logging * Improve logging * Fix check for SynchronousNodesAdditional * Fix comparison, improve logging * Additional nil check... * Test different execution order for primaries and standbies * Revert "Test different execution order for primaries and standbies" This reverts commit 3f57b1c. * Logging * Refactoring * Make requeue duration configurable * Rename variable * Add additional check (but only log the result for now) * Only requeue when REST call was successful * Fix linter errors * Set to nil when not needed (so it will actually be removed from the CR) * Only set the params required for sync replication and leave the rest to the postgres operator * Remove unneccessary code * logging * Remove unused code * Fix logic * Update logic * Revert "Remove unused code" This reverts commit 7e525ec. * Revert "Remove unneccessary code" This reverts commit 00cc28f. * Update previously reverted code * Set all values when paching * Logging * Back to status quo: set the whole config * typo * Logging * Remove TODOs after review * Simplify
1 parent 2fa6626 commit effa9e4

File tree

2 files changed

+241
-55
lines changed

2 files changed

+241
-55
lines changed

controllers/postgres_controller.go

Lines changed: 229 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,20 @@
77
package controllers
88

99
import (
10+
"bytes"
1011
"context"
1112
"crypto/rand"
1213
"encoding/json"
1314
"errors"
1415
"fmt"
16+
"io"
1517
"math/big"
1618
"net"
19+
"net/http"
1720
"net/url"
1821
"strconv"
1922
"strings"
23+
"time"
2024

2125
"github.com/go-logr/logr"
2226
zalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -70,13 +74,13 @@ var requeue = ctrl.Result{
7074

7175
// PostgresReconciler reconciles a Postgres object
7276
type PostgresReconciler struct {
73-
CtrlClient client.Client
74-
SvcClient client.Client
75-
Log logr.Logger
76-
Scheme *runtime.Scheme
77-
PartitionID, Tenant, StorageClass string
78-
OperatorManager *operatormanager.OperatorManager
79-
LBManager *lbmanager.LBManager
77+
CtrlClient client.Client
78+
SvcClient client.Client
79+
Log logr.Logger
80+
Scheme *runtime.Scheme
81+
PartitionID, Tenant, StorageClass string
82+
*operatormanager.OperatorManager
83+
*lbmanager.LBManager
8084
recorder record.EventRecorder
8185
PgParamBlockList map[string]bool
8286
StandbyClustersSourceRanges []string
@@ -87,6 +91,7 @@ type PostgresReconciler struct {
8791
PatroniTTL uint32
8892
PatroniLoopWait uint32
8993
PatroniRetryTimeout uint32
94+
ReplicationChangeRequeueDuration time.Duration
9095
EnableRandomStorageEncryptionSecret bool
9196
EnableWalGEncryption bool
9297
PostgresletFullname string
@@ -99,6 +104,17 @@ type PostgresReconciler struct {
99104
TLSSubDomain string
100105
}
101106

107+
type PatroniStandbyCluster struct {
108+
CreateReplicaMethods []string `json:"create_replica_methods"`
109+
Host string `json:"host"`
110+
Port int `json:"port"`
111+
ApplicationName string `json:"application_name"`
112+
}
113+
type PatroniConfig struct {
114+
StandbyCluster *PatroniStandbyCluster `json:"standby_cluster"`
115+
SynchronousNodesAdditional *string `json:"synchronous_nodes_additional"`
116+
}
117+
102118
// Reconcile is the entry point for postgres reconciliation.
103119
// +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres,verbs=get;list;watch;create;update;patch;delete
104120
// +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres/status,verbs=get;update;patch
@@ -253,18 +269,8 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
253269
return ctrl.Result{}, fmt.Errorf("error while creating postgres secrets: %w", err)
254270
}
255271

256-
if instance.IsReplicationPrimary() {
257-
// the field is not empty, which means we were either a regular, standalone database that was promoted to leader
258-
// (meaning we are already running) or we are a standby which was promoted to leader (also meaning we are
259-
// already running)
260-
// That means we should be able to call the patroni api already. this is required, as updating the custom
261-
// resource of a standby db seems to fail (maybe because of the users/databases?)...
262-
// anyway, let's get on with it
263-
if err := r.updatePatroniConfig(log, ctx, instance); err != nil {
264-
// TODO what to do here? reschedule or ignore?
265-
log.Error(err, "failed to update patroni config via REST call")
266-
}
267-
}
272+
// check (and update if neccessary) the current patroni replication config.
273+
requeueAfterReconcile, patroniConfigChangeErr := r.checkAndUpdatePatroniReplicationConfig(log, ctx, instance)
268274

269275
// create standby egress rule first, so the standby can actually connect to the primary
270276
if err := r.createOrUpdateEgressCWNP(ctx, instance); err != nil {
@@ -318,7 +324,7 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
318324
// Check if socket port is ready
319325
port := instance.Status.Socket.Port
320326
if port == 0 {
321-
r.recorder.Event(instance, "Warning", "Self-Reconcilation", "socket port not ready")
327+
r.recorder.Event(instance, "Warning", "Self-Reconciliation", "socket port not ready")
322328
log.Info("socket port not ready, requeueing")
323329
return requeue, nil
324330
}
@@ -329,9 +335,17 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
329335
return ctrl.Result{}, fmt.Errorf("unable to create or update ingress ClusterwideNetworkPolicy: %w", err)
330336
}
331337

332-
// this is the call for standbys
333-
if err := r.updatePatroniConfig(log, ctx, instance); err != nil {
334-
return requeue, fmt.Errorf("unable to update patroni config: %w", err)
338+
// when an error occurred while updating the patroni config, requeue here
339+
// we try again in the next loop, hoping things will settle
340+
if patroniConfigChangeErr != nil {
341+
log.Info("Requeueing after getting/setting patroni replication config failed")
342+
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, patroniConfigChangeErr
343+
}
344+
// if the config isn't in the expected state yet (we only add values to an existing config, we do not perform the actual switch), we simply requeue.
345+
// on the next reconciliation loop, postgres-operator shoud have catched up and the config should hopefully be correct already so we can continue with adding our values.
346+
if requeueAfterReconcile {
347+
log.Info("Requeueing after patroni replication hasn't returned the expected state (yet)")
348+
return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, nil
335349
}
336350

337351
log.Info("postgres reconciled")
@@ -1003,60 +1017,124 @@ func (r *PostgresReconciler) copySecrets(log logr.Logger, ctx context.Context, s
10031017
return nil
10041018
}
10051019

1006-
func (r *PostgresReconciler) updatePatroniConfig(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
1007-
// Finally, send a POST to to the database with the correct config
1020+
func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Logger, ctx context.Context, instance *pg.Postgres) (bool, error) {
1021+
1022+
log = log.WithValues("label", "patroni")
1023+
1024+
const requeueAfterReconcile = true
1025+
const allDone = false
1026+
1027+
// If there is no connected postgres, no need to tinker with patroni directly
10081028
if instance.Spec.PostgresConnection == nil {
1009-
return nil
1029+
return allDone, nil
10101030
}
10111031

1012-
log.V(debugLogLevel).Info("Sending REST call to Patroni API")
1013-
pods := &corev1.PodList{}
1032+
log.V(debugLogLevel).Info("Checking replication config from Patroni API")
10141033

1015-
roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
1034+
// Get the leader pod
1035+
leaderPods, err := r.findLeaderPods(log, ctx, instance)
10161036
if err != nil {
1017-
log.Info("could not create requirements for label selector to query pods, requeuing")
1018-
return err
1037+
log.V(debugLogLevel).Info("could not query pods, requeuing")
1038+
return requeueAfterReconcile, err
10191039
}
1020-
leaderSelector := labels.NewSelector()
1021-
leaderSelector = leaderSelector.Add(*roleReq)
10221040

1023-
opts := []client.ListOption{
1024-
client.InNamespace(instance.ToPeripheralResourceNamespace()),
1025-
client.MatchingLabelsSelector{Selector: leaderSelector},
1041+
if len(leaderPods.Items) != 1 {
1042+
log.V(debugLogLevel).Info("expected exactly one leader pod, selecting all spilo pods as a last resort (might be ok if it is still creating)")
1043+
// To make sure any updates to the Zalando postgresql manifest are written, we do not requeue in this case
1044+
return allDone, r.updatePatroniReplicationConfigOnAllPods(log, ctx, instance)
10261045
}
1027-
if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
1028-
log.Info("could not query pods, requeuing")
1029-
return err
1046+
leaderIP := leaderPods.Items[0].Status.PodIP
1047+
1048+
var resp *PatroniConfig
1049+
resp, err = r.httpGetPatroniConfig(log, ctx, leaderIP)
1050+
if err != nil {
1051+
log.V(debugLogLevel).Info("could not query patroni, requeuing")
1052+
return requeueAfterReconcile, err
1053+
}
1054+
if resp == nil {
1055+
log.V(debugLogLevel).Info("got nil response from patroni, requeuing")
1056+
return requeueAfterReconcile, nil
10301057
}
1031-
if len(pods.Items) == 0 {
1032-
log.Info("no leader pod found, selecting all spilo pods as a last resort (might be ok if it is still creating)")
10331058

1034-
err = r.updatePatroniConfigOnAllPods(log, ctx, instance)
1035-
if err != nil {
1036-
log.Info("updating patroni config failed, got one or more errors")
1037-
return err
1059+
if instance.IsReplicationPrimary() {
1060+
if resp.StandbyCluster != nil {
1061+
log.V(debugLogLevel).Info("standby_cluster mistmatch, requeing", "response", resp)
1062+
return requeueAfterReconcile, nil
1063+
}
1064+
if instance.Spec.PostgresConnection.SynchronousReplication {
1065+
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
1066+
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
1067+
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
1068+
}
1069+
} else {
1070+
if resp.SynchronousNodesAdditional != nil {
1071+
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
1072+
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
1073+
}
1074+
}
1075+
1076+
} else {
1077+
if resp.StandbyCluster == nil {
1078+
log.V(debugLogLevel).Info("standby_cluster mismatch, requeing", "response", resp)
1079+
return requeueAfterReconcile, nil
1080+
}
1081+
if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
1082+
log.V(debugLogLevel).Info("application_name mismatch, updating", "response", resp)
1083+
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
1084+
}
1085+
if resp.SynchronousNodesAdditional != nil {
1086+
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
1087+
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
1088+
}
1089+
if resp.StandbyCluster.CreateReplicaMethods == nil {
1090+
log.V(debugLogLevel).Info("create_replica_methods mismatch, updating", "response", resp)
1091+
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
1092+
}
1093+
if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
1094+
log.V(debugLogLevel).Info("host mismatch, requeing", "updating", resp)
1095+
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
1096+
}
1097+
if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
1098+
log.V(debugLogLevel).Info("port mismatch, requeing", "updating", resp)
1099+
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
10381100
}
1039-
return nil
10401101
}
1041-
podIP := pods.Items[0].Status.PodIP
10421102

1043-
return r.httpPatchPatroni(log, ctx, instance, podIP)
1103+
log.V(debugLogLevel).Info("replication config from Patroni API up to date")
1104+
return allDone, nil
1105+
}
1106+
1107+
func (r *PostgresReconciler) findLeaderPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) (*corev1.PodList, error) {
1108+
leaderPods := &corev1.PodList{}
1109+
roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
1110+
if err != nil {
1111+
log.V(debugLogLevel).Info("could not create requirements for label selector to query pods, requeuing")
1112+
return leaderPods, err
1113+
}
1114+
leaderSelector := labels.NewSelector().Add(*roleReq)
1115+
opts := []client.ListOption{
1116+
client.InNamespace(instance.ToPeripheralResourceNamespace()),
1117+
client.MatchingLabelsSelector{Selector: leaderSelector},
1118+
}
1119+
return leaderPods, r.SvcClient.List(ctx, leaderPods, opts...)
10441120
}
10451121

1046-
func (r *PostgresReconciler) updatePatroniConfigOnAllPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
1122+
func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
10471123
pods := &corev1.PodList{}
10481124
opts := []client.ListOption{
10491125
client.InNamespace(instance.ToPeripheralResourceNamespace()),
10501126
client.MatchingLabels{pg.ApplicationLabelName: pg.ApplicationLabelValue},
10511127
}
10521128
if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
1053-
log.Info("could not query pods, requeuing")
1129+
log.V(debugLogLevel).Info("could not query pods, requeuing")
10541130
return err
10551131
}
10561132

10571133
if len(pods.Items) == 0 {
1058-
log.Info("no spilo pods found at all, requeueing")
1134+
log.V(debugLogLevel).Info("no spilo pods found at all, requeueing")
10591135
return errors.New("no spilo pods found at all")
1136+
} else if len(pods.Items) < int(instance.Spec.NumberOfInstances) {
1137+
log.V(debugLogLevel).Info("expected %d pods, but only found %d (might be ok if it is still creating)", instance.Spec.NumberOfInstances, len(pods.Items))
10601138
}
10611139

10621140
// iterate all spilo pods
@@ -1070,18 +1148,116 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(log logr.Logger, ctx c
10701148
}
10711149
}
10721150
if lastErr != nil {
1073-
log.Info("updating patroni config failed, got one or more errors")
1151+
log.V(debugLogLevel).Info("updating patroni config failed, got one or more errors")
10741152
return lastErr
10751153
}
10761154
log.V(debugLogLevel).Info("updating patroni config succeeded")
10771155
return nil
10781156
}
10791157

10801158
func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string) error {
1081-
log.Info("Skipping call to patroni")
1159+
if podIP == "" {
1160+
return errors.New("podIP must not be empty")
1161+
}
1162+
1163+
podPort := "8008"
1164+
path := "config"
1165+
1166+
log.V(debugLogLevel).Info("Preparing request")
1167+
var request PatroniConfig
1168+
if instance.IsReplicationPrimary() {
1169+
request = PatroniConfig{
1170+
StandbyCluster: nil,
1171+
}
1172+
if instance.Spec.PostgresConnection.SynchronousReplication {
1173+
// enable sync replication
1174+
request.SynchronousNodesAdditional = pointer.String(instance.Spec.PostgresConnection.ConnectedPostgresID)
1175+
} else {
1176+
// disable sync replication
1177+
request.SynchronousNodesAdditional = nil
1178+
}
1179+
} else {
1180+
// TODO check values first
1181+
request = PatroniConfig{
1182+
StandbyCluster: &PatroniStandbyCluster{
1183+
CreateReplicaMethods: []string{"basebackup_fast_xlog"},
1184+
Host: instance.Spec.PostgresConnection.ConnectionIP,
1185+
Port: int(instance.Spec.PostgresConnection.ConnectionPort),
1186+
ApplicationName: instance.ObjectMeta.Name,
1187+
},
1188+
SynchronousNodesAdditional: nil,
1189+
}
1190+
}
1191+
log.V(debugLogLevel).Info("Prepared request", "request", request)
1192+
jsonReq, err := json.Marshal(request)
1193+
if err != nil {
1194+
log.V(debugLogLevel).Info("could not create config")
1195+
return err
1196+
}
1197+
1198+
httpClient := &http.Client{}
1199+
url := "http://" + podIP + ":" + podPort + "/" + path
1200+
1201+
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewBuffer(jsonReq))
1202+
if err != nil {
1203+
log.Error(err, "could not create PATCH request")
1204+
return err
1205+
}
1206+
req.Header.Set("Content-Type", "application/json")
1207+
1208+
resp, err := httpClient.Do(req)
1209+
if err != nil {
1210+
log.Error(err, "could not perform PATCH request")
1211+
return err
1212+
}
1213+
defer resp.Body.Close()
1214+
10821215
return nil
10831216
}
10841217

1218+
func (r *PostgresReconciler) httpGetPatroniConfig(log logr.Logger, ctx context.Context, podIP string) (*PatroniConfig, error) {
1219+
if podIP == "" {
1220+
return nil, errors.New("podIP must not be empty")
1221+
}
1222+
1223+
podPort := "8008"
1224+
path := "config"
1225+
1226+
httpClient := &http.Client{}
1227+
url := "http://" + podIP + ":" + podPort + "/" + path
1228+
1229+
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
1230+
if err != nil {
1231+
log.Error(err, "could not create GET request")
1232+
return nil, err
1233+
}
1234+
req.Header.Set("Content-Type", "application/json")
1235+
1236+
resp, err := httpClient.Do(req)
1237+
if err != nil {
1238+
log.Error(err, "could not perform GET request")
1239+
return nil, err
1240+
}
1241+
1242+
defer resp.Body.Close()
1243+
1244+
body, err := io.ReadAll(resp.Body)
1245+
if err != nil {
1246+
log.Info("could not read body")
1247+
return nil, err
1248+
}
1249+
var jsonResp PatroniConfig
1250+
err = json.Unmarshal(body, &jsonResp)
1251+
if err != nil {
1252+
log.V(debugLogLevel).Info("could not parse config response")
1253+
return nil, err
1254+
}
1255+
1256+
log.V(debugLogLevel).Info("Got config", "response", jsonResp)
1257+
1258+
return &jsonResp, err
1259+
}
1260+
10851261
func (r *PostgresReconciler) getBackupConfig(ctx context.Context, ns, name string) (*pg.BackupConfig, error) {
10861262
// fetch secret
10871263
backupSecret := &corev1.Secret{}

0 commit comments

Comments
 (0)