Use api url from annotation #425

Draft: wants to merge 23 commits into main

289 changes: 216 additions & 73 deletions controllers/postgres_controller.go
@@ -12,11 +12,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
zalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -65,16 +67,34 @@ type PostgresReconciler struct {
PartitionID, Tenant, StorageClass string
*operatormanager.OperatorManager
*lbmanager.LBManager
recorder record.EventRecorder
PgParamBlockList map[string]bool
StandbyClustersSourceRanges []string
PostgresletNamespace string
SidecarsConfigMapName string
EnableNetPol bool
EtcdHost string
PatroniTTL uint32
PatroniLoopWait uint32
PatroniRetryTimeout uint32
recorder record.EventRecorder
PgParamBlockList map[string]bool
StandbyClustersSourceRanges []string
PostgresletNamespace string
SidecarsConfigMapName string
EnableNetPol bool
EtcdHost string
PatroniTTL uint32
PatroniLoopWait uint32
PatroniRetryTimeout uint32
ReplicationChangeRequeueDuration time.Duration
}

type PatroniStandbyCluster struct {
CreateReplicaMethods []string `json:"create_replica_methods"`
Host string `json:"host"`
Port int `json:"port"`
ApplicationName string `json:"application_name"`
}
type PatroniConfig struct {
StandbyCluster *PatroniStandbyCluster `json:"standby_cluster"`
SynchronousNodesAdditional *string `json:"synchronous_nodes_additional"`
}

type PatroniPodStatus struct {
APIURL string `json:"api_url"`
State string `json:"state"`
Role string `json:"role"`
}
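
// Hypothetical examples (not from this PR) of the JSON these structs are meant to
// (un)marshal; only the fields modelled above are shown and all values are made up:
//
//	a GET /config response mapped to PatroniConfig:
//	  {"standby_cluster":{"create_replica_methods":["basebackup_fast_xlog"],"host":"10.0.0.1","port":5432,"application_name":"my-db"},"synchronous_nodes_additional":null}
//
//	a pod "status" annotation mapped to PatroniPodStatus:
//	  {"api_url":"http://10.0.0.2:8008/patroni","state":"running","role":"master"}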

// Reconcile is the entry point for postgres reconciliation.
@@ -210,17 +230,13 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, fmt.Errorf("error while creating standby secrets: %w", err)
}

if instance.IsReplicationPrimary() {
// the field is not empty, which means we were either a regular, standalone database that was promoted to leader
// (meaning we are already running) or we are a standby which was promoted to leader (also meaning we are
// already running)
// That means we should be able to call the patroni api already. this is required, as updating the custom
// ressource of a standby db seems to fail (maybe because of the users/databases?)...
// anyway, let's get on with it
if err := r.updatePatroniConfig(ctx, instance); err != nil {
// TODO what to do here? reschedule or ignore?
log.Error(err, "failed to update patroni config via REST call")
}
// check (and update if necessary) the current patroni replication config.
immediateRequeue, patroniConfigChangeErr := r.checkAndUpdatePatroniReplicationConfig(ctx, instance)
if immediateRequeue {
// if a config change was performed that requires a while to settle in, we simply requeue.
// on the next reconciliation loop, the config should already be correct and we can continue with the rest
log.Info("Requeueing after patroni replication config change")
return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, patroniConfigChangeErr
}

// create standby egress rule first, so the standby can actually connect to the primary
@@ -275,9 +291,11 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, fmt.Errorf("unable to create or update ingress ClusterwideNetworkPolicy: %w", err)
}

// this is the call for standbys
if err := r.updatePatroniConfig(ctx, instance); err != nil {
return requeue, fmt.Errorf("unable to update patroni config: %w", err)
// if an error occurred while updating the patroni config, requeue here.
// this is done down here to make sure the rest of the resource updates were performed first
if patroniConfigChangeErr != nil {
log.Info("Requeueing after modifying patroni replication config failed")
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, patroniConfigChangeErr
}

r.recorder.Event(instance, "Normal", "Reconciled", "postgres up to date")
@@ -695,47 +713,142 @@ func (r *PostgresReconciler) ensureStandbySecrets(ctx context.Context, instance

}

func (r *PostgresReconciler) updatePatroniConfig(ctx context.Context, instance *pg.Postgres) error {
// Finally, send a POST to the database with the correct config
func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(ctx context.Context, instance *pg.Postgres) (bool, error) {

const requeueImmediately = true
const continueWithReconciliation = false

// If there is no connected postgres, no need to tinker with patroni directly
if instance.Spec.PostgresConnection == nil {
return nil
return continueWithReconciliation, nil
}

r.Log.Info("Sending REST call to Patroni API")
pods := &corev1.PodList{}
r.Log.Info("Checking replication config from Patroni API")

roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
// Get the leader pod
leaderPods, err := r.findLeaderPods(ctx, instance)
if err != nil {
r.Log.Info("could not create requirements for label selector to query pods, requeuing")
return err
r.Log.Info("could not query pods, requeuing")
return requeueImmediately, err
}
leaderSelector := labels.NewSelector()
leaderSelector = leaderSelector.Add(*roleReq)

opts := []client.ListOption{
client.InNamespace(instance.ToPeripheralResourceNamespace()),
client.MatchingLabelsSelector{Selector: leaderSelector},
if len(leaderPods.Items) != 1 {
r.Log.Info("expected exactly one leader pod, selecting all spilo pods as a last resort (might be ok if it is still creating)")
// To make sure any updates to the Zalando postgresql manifest are written, we do not requeue in this case
return continueWithReconciliation, r.updatePatroniReplicationConfigOnAllPods(ctx, instance)
}
if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
r.Log.Info("could not query pods, requeuing")
return err

apiURL, err := r.readAPIURL(&leaderPods.Items[0])
if err != nil {
// TODO guessAPI or reconcile?
// If there is no annotation, there is probably no running patroni api either. But since this is called AFTER selecting by the spilo-role labels, that should not happen anyway.
// Keeping the guess as a fallback still seems reasonable, especially since a failed call to the api would lead to a reconcile anyway.
// return continueWithReconciliation, err
apiURL = r.guessAPIURL(&leaderPods.Items[0])
}
if len(pods.Items) == 0 {
r.Log.Info("no leader pod found, selecting all spilo pods as a last resort (might be ok if it is still creating)")

err = r.updatePatroniConfigOnAllPods(ctx, instance)
if err != nil {
r.Log.Info("updating patroni config failed, got one or more errors")
return err
var resp *PatroniConfig
resp, err = r.httpGetPatroniConfig(ctx, apiURL)
if err != nil {
return continueWithReconciliation, err
}
if resp == nil {
return continueWithReconciliation, r.httpPatchPatroni(ctx, instance, apiURL)
}

if instance.IsReplicationPrimary() {
if resp.StandbyCluster != nil {
r.Log.Info("standby_cluster mistmatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
if instance.Spec.PostgresConnection.SynchronousReplication {
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
r.Log.Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
} else {
if resp.SynchronousNodesAdditional != nil {
r.Log.Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
}

} else {
if resp.StandbyCluster == nil {
r.Log.Info("standby_cluster mismatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
if resp.StandbyCluster.CreateReplicaMethods == nil {
r.Log.Info("create_replica_methods mismatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
r.Log.Info("host mismatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
r.Log.Info("port mismatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
r.Log.Info("application_name mismatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}

if resp.SynchronousNodesAdditional != nil {
r.Log.Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp)
return requeueImmediately, r.httpPatchPatroni(ctx, instance, apiURL)
}
return nil
}
podIP := pods.Items[0].Status.PodIP

return r.httpPatchPatroni(ctx, instance, podIP)
r.Log.Info("replication config from Patroni API up to date")
return continueWithReconciliation, nil
}

func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error {
func (r *PostgresReconciler) readAPIURL(pod *corev1.Pod) (string, error) {
value, ok := pod.Annotations["status"]
if !ok {
return "", fmt.Errorf("could not find patroni pod status annotation")
}

var status PatroniPodStatus
err := json.Unmarshal([]byte(value), &status)
if err != nil {
r.Log.Error(err, "could not parse patroni pod status annotation")
return "", err
}

return status.APIURL, nil
}
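
// A minimal test sketch (not part of this PR) exercising readAPIURL with a made-up
// "status" annotation; it assumes the standard "testing" package in addition to the
// corev1 import already present in this file.
func TestReadAPIURL(t *testing.T) {
	r := &PostgresReconciler{}
	pod := &corev1.Pod{}
	pod.Annotations = map[string]string{
		"status": `{"api_url":"http://10.0.0.2:8008/patroni","state":"running","role":"master"}`,
	}

	// the happy path only touches the annotation, so a zero-value reconciler suffices
	apiURL, err := r.readAPIURL(pod)
	if err != nil {
		t.Fatalf("expected no error, got: %v", err)
	}
	if apiURL != "http://10.0.0.2:8008/patroni" {
		t.Fatalf("unexpected api url: %q", apiURL)
	}
}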

func (r *PostgresReconciler) guessAPIURL(pod *corev1.Pod) string {
if pod == nil {
return ""
}

if pod.Status.PodIP == "" {
return ""
}

return "http://" + pod.Status.PodIP + ":8080/config"
}

func (r *PostgresReconciler) findLeaderPods(ctx context.Context, instance *pg.Postgres) (*corev1.PodList, error) {
leaderPods := &corev1.PodList{}
roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
if err != nil {
r.Log.Info("could not create requirements for label selector to query pods, requeuing")
return leaderPods, err
}
leaderSelector := labels.NewSelector().Add(*roleReq)
opts := []client.ListOption{
client.InNamespace(instance.ToPeripheralResourceNamespace()),
client.MatchingLabelsSelector{Selector: leaderSelector},
}
return leaderPods, r.SvcClient.List(ctx, leaderPods, opts...)
}
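
// For orientation (assumption, not part of this PR): with the usual spilo labels this
// selector behaves roughly like
//	kubectl get pods -n <peripheral-namespace> -l 'spilo-role in (master,standby_leader)'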

func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error {
pods := &corev1.PodList{}
opts := []client.ListOption{
client.InNamespace(instance.ToPeripheralResourceNamespace()),
@@ -749,14 +862,16 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, i
if len(pods.Items) == 0 {
r.Log.Info("no spilo pods found at all, requeueing")
return errors.New("no spilo pods found at all")
} else if len(pods.Items) < int(instance.Spec.NumberOfInstances) {
r.Log.Info("expected %d pods, but only found %d (might be ok if it is still creating)", instance.Spec.NumberOfInstances, len(pods.Items))
}

// iterate all spilo pods
var lastErr error
for _, pod := range pods.Items {
pod := pod // pin!
podIP := pod.Status.PodIP
if err := r.httpPatchPatroni(ctx, instance, podIP); err != nil {
url := r.guessAPIURL(&pod)
if err := r.httpPatchPatroni(ctx, instance, url); err != nil {
lastErr = err
r.Log.Info("failed to update pod")
}
@@ -769,29 +884,15 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, i
return nil
}

func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.Postgres, podIP string) error {
if podIP == "" {
return errors.New("podIP must not be empty")
}

podPort := "8008"
path := "config"

type PatroniStandbyCluster struct {
CreateReplicaMethods []string `json:"create_replica_methods"`
Host string `json:"host"`
Port int `json:"port"`
ApplicationName string `json:"application_name"`
}
type PatroniConfigRequest struct {
StandbyCluster *PatroniStandbyCluster `json:"standby_cluster"`
SynchronousNodesAdditional *string `json:"synchronous_nodes_additional"`
func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.Postgres, url string) error {
if url == "" {
return errors.New("url must not be empty")
}

r.Log.Info("Preparing request")
var request PatroniConfigRequest
var request PatroniConfig
if instance.IsReplicationPrimary() {
request = PatroniConfigRequest{
request = PatroniConfig{
StandbyCluster: nil,
}
if instance.Spec.PostgresConnection.SynchronousReplication {
@@ -803,7 +904,7 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
}
} else {
// TODO check values first
request = PatroniConfigRequest{
request = PatroniConfig{
StandbyCluster: &PatroniStandbyCluster{
CreateReplicaMethods: []string{"basebackup_fast_xlog"},
Host: instance.Spec.PostgresConnection.ConnectionIP,
@@ -821,7 +922,6 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
}

httpClient := &http.Client{}
url := "http://" + podIP + ":" + podPort + "/" + path

req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewBuffer(jsonReq))
if err != nil {
@@ -840,6 +940,49 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
return nil
}
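
// For reference (illustrative only, not part of this PR), the PATCH request built above is
// roughly equivalent to:
//	curl -X PATCH -H 'Content-Type: application/json' \
//	  -d '{"standby_cluster":null,"synchronous_nodes_additional":"my-primary-id"}' \
//	  http://10.0.0.2:8008/config
// where the JSON body depends on whether the instance is the replication primary.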

func (r *PostgresReconciler) httpGetPatroniConfig(ctx context.Context, url string) (*PatroniConfig, error) {
if url == "" {
return nil, errors.New("url must not be empty")
}

httpClient := &http.Client{}

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
r.Log.Error(err, "could not create request")
return nil, err
}
req.Header.Set("Content-Type", "application/json")

resp, err := httpClient.Do(req)
if err != nil {
r.Log.Error(err, "could not perform request")
return nil, err
}

defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
r.Log.Info("could not read body")
return nil, err
}
var jsonResp PatroniConfig
err = json.Unmarshal(body, &jsonResp)
if err != nil {
r.Log.Info("could not parse config")
return nil, err
}

r.Log.Info("Got config", "response", jsonResp)

return &jsonResp, nil
}

func (r *PostgresReconciler) getBackupConfig(ctx context.Context, ns, name string) (*pg.BackupConfig, error) {
// fetch secret
backupSecret := &corev1.Secret{}