Skip to content

Commit d1feb26

Browse files
committed
Send kubernetes-generated data to adaptor (CloudNativeSDWAN#32)
Events generated by kubernetes services are now sent to the adaptor. Signed-off-by: Elis Lulja <elulja@cisco.com>
1 parent 77d50d0 commit d1feb26

File tree

3 files changed

+367
-87
lines changed

3 files changed

+367
-87
lines changed

pkg/cmd/watch/kubernetes/command.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
corev1 "k8s.io/api/core/v1"
2525

26+
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/internal/utils"
2627
"github.com/rs/zerolog"
2728
"github.com/spf13/cobra"
2829
"k8s.io/client-go/util/homedir"
@@ -55,6 +56,13 @@ LoadBalancer services and create events that will be later sent to the CN-WAN Ad
5556
return fmt.Errorf("no annotation keys provided")
5657
}
5758

59+
adaptorEndpoint, err := utils.GetAdaptorEndpointFromFlags(cmd)
60+
if err != nil {
61+
cmd.Help()
62+
return err
63+
}
64+
opts.adaptorEndpoint = adaptorEndpoint
65+
5866
k8s := &k8sWatcher{opts: opts, store: map[string]*corev1.Service{}}
5967
return k8s.main()
6068
},

pkg/cmd/watch/kubernetes/watcher.go

Lines changed: 101 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"os/signal"
2626

2727
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/openapi"
28+
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/queue"
29+
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/services"
2830
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2931

3032
corev1 "k8s.io/api/core/v1"
@@ -36,16 +38,22 @@ import (
3638
)
3739

3840
type k8sOptions struct {
39-
kubeconfigPath string
40-
annotationKeys []string
41+
kubeconfigPath string
42+
adaptorEndpoint string
43+
annotationKeys []string
4144
}
4245

4346
type k8sWatcher struct {
44-
opts k8sOptions
45-
store map[string]*corev1.Service
47+
opts k8sOptions
48+
sendQueue queue.Queue
49+
store map[string]*corev1.Service
4650
}
4751

