@@ -5,54 +5,63 @@ import (
55 "github.com/go-logr/logr"
66 "github.com/pkg/errors"
77 corev1 "k8s.io/api/core/v1"
8- "k8s.io/apimachinery/pkg/labels"
98 "k8s.io/apimachinery/pkg/types"
109 "k8s.io/apimachinery/pkg/util/intstr"
1110 "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
1211 "sigs.k8s.io/controller-runtime/pkg/client"
1312)
1413
15- // EndpointResolver resolves the endpoints for specific service & service Port.
14+ // TODO: for pod endpoints, we currently rely on endpoints events, we might change to use pod events directly in the future.
15+ // under current implementation with pod readinessGate enabled, an unready endpoint but not match our inclusionCriteria won't be registered,
16+ // and it won't turn ready due to blocked by readinessGate, and no future endpoint events will trigger.
17+ // We solve this by requeue the TGB if unready endpoints have the potential to be ready if reconcile in later time.
18+
19+ // EndpointResolver resolves the endpoints for specific service & service Port.
1620type EndpointResolver interface {
1721 // ResolvePodEndpoints will resolve endpoints backed by pods directly.
18- ResolvePodEndpoints (ctx context.Context , svcKey types.NamespacedName , port intstr.IntOrString , opts ... EndpointResolveOption ) ([]PodEndpoint , error )
22+ // returns resolved podEndpoints and whether there are unready endpoints that can potentially turn ready in future reconciles.
23+ ResolvePodEndpoints (ctx context.Context , svcKey types.NamespacedName , port intstr.IntOrString ,
24+ opts ... EndpointResolveOption ) ([]PodEndpoint , bool , error )
1925
2026 // ResolveNodePortEndpoints will resolve endpoints backed by nodePort.
21- ResolveNodePortEndpoints (ctx context.Context , svcKey types.NamespacedName , port intstr.IntOrString , opts ... EndpointResolveOption ) ([]NodePortEndpoint , error )
27+ ResolveNodePortEndpoints (ctx context.Context , svcKey types.NamespacedName , port intstr.IntOrString ,
28+ opts ... EndpointResolveOption ) ([]NodePortEndpoint , error )
2229}
2330
2431// NewDefaultEndpointResolver constructs new defaultEndpointResolver
25- func NewDefaultEndpointResolver (k8sClient client.Client , logger logr.Logger ) * defaultEndpointResolver {
32+ func NewDefaultEndpointResolver (k8sClient client.Client , podInfoRepo k8s. PodInfoRepo , logger logr.Logger ) * defaultEndpointResolver {
2633 return & defaultEndpointResolver {
27- k8sClient : k8sClient ,
28- logger : logger ,
34+ k8sClient : k8sClient ,
35+ podInfoRepo : podInfoRepo ,
36+ logger : logger ,
2937 }
3038}
3139
3240var _ EndpointResolver = & defaultEndpointResolver {}
3341
3442// default implementation for EndpointResolver
3543type defaultEndpointResolver struct {
36- k8sClient client.Client
37- logger logr.Logger
44+ k8sClient client.Client
45+ podInfoRepo k8s.PodInfoRepo
46+ logger logr.Logger
3847}
3948
40- func (r * defaultEndpointResolver ) ResolvePodEndpoints (ctx context.Context , svcKey types.NamespacedName , port intstr.IntOrString , opts ... EndpointResolveOption ) ([]PodEndpoint , error ) {
41- resolveOpts := EndpointResolveOptions {
42- NodeSelector : labels .Nothing (),
43- }
49+ func (r * defaultEndpointResolver ) ResolvePodEndpoints (ctx context.Context , svcKey types.NamespacedName , port intstr.IntOrString ,
50+ opts ... EndpointResolveOption ) ([]PodEndpoint , bool , error ) {
51+ resolveOpts := defaultEndpointResolveOptions ()
4452 resolveOpts .ApplyOptions (opts )
53+
4554 svc , svcPort , err := r .findServiceAndServicePort (ctx , svcKey , port )
4655 if err != nil {
47- return nil , err
56+ return nil , false , err
4857 }
49-
5058 epsKey := k8s .NamespacedName (svc ) // k8s Endpoints have same name as k8s Service
5159 eps := & corev1.Endpoints {}
5260 if err := r .k8sClient .Get (ctx , epsKey , eps ); err != nil {
53- return nil , err
61+ return nil , false , err
5462 }
5563
64+ containsPotentialReadyEndpoints := false
5665 var endpoints []PodEndpoint
5766 for _ , epSubset := range eps .Subsets {
5867 for _ , epPort := range epSubset .Ports {
@@ -65,43 +74,53 @@ func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKe
6574 if epAddr .TargetRef == nil || epAddr .TargetRef .Kind != "Pod" {
6675 continue
6776 }
68- epPod , err := r .findPodByReference (ctx , svc .Namespace , * epAddr .TargetRef )
77+ pod , exists , err := r .findPodByReference (ctx , svc .Namespace , * epAddr .TargetRef )
6978 if err != nil {
70- return nil , err
79+ return nil , false , err
7180 }
72- endpoints = append (endpoints , buildPodEndpoint (epPod , epAddr , epPort ))
81+ if ! exists {
82+ return nil , false , errors .New ("couldn't find podInfo for ready endpoint" )
83+ }
84+ endpoints = append (endpoints , buildPodEndpoint (pod , epAddr , epPort ))
7385 }
7486
75- if len (resolveOpts .UnreadyPodInclusionCriteria ) != 0 {
87+ if len (resolveOpts .PodReadinessGates ) != 0 {
7688 for _ , epAddr := range epSubset .NotReadyAddresses {
7789 if epAddr .TargetRef == nil || epAddr .TargetRef .Kind != "Pod" {
7890 continue
7991 }
80- epPod , err := r .findPodByReference (ctx , svc .Namespace , * epAddr .TargetRef )
92+ pod , exists , err := r .findPodByReference (ctx , svc .Namespace , * epAddr .TargetRef )
8193 if err != nil {
82- return nil , err
94+ return nil , false , err
95+ }
96+ if ! exists {
97+ containsPotentialReadyEndpoints = true
98+ continue
99+ }
100+ if ! pod .HasAnyOfReadinessGates (resolveOpts .PodReadinessGates ) {
101+ continue
83102 }
84- if isPodMeetCriteria (epPod , resolveOpts .UnreadyPodInclusionCriteria ) {
85- endpoints = append (endpoints , buildPodEndpoint (epPod , epAddr , epPort ))
103+ if ! pod .IsContainersReady () {
104+ containsPotentialReadyEndpoints = true
105+ continue
86106 }
107+ endpoints = append (endpoints , buildPodEndpoint (pod , epAddr , epPort ))
87108 }
88109 }
89110 }
90111 }
91112
92- return endpoints , nil
113+ return endpoints , containsPotentialReadyEndpoints , nil
93114}
94115
95116func (r * defaultEndpointResolver ) ResolveNodePortEndpoints (ctx context.Context , svcKey types.NamespacedName , port intstr.IntOrString , opts ... EndpointResolveOption ) ([]NodePortEndpoint , error ) {
96- resolveOpts := EndpointResolveOptions {
97- NodeSelector : labels .Nothing (),
98- }
117+ resolveOpts := defaultEndpointResolveOptions ()
99118 resolveOpts .ApplyOptions (opts )
119+
100120 svc , svcPort , err := r .findServiceAndServicePort (ctx , svcKey , port )
101121 if err != nil {
102122 return nil , err
103123 }
104-
105124 if svc .Spec .Type != corev1 .ServiceTypeNodePort && svc .Spec .Type != corev1 .ServiceTypeLoadBalancer {
106125 return nil , errors .Errorf ("service type must be either 'NodePort' or 'LoadBalancer': %v" , svcKey )
107126 }
@@ -140,25 +159,12 @@ func (r *defaultEndpointResolver) findServiceAndServicePort(ctx context.Context,
140159 return svc , svcPort , nil
141160}
142161
143- func (r * defaultEndpointResolver ) findPodByReference (ctx context.Context , namespace string , podRef corev1.ObjectReference ) (* corev1.Pod , error ) {
144- pod := & corev1.Pod {}
162+ func (r * defaultEndpointResolver ) findPodByReference (ctx context.Context , namespace string , podRef corev1.ObjectReference ) (k8s.PodInfo , bool , error ) {
145163 podKey := types.NamespacedName {Namespace : namespace , Name : podRef .Name }
146- if err := r .k8sClient .Get (ctx , podKey , pod ); err != nil {
147- return nil , err
148- }
149- return pod , nil
150- }
151-
152- func isPodMeetCriteria (pod * corev1.Pod , criteria []PodPredicate ) bool {
153- for _ , criterion := range criteria {
154- if ! criterion (pod ) {
155- return false
156- }
157- }
158- return true
164+ return r .podInfoRepo .Get (ctx , podKey )
159165}
160166
161- func buildPodEndpoint (pod * corev1. Pod , epAddr corev1.EndpointAddress , epPort corev1.EndpointPort ) PodEndpoint {
167+ func buildPodEndpoint (pod k8s. PodInfo , epAddr corev1.EndpointAddress , epPort corev1.EndpointPort ) PodEndpoint {
162168 return PodEndpoint {
163169 IP : epAddr .IP ,
164170 Port : int64 (epPort .Port ),
0 commit comments