@@ -1062,14 +1062,28 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
1062
1062
return requeueAfterReconcile , nil
1063
1063
}
1064
1064
if instance .Spec .PostgresConnection .SynchronousReplication {
1065
- if resp .SynchronousNodesAdditional == nil || * resp .SynchronousNodesAdditional != instance .Spec .PostgresConnection .ConnectedPostgresID {
1065
+ // fetch the sync standby to determine the correct application_name of the instance
1066
+ log .V (debugLogLevel ).Info ("fetching the referenced sync standby" )
1067
+ var synchronousStandbyApplicationName * string
1068
+ s := & pg.Postgres {}
1069
+ ns := types.NamespacedName {
1070
+ Name : instance .Spec .PostgresConnection .ConnectedPostgresID ,
1071
+ Namespace : instance .Namespace ,
1072
+ }
1073
+ if err := r .CtrlClient .Get (ctx , ns , s ); err != nil {
1074
+ r .recorder .Eventf (s , "Warning" , "Error" , "failed to get referenced sync standby: %v" , err )
1075
+ synchronousStandbyApplicationName = nil
1076
+ } else {
1077
+ synchronousStandbyApplicationName = pointer .String (s .ToPeripheralResourceName ())
1078
+ }
1079
+ if resp .SynchronousNodesAdditional == nil || * resp .SynchronousNodesAdditional != * synchronousStandbyApplicationName {
1066
1080
log .V (debugLogLevel ).Info ("synchronous_nodes_additional mismatch, updating and requeing" , "response" , resp )
1067
- return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP )
1081
+ return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP , synchronousStandbyApplicationName )
1068
1082
}
1069
1083
} else {
1070
1084
if resp .SynchronousNodesAdditional != nil {
1071
1085
log .V (debugLogLevel ).Info ("synchronous_nodes_additional mismatch, updating and requeing" , "response" , resp )
1072
- return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP )
1086
+ return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP , nil )
1073
1087
}
1074
1088
}
1075
1089
@@ -1078,25 +1092,25 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
1078
1092
log .V (debugLogLevel ).Info ("standby_cluster mismatch, requeing" , "response" , resp )
1079
1093
return requeueAfterReconcile , nil
1080
1094
}
1081
- if resp .StandbyCluster .ApplicationName != instance .ObjectMeta .Name {
1082
- log .V (debugLogLevel ).Info ("application_name mismatch, updating and requeing" , "response" , resp )
1083
- return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP )
1084
- }
1085
- if resp .SynchronousNodesAdditional != nil {
1086
- log .V (debugLogLevel ).Info ("synchronous_nodes_additional mismatch, updating and requeing" , "response" , resp )
1087
- return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP )
1088
- }
1089
1095
if resp .StandbyCluster .CreateReplicaMethods == nil {
1090
1096
log .V (debugLogLevel ).Info ("create_replica_methods mismatch, updating and requeing" , "response" , resp )
1091
- return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP )
1097
+ return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP , nil )
1092
1098
}
1093
1099
if resp .StandbyCluster .Host != instance .Spec .PostgresConnection .ConnectionIP {
1094
1100
log .V (debugLogLevel ).Info ("host mismatch, updating and requeing" , "updating" , resp )
1095
- return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP )
1101
+ return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP , nil )
1096
1102
}
1097
1103
if resp .StandbyCluster .Port != int (instance .Spec .PostgresConnection .ConnectionPort ) {
1098
1104
log .V (debugLogLevel ).Info ("port mismatch, updating and requeing" , "updating" , resp )
1099
- return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP )
1105
+ return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP , nil )
1106
+ }
1107
+ if resp .StandbyCluster .ApplicationName != instance .ToPeripheralResourceName () {
1108
+ log .V (debugLogLevel ).Info ("application_name mismatch, updating and requeing" , "response" , resp )
1109
+ return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP , nil )
1110
+ }
1111
+ if resp .SynchronousNodesAdditional != nil {
1112
+ log .V (debugLogLevel ).Info ("synchronous_nodes_additional mismatch, updating and requeing" , "response" , resp )
1113
+ return requeueAfterReconcile , r .httpPatchPatroni (log , ctx , instance , leaderIP , nil )
1100
1114
}
1101
1115
}
1102
1116
@@ -1142,7 +1156,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo
1142
1156
for _ , pod := range pods .Items {
1143
1157
pod := pod // pin!
1144
1158
podIP := pod .Status .PodIP
1145
- if err := r .httpPatchPatroni (log , ctx , instance , podIP ); err != nil {
1159
+ if err := r .httpPatchPatroni (log , ctx , instance , podIP , nil ); err != nil {
1146
1160
lastErr = err
1147
1161
log .Info ("failed to update pod" )
1148
1162
}
@@ -1155,7 +1169,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo
1155
1169
return nil
1156
1170
}
1157
1171
1158
- func (r * PostgresReconciler ) httpPatchPatroni (log logr.Logger , ctx context.Context , instance * pg.Postgres , podIP string ) error {
1172
+ func (r * PostgresReconciler ) httpPatchPatroni (log logr.Logger , ctx context.Context , instance * pg.Postgres , podIP string , synchronousStandbyApplicationName * string ) error {
1159
1173
if podIP == "" {
1160
1174
return errors .New ("podIP must not be empty" )
1161
1175
}
@@ -1170,8 +1184,23 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
1170
1184
StandbyCluster : nil ,
1171
1185
}
1172
1186
if instance .Spec .PostgresConnection .SynchronousReplication {
1187
+ if synchronousStandbyApplicationName == nil {
1188
+ // fetch the sync standby to determine the correct application_name of the instance
1189
+ log .V (debugLogLevel ).Info ("unexpectetly having to fetch the referenced sync standby" )
1190
+ s := & pg.Postgres {}
1191
+ ns := types.NamespacedName {
1192
+ Name : instance .Spec .PostgresConnection .ConnectedPostgresID ,
1193
+ Namespace : instance .Namespace ,
1194
+ }
1195
+ if err := r .CtrlClient .Get (ctx , ns , s ); err != nil {
1196
+ r .recorder .Eventf (s , "Warning" , "Error" , "failed to get referenced sync standby: %v" , err )
1197
+ synchronousStandbyApplicationName = nil
1198
+ } else {
1199
+ synchronousStandbyApplicationName = pointer .String (s .ToPeripheralResourceName ())
1200
+ }
1201
+ }
1173
1202
// enable sync replication
1174
- request .SynchronousNodesAdditional = pointer . String ( instance . Spec . PostgresConnection . ConnectedPostgresID )
1203
+ request .SynchronousNodesAdditional = synchronousStandbyApplicationName
1175
1204
} else {
1176
1205
// disable sync replication
1177
1206
request .SynchronousNodesAdditional = nil
@@ -1183,7 +1212,7 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
1183
1212
CreateReplicaMethods : []string {"basebackup_fast_xlog" },
1184
1213
Host : instance .Spec .PostgresConnection .ConnectionIP ,
1185
1214
Port : int (instance .Spec .PostgresConnection .ConnectionPort ),
1186
- ApplicationName : instance .ObjectMeta . Name ,
1215
+ ApplicationName : instance .ToPeripheralResourceName () ,
1187
1216
},
1188
1217
SynchronousNodesAdditional : nil ,
1189
1218
}
@@ -1212,6 +1241,11 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
1212
1241
}
1213
1242
defer resp .Body .Close ()
1214
1243
1244
+ // fake error when standbyApplicationName is required but not provided
1245
+ if instance .IsReplicationPrimary () && instance .Spec .PostgresConnection .SynchronousReplication && synchronousStandbyApplicationName == nil {
1246
+ return fmt .Errorf ("missing application_name of synchronous standby, disable synchronous replication" )
1247
+ }
1248
+
1215
1249
return nil
1216
1250
}
1217
1251
0 commit comments