4852
func (k *k8sWatcher) main() error {
53+
// --------------------------------------
54+
// Set ups
55+
// --------------------------------------
56+
4957
config, err := clientcmd.BuildConfigFromFlags("", k.opts.kubeconfigPath)
5058
if err != nil {
5159
log.Err(err).Msg("error while connecting to kubernetes cluster: exiting...")
@@ -67,20 +75,37 @@ func (k *k8sWatcher) main() error {
6775
return err
6876
}
6977

78+
// --------------------------------------
79+
// Set the helpers
80+
// --------------------------------------
81+
82+
servsHandler, err := services.NewHandler(ctx, k.opts.adaptorEndpoint)
83+
if err != nil {
84+
canc()
85+
return err
86+
}
87+
k.sendQueue = queue.New(ctx, servsHandler)
88+
89+
// --------------------------------------
90+
// Watch
91+
// --------------------------------------
92+
7093
go func() {
7194
k.watch(ctx, w)
7295
close(exitChan)
7396
}()
7497

98+
// --------------------------------------
99+
// Graceful shutdown
100+
// --------------------------------------
101+
75102
sig := make(chan os.Signal, 1)
76103
signal.Notify(sig, os.Interrupt)
77104

78105
<-sig
79106
fmt.Println()
80107
log.Info().Msg("exit requested")
81108

82-
// Cancel the context and wait for objects that use it to receive
83-
// the stop command
84109
canc()
85110
w.Stop()
86111
<-exitChan
@@ -102,114 +127,103 @@ func (k *k8sWatcher) watch(ctx context.Context, w watch.Interface) {
102127
return
103128
}
104129

105-
serv := parseService(ev)
106-
if serv == nil {
107-
continue
130+
if serv, success := ev.Object.(*corev1.Service); success {
131+
k.processNextService(serv, ev.Type)
108132
}
133+
}
134+
}
109135

110-
namespacedName := ktypes.NamespacedName{Namespace: serv.Namespace, Name: serv.Name}
111-
l := log.With().Str("event", string(ev.Type)).Str("service", namespacedName.String()).Logger()
136+
func (k *k8sWatcher) processNextService(serv *corev1.Service, evtype watch.EventType) {
137+
namespacedName := ktypes.NamespacedName{Namespace: serv.Namespace, Name: serv.Name}
138+
l := log.With().Str("service", namespacedName.String()).Logger()
112139

113-
// --------------------------------------
114-
// Parse event
115-
// --------------------------------------
140+
// --------------------------------------
141+
// Parse event
142+
// --------------------------------------
116143

117-
// TODO: on future versions this will be changed with the new openapi
118-
var oaevents []*openapi.Event
119-
switch ev.Type {
144+
// TODO: on future versions this will be changed with the new openapi
145+
oaevents := map[string]*openapi.Event{}
146+
switch evtype {
120147

121-
case watch.Deleted:
122-
if serv, exists := k.store[namespacedName.String()]; exists {
123-
oaservs, _ := getDataFromK8sService(serv, k.opts.annotationKeys)
124-
oaevents = make([]*openapi.Event, len(oaservs))
148+
case watch.Deleted:
149+
if serv, exists := k.store[namespacedName.String()]; exists {
150+
oaservs, _ := getDataFromK8sService(serv, k.opts.annotationKeys)
125151

126-
for i := range oaservs {
127-
oaevents[i] = &openapi.Event{
128-
Event: "delete",
129-
Service: *oaservs[i],
130-
}
152+
for _, serv := range oaservs {
153+
oaevents[serv.Name] = &openapi.Event{
154+
Event: "delete",
155+
Service: *serv,
131156
}
132-
133-
delete(k.store, namespacedName.String())
134157
}
135158

136-
case watch.Added:
137-
oaservs, err := getDataFromK8sService(serv, k.opts.annotationKeys)
138-
if err == nil {
139-
oaevents = make([]*openapi.Event, len(oaservs))
140-
for i := range oaservs {
141-
oaevents[i] = &openapi.Event{
142-
Event: "create",
143-
Service: *oaservs[i],
144-
}
145-
}
159+
delete(k.store, namespacedName.String())
160+
}
146161

147-
k.store[namespacedName.String()] = serv
162+
case watch.Added:
163+
oaservs, err := getDataFromK8sService(serv, k.opts.annotationKeys)
164+
if err == nil {
165+
for _, serv := range oaservs {
166+
oaevents[serv.Name] = &openapi.Event{
167+
Event: "create",
168+
Service: *serv,
169+
}
148170
}
149171

150-
case watch.Modified:
151-
prev, prevExists := k.store[namespacedName.String()]
152-
prevServs := func() []*openapi.Service {
153-
if prev == nil {
154-
return []*openapi.Service{}
155-
}
172+
k.store[namespacedName.String()] = serv
173+
}
156174

157-
servs, _ := getDataFromK8sService(prev, k.opts.annotationKeys)
158-
return servs
159-
}()
160-
161-
currServs, currErr := getDataFromK8sService(serv, k.opts.annotationKeys)
162-
if currErr != nil {
163-
if prevExists {
164-
oaevents = make([]*openapi.Event, len(prevServs))
165-
for i := range prevServs {
166-
oaevents[i] = &openapi.Event{
167-
Event: "delete",
168-
Service: *prevServs[i],
169-
}
175+
case watch.Modified:
176+
prev, prevExists := k.store[namespacedName.String()]
177+
prevServs := []*openapi.Service{}
178+
if prev != nil {
179+
prevServs, _ = getDataFromK8sService(prev, k.opts.annotationKeys)
180+
}
181+
182+
currServs, currErr := getDataFromK8sService(serv, k.opts.annotationKeys)
183+
if currErr != nil {
184+
if prevExists {
185+
for _, serv := range prevServs {
186+
oaevents[serv.Name] = &openapi.Event{
187+
Event: "delete",
188+
Service: *serv,
189+
}
190+
}
191+
delete(k.store, namespacedName.String())
192+
}
193+
} else {
194+
if prevExists {
195+
_oaevents := getServChanges(prevServs, currServs)
196+
for _, oae := range _oaevents {
197+
oaevents[oae.Service.Name] = &openapi.Event{
198+
Event: "update",
199+
Service: oae.Service,
170200
}
171-
delete(k.store, namespacedName.String())
172201
}
173202
} else {
174-
if prevExists {
175-
oaevents = getServChanges(prevServs, currServs)
176-
} else {
177-
oaevents = make([]*openapi.Event, len(currServs))
178-
179-
for i := range currServs {
180-
oaevents[i] = &openapi.Event{
181-
Event: "create",
182-
Service: *currServs[i],
183-
}
203+
for _, serv := range currServs {
204+
oaevents[serv.Name] = &openapi.Event{
205+
Event: "create",
206+
Service: *serv,
184207
}
185208
}
186-
187-
k.store[namespacedName.String()] = serv
188209
}
189-
}
190-
191-
// --------------------------------------
192-
// Send events
193-
// --------------------------------------
194210

195-
if len(oaevents) > 0 {
196-
// TODO: actually send events
197-
l.Info().Int("events", len(oaevents)).Msg("sending events...")
211+
k.store[namespacedName.String()] = serv
198212
}
199213
}
200-
}
201214

202-
func parseService(ev watch.Event) *corev1.Service {
203-
serv, success := ev.Object.(*corev1.Service)
204-
if success {
205-
return serv
206-
}
215+
// --------------------------------------
216+
// Send events
217+
// --------------------------------------
207218

208-
return nil
219+
if len(oaevents) > 0 {
220+
go k.sendQueue.Enqueue(oaevents)
221+
l.Info().Int("events", len(oaevents)).Msg("sending events...")
222+
}
209223
}
210224

211225
// TODO: this needs change with next version of openapi
212-
// TODO: This will be written on a new package of the operator since it uses the same code.
226+
// TODO: write this on a new package of the operator (since it uses the same code)?
213227
func getDataFromK8sService(serv *corev1.Service, annKeys []string) ([]*openapi.Service, error) {
214228
// -- Is this a LoadBalancer
215229
if serv.Spec.Type != corev1.ServiceTypeLoadBalancer {

0 commit comments

Comments
 (0)