Skip to content

Commit 59ae494

Browse files
committed
feat: support retry in case of sync failure
1 parent a23de40 commit 59ae494

File tree

6 files changed

+215
-9
lines changed

6 files changed

+215
-9
lines changed

internal/provider/apisix/provider.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/apache/apisix-ingress-controller/internal/controller/status"
3838
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
3939
"github.com/apache/apisix-ingress-controller/internal/provider"
40+
"github.com/apache/apisix-ingress-controller/internal/provider/common"
4041
"github.com/apache/apisix-ingress-controller/internal/types"
4142
"github.com/apache/apisix-ingress-controller/internal/utils"
4243
)
@@ -236,19 +237,29 @@ func (d *apisixProvider) Start(ctx context.Context) error {
236237
}
237238
ticker := time.NewTicker(d.SyncPeriod)
238239
defer ticker.Stop()
240+
241+
retrier := common.NewRetrier(common.NewExponentialBackoff(1*time.Second, 1000*time.Second))
239242
for {
240243
synced := false
241244
select {
242245
case <-d.syncCh:
246+
retrier.Cancel()
243247
synced = true
244248
case <-ticker.C:
249+
retrier.Cancel()
250+
synced = true
251+
case <-retrier.C():
245252
synced = true
246253
case <-ctx.Done():
254+
retrier.Cancel()
247255
return nil
248256
}
249257
if synced {
250258
if err := d.sync(ctx); err != nil {
251259
log.Error(err)
260+
retrier.Trigger()
261+
} else {
262+
retrier.Cancel()
252263
}
253264
}
254265
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package common
19+
20+
import (
21+
"sync"
22+
"time"
23+
)
24+
25+
type Backoff interface {
26+
Next() time.Duration
27+
Reset()
28+
}
29+
30+
type ExponentialBackoff struct {
31+
base, max, current time.Duration
32+
}
33+
34+
func NewExponentialBackoff(base, max time.Duration) *ExponentialBackoff {
35+
return &ExponentialBackoff{base: base, max: max, current: base}
36+
}
37+
38+
func (b *ExponentialBackoff) Next() time.Duration {
39+
delay := b.current
40+
b.current *= 2
41+
if b.current > b.max {
42+
b.current = b.max
43+
}
44+
return delay
45+
}
46+
47+
func (b *ExponentialBackoff) Reset() {
48+
b.current = b.base
49+
}
50+
51+
type Retrier struct {
52+
mu sync.Mutex
53+
ch chan struct{}
54+
timer *time.Timer
55+
backoff Backoff
56+
}
57+
58+
func NewRetrier(b Backoff) *Retrier {
59+
return &Retrier{
60+
ch: make(chan struct{}, 1),
61+
backoff: b,
62+
}
63+
}
64+
65+
func (r *Retrier) Cancel() {
66+
r.mu.Lock()
67+
defer r.mu.Unlock()
68+
69+
if r.timer != nil {
70+
r.timer.Stop()
71+
r.timer = nil
72+
}
73+
r.backoff.Reset()
74+
}
75+
76+
func (r *Retrier) Trigger() {
77+
r.mu.Lock()
78+
defer r.mu.Unlock()
79+
80+
delay := r.backoff.Next()
81+
r.timer = time.AfterFunc(delay, func() {
82+
select {
83+
case r.ch <- struct{}{}:
84+
default:
85+
}
86+
})
87+
}
88+
89+
func (r *Retrier) C() <-chan struct{} {
90+
return r.ch
91+
}

test/e2e/crds/v2/route.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,4 +1676,59 @@ spec:
16761676
})
16771677
})
16781678
})
1679+
1680+
Context("Exception Test", func() {
1681+
const apisixRouteSpec = `
1682+
apiVersion: apisix.apache.org/v2
1683+
kind: ApisixRoute
1684+
metadata:
1685+
name: default
1686+
spec:
1687+
ingressClassName: %s
1688+
http:
1689+
- name: rule0
1690+
match:
1691+
hosts:
1692+
- httpbin
1693+
paths:
1694+
- /*
1695+
backends:
1696+
- serviceName: httpbin-service-e2e-test
1697+
servicePort: 80
1698+
`
1699+
It("try again when sync failed", func() {
1700+
s.Deployer.ScaleDataplane(0)
1701+
1702+
err := s.CreateResourceFromString(fmt.Sprintf(apisixRouteSpec, s.Namespace()))
1703+
Expect(err).NotTo(HaveOccurred(), "creating ApisixRoute")
1704+
1705+
By("check ApisixRoute status")
1706+
s.RetryAssertion(func() string {
1707+
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
1708+
return output
1709+
}).WithTimeout(30 * time.Second).
1710+
Should(
1711+
And(
1712+
ContainSubstring(`status: "False"`),
1713+
ContainSubstring(`reason: SyncFailed`),
1714+
),
1715+
)
1716+
1717+
s.Deployer.ScaleDataplane(1)
1718+
1719+
s.RetryAssertion(func() string {
1720+
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
1721+
return output
1722+
}).WithTimeout(60 * time.Second).
1723+
Should(ContainSubstring(`status: "True"`))
1724+
1725+
By("check route in APISIX")
1726+
s.RequestAssert(&scaffold.RequestAssert{
1727+
Method: "GET",
1728+
Path: "/get",
1729+
Host: "httpbin",
1730+
Check: scaffold.WithExpectedStatus(200),
1731+
})
1732+
})
1733+
})
16791734
})

test/e2e/crds/v2/status.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
. "github.com/onsi/ginkgo/v2"
2626
. "github.com/onsi/gomega"
27+
corev1 "k8s.io/api/core/v1"
2728
"k8s.io/apimachinery/pkg/types"
2829
"sigs.k8s.io/yaml"
2930

