Skip to content

Commit 326d6a6

Browse files
committed
Get logs by selector in integration_test
This allows to get the logs of the deployment too
1 parent c28d77f commit 326d6a6

File tree

4 files changed

+115
-101
lines changed

4 files changed

+115
-101
lines changed

pkg/deployer/integration_test_helper.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package deployer
55

66
import (
77
"context"
8+
"fmt"
89
"io"
910
"net/http"
1011
"strings"
@@ -156,7 +157,8 @@ func IntegrationTest(t *testing.T, deployer fn.Deployer) {
156157

157158
var buff = &knative.SynchronizedBuffer{}
158159
go func() {
159-
_ = knative.GetKServiceLogs(ctx, namespace, functionName, function.Deploy.Image, &now, buff)
160+
selector := fmt.Sprintf("function.knative.dev/name=%s", functionName)
161+
_ = k8s.GetPodLogsBySelector(ctx, namespace, selector, "user-container", "", &now, buff)
160162
}()
161163

162164
depRes, err := deployer.Deploy(ctx, function)
@@ -203,7 +205,7 @@ func IntegrationTest(t *testing.T, deployer fn.Deployer) {
203205
reqBody := "Hello World!"
204206
respBody, err := postText(ctx, instance.Route, reqBody)
205207
if err != nil {
206-
t.Error(err)
208+
t.Fatalf("failed to invoke function: %v", err)
207209
} else {
208210
t.Log("resp body:\n" + respBody)
209211
if !strings.Contains(respBody, reqBody) {

pkg/deployer/k8s/deployer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (d *Deployer) generateResources(f fn.Function, namespace string, daprInstal
182182
}
183183

184184
container := corev1.Container{
185-
Name: f.Name,
185+
Name: "user-container",
186186
Image: f.Deploy.Image,
187187
Ports: []corev1.ContainerPort{
188188
{

pkg/k8s/logs.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,15 @@ package k8s
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"io"
8+
"sync"
9+
"time"
710

11+
"golang.org/x/sync/errgroup"
812
corev1 "k8s.io/api/core/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/watch"
915
)
1016

1117
// GetPodLogs returns logs from a specified Container in a Pod, if container is empty string,
@@ -33,3 +39,105 @@ func GetPodLogs(ctx context.Context, namespace, podName, containerName string) (
3339

3440
return buffer.String(), nil
3541
}
42+
43+
// GetPodLogsBySelector will get logs of a pod.
44+
//
45+
// It will do so by gathering logs of the given container of all affiliated pods.
46+
// In addition, filtering on image can be done so only logs for given image are logged.
47+
//
48+
// This function runs as long as the passed context is active (i.e. it is required cancel the context to stop log gathering).
49+
func GetPodLogsBySelector(ctx context.Context, namespace, labelSelector, containerName, image string, since *time.Time, out io.Writer) error {
50+
client, namespace, err := NewClientAndResolvedNamespace(namespace)
51+
if err != nil {
52+
return fmt.Errorf("cannot create k8s client: %w", err)
53+
}
54+
55+
pods := client.CoreV1().Pods(namespace)
56+
57+
podListOpts := metav1.ListOptions{
58+
Watch: true,
59+
LabelSelector: labelSelector,
60+
}
61+
62+
w, err := pods.Watch(ctx, podListOpts)
63+
if err != nil {
64+
return fmt.Errorf("cannot create watch: %w", err)
65+
}
66+
defer w.Stop()
67+
68+
beingProcessed := make(map[string]bool)
69+
var beingProcessedMu sync.Mutex
70+
71+
copyLogs := func(pod corev1.Pod) error {
72+
defer func() {
73+
beingProcessedMu.Lock()
74+
delete(beingProcessed, pod.Name)
75+
beingProcessedMu.Unlock()
76+
}()
77+
podLogOpts := corev1.PodLogOptions{
78+
Container: containerName,
79+
Follow: true,
80+
}
81+
if since != nil {
82+
sinceTime := metav1.NewTime(*since)
83+
podLogOpts.SinceTime = &sinceTime
84+
}
85+
req := client.CoreV1().Pods(namespace).GetLogs(pod.Name, &podLogOpts)
86+
87+
r, e := req.Stream(ctx)
88+
if e != nil {
89+
return fmt.Errorf("cannot get stream: %w", e)
90+
}
91+
defer r.Close()
92+
_, e = io.Copy(out, r)
93+
if e != nil {
94+
return fmt.Errorf("error copying logs: %w", e)
95+
}
96+
return nil
97+
}
98+
99+
mayReadLogs := func(pod corev1.Pod) bool {
100+
for _, status := range pod.Status.ContainerStatuses {
101+
if status.Name == containerName {
102+
return status.State.Running != nil || status.State.Terminated != nil
103+
}
104+
}
105+
return false
106+
}
107+
108+
getImage := func(pod corev1.Pod) string {
109+
for _, ctr := range pod.Spec.Containers {
110+
if ctr.Name == containerName {
111+
return ctr.Image
112+
}
113+
}
114+
return ""
115+
}
116+
117+
var eg errgroup.Group
118+
119+
for event := range w.ResultChan() {
120+
if event.Type == watch.Modified || event.Type == watch.Added {
121+
pod := *event.Object.(*corev1.Pod)
122+
123+
beingProcessedMu.Lock()
124+
_, loggingAlready := beingProcessed[pod.Name]
125+
beingProcessedMu.Unlock()
126+
127+
if !loggingAlready && (image == "" || image == getImage(pod)) && mayReadLogs(pod) {
128+
129+
beingProcessedMu.Lock()
130+
beingProcessed[pod.Name] = true
131+
beingProcessedMu.Unlock()
132+
133+
eg.Go(func() error { return copyLogs(pod) })
134+
}
135+
}
136+
}
137+
138+
err = eg.Wait()
139+
if err != nil {
140+
return fmt.Errorf("error while gathering logs: %w", err)
141+
}
142+
return nil
143+
}

pkg/knative/logs.go

Lines changed: 2 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@ import (
88
"sync"
99
"time"
1010

11-
"golang.org/x/sync/errgroup"
12-
13-
corev1 "k8s.io/api/core/v1"
14-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15-
"k8s.io/apimachinery/pkg/watch"
1611
"knative.dev/func/pkg/k8s"
1712
)
1813

@@ -24,99 +19,8 @@ import (
2419
//
2520
// This function runs as long as the passed context is active (i.e. it is required cancel the context to stop log gathering).
2621
func GetKServiceLogs(ctx context.Context, namespace, kServiceName, image string, since *time.Time, out io.Writer) error {
27-
client, namespace, err := k8s.NewClientAndResolvedNamespace(namespace)
28-
if err != nil {
29-
return fmt.Errorf("cannot create k8s client: %w", err)
30-
}
31-
32-
pods := client.CoreV1().Pods(namespace)
33-
34-
podListOpts := metav1.ListOptions{
35-
Watch: true,
36-
LabelSelector: fmt.Sprintf("serving.knative.dev/service=%s", kServiceName),
37-
}
38-
39-
w, err := pods.Watch(ctx, podListOpts)
40-
if err != nil {
41-
return fmt.Errorf("cannot create watch: %w", err)
42-
}
43-
defer w.Stop()
44-
45-
beingProcessed := make(map[string]bool)
46-
var beingProcessedMu sync.Mutex
47-
48-
copyLogs := func(pod corev1.Pod) error {
49-
defer func() {
50-
beingProcessedMu.Lock()
51-
delete(beingProcessed, pod.Name)
52-
beingProcessedMu.Unlock()
53-
}()
54-
podLogOpts := corev1.PodLogOptions{
55-
Container: "user-container",
56-
Follow: true,
57-
}
58-
if since != nil {
59-
sinceTime := metav1.NewTime(*since)
60-
podLogOpts.SinceTime = &sinceTime
61-
}
62-
req := client.CoreV1().Pods(namespace).GetLogs(pod.Name, &podLogOpts)
63-
64-
r, e := req.Stream(ctx)
65-
if e != nil {
66-
return fmt.Errorf("cannot get stream: %w", e)
67-
}
68-
defer r.Close()
69-
_, e = io.Copy(out, r)
70-
if e != nil {
71-
return fmt.Errorf("error copying logs: %w", e)
72-
}
73-
return nil
74-
}
75-
76-
mayReadLogs := func(pod corev1.Pod) bool {
77-
for _, status := range pod.Status.ContainerStatuses {
78-
if status.Name == "user-container" {
79-
return status.State.Running != nil || status.State.Terminated != nil
80-
}
81-
}
82-
return false
83-
}
84-
85-
getImage := func(pod corev1.Pod) string {
86-
for _, ctr := range pod.Spec.Containers {
87-
if ctr.Name == "user-container" {
88-
return ctr.Image
89-
}
90-
}
91-
return ""
92-
}
93-
94-
var eg errgroup.Group
95-
96-
for event := range w.ResultChan() {
97-
if event.Type == watch.Modified || event.Type == watch.Added {
98-
pod := *event.Object.(*corev1.Pod)
99-
100-
beingProcessedMu.Lock()
101-
_, loggingAlready := beingProcessed[pod.Name]
102-
beingProcessedMu.Unlock()
103-
104-
if !loggingAlready && (image == "" || image == getImage(pod)) && mayReadLogs(pod) {
105-
106-
beingProcessedMu.Lock()
107-
beingProcessed[pod.Name] = true
108-
beingProcessedMu.Unlock()
109-
110-
eg.Go(func() error { return copyLogs(pod) })
111-
}
112-
}
113-
}
114-
115-
err = eg.Wait()
116-
if err != nil {
117-
return fmt.Errorf("error while gathering logs: %w", err)
118-
}
119-
return nil
22+
selector := fmt.Sprintf("serving.knative.dev/service=%s", kServiceName)
23+
return k8s.GetPodLogsBySelector(ctx, namespace, selector, "user-container", image, since, out)
12024
}
12125

12226
type SynchronizedBuffer struct {

0 commit comments

Comments
 (0)