Skip to content

Commit 5c04c8a

Browse files
authored
Standby create replica methods (#533)
* Patch create_replica_methods * Add datadir option * poc implementation of STANDBY_* envs * * Add STANDBY_WITH_WALG env * Reconfigure bootstrapping, not replica imaging * Renaming * * More pointers * enable s3 for creating replicas as well * Set additional envs * Refactor * Remove unused code * Test without modifying the bootstrap/create_replica_methods * Cleanup * Cleanup * Refactoring * Tweak WALG for standby bootstrapping as well * Add feature flag
1 parent 20491dd commit 5c04c8a

File tree

3 files changed

+109
-0
lines changed

3 files changed

+109
-0
lines changed

api/v1/postgres_types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,14 @@ func (p *Postgres) IsReplicationPrimary() bool {
858858
return false
859859
}
860860

861+
func (p *Postgres) IsReplicationTarget() bool {
862+
if p.Spec.PostgresConnection != nil && p.Spec.PostgresConnection.ReplicationPrimary == false {
863+
// sth is configured and we are not the leader
864+
return true
865+
}
866+
return false
867+
}
868+
861869
// enableAuditLogs configures this postgres instances audit logging
862870
func enableAuditLogs(parameters map[string]string) {
863871
// default values: bg_mon,pg_stat_statements,pgextwlist,pg_auth_mon,set_user,timescaledb,pg_cron,pg_stat_kcache

controllers/postgres_controller.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ type PostgresReconciler struct {
8585
EnableRandomStorageEncryptionSecret bool
8686
EnableWalGEncryption bool
8787
PostgresletFullname string
88+
EnableBootstrapStandbyFromS3 bool
8889
}
8990

9091
// Reconcile is the entry point for postgres reconciliation.
@@ -493,6 +494,12 @@ func (r *PostgresReconciler) updatePodEnvironmentConfigMap(ctx context.Context,
493494
"CLONE_WALG_DOWNLOAD_CONCURRENCY": downloadConcurrency,
494495
}
495496

497+
if r.EnableBootstrapStandbyFromS3 && p.IsReplicationTarget() {
498+
data["STANDBY_WALG_UPLOAD_DISK_CONCURRENCY"] = uploadDiskConcurrency
499+
data["STANDBY_WALG_UPLOAD_CONCURRENCY"] = uploadConcurrency
500+
data["STANDBY_WALG_DOWNLOAD_CONCURRENCY"] = downloadConcurrency
501+
}
502+
496503
cm := &corev1.ConfigMap{}
497504
ns := types.NamespacedName{
498505
Name: operatormanager.PodEnvCMName,
@@ -559,6 +566,19 @@ func (r *PostgresReconciler) updatePodEnvironmentSecret(ctx context.Context, p *
559566
} else {
560567
delete(data, "CLONE_WALG_LIBSODIUM_KEY")
561568
}
569+
570+
// we also need that (hopefully identical) key to bootstrap from files in S3
571+
if r.EnableBootstrapStandbyFromS3 && p.IsReplicationTarget() {
572+
data["STANDBY_WALG_LIBSODIUM_KEY"] = k
573+
}
574+
}
575+
576+
// add STANDBY_* variables for bootstrapping from S3
577+
if r.EnableBootstrapStandbyFromS3 && p.IsReplicationTarget() {
578+
standbyEnvs := r.getStandbyEnvs(ctx, p)
579+
for name, value := range standbyEnvs {
580+
data[name] = value
581+
}
562582
}
563583

564584
var s *corev1.Secret
@@ -579,6 +599,80 @@ func (r *PostgresReconciler) updatePodEnvironmentSecret(ctx context.Context, p *
579599
return nil
580600
}
581601

602+
// getStandbyEnvs Fetches all the required info from the remote primary postgres and fills all ENVS required for bootstrapping from S3
603+
func (r *PostgresReconciler) getStandbyEnvs(ctx context.Context, p *pg.Postgres) map[string][]byte {
604+
standbyEnvs := map[string][]byte{}
605+
606+
// fetch backup secret of primary
607+
primary := &pg.Postgres{}
608+
ns := types.NamespacedName{
609+
Name: p.Spec.PostgresConnection.ConnectedPostgresID,
610+
Namespace: p.Namespace,
611+
}
612+
if err := r.CtrlClient.Get(ctx, ns, primary); err != nil {
613+
if apierrors.IsNotFound(err) {
614+
// the instance was updated, but does not exist anymore -> do nothing, it was probably deleted
615+
return standbyEnvs
616+
}
617+
618+
r.recorder.Eventf(primary, "Warning", "Error", "failed to get referenced primary postgres: %v", err)
619+
return standbyEnvs
620+
}
621+
622+
if primary.Spec.BackupSecretRef == "" {
623+
r.recorder.Eventf(primary, "Warning", "Error", "No backupSecretRef for primary postgres found, skipping configuration of wal_e bootstrapping")
624+
return standbyEnvs
625+
}
626+
627+
primaryBackupConfig, err := r.getBackupConfig(ctx, primary.Namespace, primary.Spec.BackupSecretRef)
628+
if err != nil {
629+
r.recorder.Eventf(primary, "Warning", "Error", "failed to get referenced primary backup config, skipping configuration of wal_e bootstrapping: %v", err)
630+
return standbyEnvs
631+
}
632+
primaryS3url, err := url.Parse(primaryBackupConfig.S3Endpoint)
633+
if err != nil {
634+
r.recorder.Eventf(primary, "Warning", "Error", "error while parsing the s3 endpoint url in the backup secret: %w", err)
635+
return standbyEnvs
636+
}
637+
638+
// use the s3 endpoint as provided
639+
primaryAwsEndpoint := primaryS3url.String()
640+
// modify the scheme to 'https+path'
641+
primaryS3url.Scheme = "https+path"
642+
// use the modified s3 endpoint
643+
primaryWalES3Endpoint := primaryS3url.String()
644+
// region
645+
primaryRegion := primaryBackupConfig.S3Region
646+
// s3 prefix
647+
primaryWalGS3Prefix := "s3://" + primaryBackupConfig.S3BucketName + "/" + primary.ToPeripheralResourceName()
648+
// s3 server side encryption SSE is disabled, we use client side encryption
649+
// see STANDBY_WALG_LIBSODIUM_KEY above
650+
primaryWalgDisableSSE := "true"
651+
// aws access key
652+
primaryAwsAccessKeyID := primaryBackupConfig.S3AccessKey
653+
// aws secret key
654+
primaryAwsSecretAccessKey := primaryBackupConfig.S3SecretKey
655+
656+
// create updated content for pod environment configmap
657+
// this is a bit confusing: those are used to bootstrap a remote standby, so they have to point to the primary!
658+
standbyEnvs["STANDBY_AWS_ACCESS_KEY_ID"] = []byte(primaryAwsAccessKeyID)
659+
standbyEnvs["STANDBY_AWS_SECRET_ACCESS_KEY"] = []byte(primaryAwsSecretAccessKey)
660+
standbyEnvs["STANDBY_AWS_ENDPOINT"] = []byte(primaryAwsEndpoint)
661+
standbyEnvs["STANDBY_AWS_S3_FORCE_PATH_STYLE"] = []byte("true")
662+
standbyEnvs["STANDBY_AWS_REGION"] = []byte(primaryRegion)
663+
standbyEnvs["STANDBY_AWS_WALG_S3_ENDPOINT"] = []byte(primaryWalES3Endpoint)
664+
standbyEnvs["STANDBY_USE_WALG_BACKUP"] = []byte("true")
665+
standbyEnvs["STANDBY_USE_WALG_RESTORE"] = []byte("true")
666+
standbyEnvs["STANDBY_WALE_S3_ENDPOINT"] = []byte(primaryWalES3Endpoint)
667+
standbyEnvs["STANDBY_WALG_DISABLE_S3_SSE"] = []byte(primaryWalgDisableSSE)
668+
standbyEnvs["STANDBY_WALG_S3_ENDPOINT"] = []byte(primaryWalES3Endpoint)
669+
standbyEnvs["STANDBY_WALG_S3_PREFIX"] = []byte(primaryWalGS3Prefix)
670+
standbyEnvs["STANDBY_WALG_S3_SSE"] = []byte("")
671+
standbyEnvs["STANDBY_WITH_WALG"] = []byte("true")
672+
673+
return standbyEnvs
674+
}
675+
582676
func (r *PostgresReconciler) isManagedByUs(obj *pg.Postgres) bool {
583677
if obj.Spec.PartitionID != r.PartitionID {
584678
return false

main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ const (
7272
enableRandomStorageEncrytionSecretFlg = "enable-random-storage-encryption-secret"
7373
enableWalGEncryptionFlg = "enable-walg-encryption"
7474
enableForceSharedIPFlg = "enable-force-shared-ip"
75+
enableBootstrapStandbyFromS3Flg = "enable-bootsrtap-standby-from-s3"
7576
)
7677

7778
var (
@@ -124,6 +125,7 @@ func main() {
124125
enableRandomStorageEncrytionSecret bool
125126
enableWalGEncryption bool
126127
enableForceSharedIP bool
128+
enableBootstrapStandbyFromS3 bool
127129

128130
portRangeStart int
129131
portRangeSize int
@@ -260,6 +262,9 @@ func main() {
260262
viper.SetDefault(enableForceSharedIPFlg, true) // TODO switch to false?
261263
enableForceSharedIP = viper.GetBool(enableForceSharedIPFlg)
262264

265+
viper.SetDefault(enableBootstrapStandbyFromS3Flg, true)
266+
enableBootstrapStandbyFromS3 = viper.GetBool(enableBootstrapStandbyFromS3Flg)
267+
263268
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
264269

265270
ctrl.Log.Info("flag",
@@ -299,6 +304,7 @@ func main() {
299304
postgresletFullnameFlg, postgresletFullname,
300305
enableWalGEncryptionFlg, enableWalGEncryption,
301306
enableForceSharedIPFlg, enableForceSharedIP,
307+
enableBootstrapStandbyFromS3Flg, enableBootstrapStandbyFromS3,
302308
)
303309

304310
svcClusterConf := ctrl.GetConfigOrDie()
@@ -406,6 +412,7 @@ func main() {
406412
EnableRandomStorageEncryptionSecret: enableRandomStorageEncrytionSecret,
407413
EnableWalGEncryption: enableWalGEncryption,
408414
PostgresletFullname: postgresletFullname,
415+
EnableBootstrapStandbyFromS3: enableBootstrapStandbyFromS3,
409416
}).SetupWithManager(ctrlPlaneClusterMgr); err != nil {
410417
setupLog.Error(err, "unable to create controller", "controller", "Postgres")
411418
os.Exit(1)

0 commit comments

Comments
 (0)