@@ -41,7 +42,7 @@ var _ = Describe("Test CRD Status", Label("apisix.apache.org", "v2", "apisixrout
4142
Context("Test ApisixRoute Sync Status", func() {
4243
BeforeEach(func() {
4344
By("create GatewayProxy")
44-
gatewayProxy := s.GetGatewayProxyYaml()
45+
gatewayProxy := s.GetGatewayProxyWithServiceYaml()
4546
err := s.CreateResourceFromString(gatewayProxy)
4647
Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy")
4748
time.Sleep(5 * time.Second)
@@ -144,7 +145,8 @@ spec:
144145

145146
It("dataplane unavailable", func() {
146147
By("apply ApisixRoute")
147-
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, fmt.Sprintf(ar, s.Namespace(), s.Namespace()))
148+
arYaml := fmt.Sprintf(ar, s.Namespace(), s.Namespace())
149+
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, arYaml)
148150

149151
By("check route in APISIX")
150152
s.RequestAssert(&scaffold.RequestAssert{
@@ -154,27 +156,51 @@ spec:
154156
Check: scaffold.WithExpectedStatus(200),
155157
})
156158

157-
s.Deployer.ScaleDataplane(0)
159+
By("get yaml from service")
160+
serviceYaml, err := s.GetOutputFromString("svc", framework.ProviderType, "-o", "yaml")
161+
Expect(err).NotTo(HaveOccurred(), "getting service yaml")
162+
By("update service to type ExternalName with invalid host")
163+
var k8sservice corev1.Service
164+
err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice)
165+
Expect(err).NotTo(HaveOccurred(), "unmarshalling service")
166+
oldSpec := k8sservice.Spec
167+
k8sservice.Spec = corev1.ServiceSpec{
168+
Type: corev1.ServiceTypeExternalName,
169+
ExternalName: "invalid.host",
170+
}
171+
newServiceYaml, err := yaml.Marshal(k8sservice)
172+
Expect(err).NotTo(HaveOccurred(), "marshalling service")
173+
err = s.CreateResourceFromString(string(newServiceYaml))
174+
Expect(err).NotTo(HaveOccurred(), "creating service")
158175

159176
By("check ApisixRoute status")
160177
s.RetryAssertion(func() string {
161-
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
178+
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml")
162179
return output
163-
}).WithTimeout(80 * time.Second).
180+
}).WithTimeout(60 * time.Second).
164181
Should(
165182
And(
166183
ContainSubstring(`status: "False"`),
167184
ContainSubstring(`reason: SyncFailed`),
168185
),
169186
)
170187

171-
s.Deployer.ScaleDataplane(1)
188+
By("update service to original spec")
189+
serviceYaml, err = s.GetOutputFromString("svc", framework.ProviderType, "-o", "yaml")
190+
Expect(err).NotTo(HaveOccurred(), "getting service yaml")
191+
err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice)
192+
Expect(err).NotTo(HaveOccurred(), "unmarshalling service")
193+
k8sservice.Spec = oldSpec
194+
newServiceYaml, err = yaml.Marshal(k8sservice)
195+
Expect(err).NotTo(HaveOccurred(), "marshalling service")
196+
err = s.CreateResourceFromString(string(newServiceYaml))
197+
Expect(err).NotTo(HaveOccurred(), "creating service")
172198

173199
By("check ApisixRoute status after scaling up")
174200
s.RetryAssertion(func() string {
175-
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
201+
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml")
176202
return output
177-
}).WithTimeout(80 * time.Second).
203+
}).WithTimeout(60 * time.Second).
178204
Should(
179205
And(
180206
ContainSubstring(`status: "True"`),

test/e2e/framework/manifests/ingress.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ data:
333333
# The period between two consecutive syncs.
334334
# The default value is 0 seconds, which means the controller will not sync.
335335
# If you want to enable the sync, set it to a positive value.
336-
init_sync_delay: {{ .InitSyncDelay | default "1m" }}
336+
init_sync_delay: {{ .InitSyncDelay | default "20m" }}
337337
---
338338
apiVersion: v1
339339
kind: Service

test/e2e/scaffold/scaffold.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,25 @@ spec:
460460
value: "%s"
461461
`
462462

463+
const gatewayProxyWithServiceYaml = `
464+
apiVersion: apisix.apache.org/v1alpha1
465+
kind: GatewayProxy
466+
metadata:
467+
name: %s
468+
namespace: %s
469+
spec:
470+
provider:
471+
type: ControlPlane
472+
controlPlane:
473+
service:
474+
name: %s
475+
port: 9180
476+
auth:
477+
type: AdminKey
478+
adminKey:
479+
value: "%s"
480+
`
481+
463482
const ingressClassYaml = `
464483
apiVersion: networking.k8s.io/v1
465484
kind: IngressClass
@@ -479,6 +498,10 @@ func (s *Scaffold) GetGatewayProxyYaml() string {
479498
return fmt.Sprintf(gatewayProxyYaml, s.namespace, s.namespace, s.Deployer.GetAdminEndpoint(), s.AdminKey())
480499
}
481500

501+
func (s *Scaffold) GetGatewayProxyWithServiceYaml() string {
502+
return fmt.Sprintf(gatewayProxyWithServiceYaml, s.namespace, s.namespace, s.dataplaneService.Name, s.AdminKey())
503+
}
504+
482505
func (s *Scaffold) GetIngressClassYaml() string {
483506
return fmt.Sprintf(ingressClassYaml, s.namespace, s.GetControllerName(), s.namespace, s.namespace)
484507
}

0 commit comments

Comments
 (0)