Skip to content

Commit 1e67481

Browse files
authored
Merge pull request #140 from fi-ts/refactor-nextFreeSocket
Refactoring of LBManager.nextFreeSocket
2 parents 5fc4fdb + 65c2f4b commit 1e67481

File tree

2 files changed

+160
-32
lines changed

2 files changed

+160
-32
lines changed

pkg/lbmanager/lbmanager.go

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ import (
1111
"sigs.k8s.io/controller-runtime/pkg/client"
1212
)
1313

14+
// LBManager Responsible for the creation and deletion of externally accessible Services to access the Postgresql clusters managed by the Postgreslet.
1415
type LBManager struct {
15-
client.Client // todo: service cluster
16-
LBIP string // todo: via configmap
17-
PortRangeStart int32 // todo: via configmap
16+
client.Client
17+
LBIP string
18+
PortRangeStart int32
1819
PortRangeSize int32
1920
}
2021

22+
// New Creates a new LBManager with the given configuration
2123
func New(client client.Client, lbIP string, portRangeStart, portRangeSize int32) *LBManager {
2224
return &LBManager{
2325
Client: client,
@@ -27,6 +29,7 @@ func New(client client.Client, lbIP string, portRangeStart, portRangeSize int32)
2729
}
2830
}
2931

32+
// CreateSvcLBIfNone Creates a new Service of type LoadBalancer for the given Postgres resource if neccessary
3033
func (m *LBManager) CreateSvcLBIfNone(ctx context.Context, in *api.Postgres) error {
3134
if err := m.Get(ctx, client.ObjectKey{
3235
Namespace: in.ToPeripheralResourceNamespace(),
@@ -36,30 +39,20 @@ func (m *LBManager) CreateSvcLBIfNone(ctx context.Context, in *api.Postgres) err
3639
return fmt.Errorf("failed to fetch Service of type LoadBalancer: %w", err)
3740
}
3841

39-
existingLBIP, nextFreePort, err := m.nextFreeSocket(ctx)
42+
nextFreePort, err := m.nextFreePort(ctx)
4043
if err != nil {
41-
return fmt.Errorf("failed to get a free port for creating Service of type LoadBalancer: %w", err)
42-
}
43-
var lbIPToUse string
44-
if m.LBIP != "" {
45-
// a specific IP was configured in the config, so use that one
46-
lbIPToUse = m.LBIP
47-
} else if existingLBIP != "" {
48-
// no ip was configured, but one is already in use, so use the existing one
49-
lbIPToUse = existingLBIP
50-
} else {
51-
// nothing was configured, nothing exists yet, so use an empty address so a new loadbalancer will be created and assigned
52-
lbIPToUse = ""
44+
return fmt.Errorf("failed to get the next free port: %w", err)
5345
}
5446

55-
if err := m.Create(ctx, in.ToSvcLB(lbIPToUse, nextFreePort)); err != nil {
47+
if err := m.Create(ctx, in.ToSvcLB(m.LBIP, nextFreePort)); err != nil {
5648
return fmt.Errorf("failed to create Service of type LoadBalancer: %w", err)
5749
}
5850
return nil
5951
}
6052
return nil
6153
}
6254

55+
// DeleteSvcLB Deletes the corresponding Service of type LoadBalancer of the given Postgres resource.
6356
func (m *LBManager) DeleteSvcLB(ctx context.Context, in *api.Postgres) error {
6457
lb := &corev1.Service{}
6558
lb.Namespace = in.ToPeripheralResourceNamespace()
@@ -70,37 +63,51 @@ func (m *LBManager) DeleteSvcLB(ctx context.Context, in *api.Postgres) error {
7063
return nil
7164
}
7265

73-
func (m *LBManager) nextFreeSocket(ctx context.Context) (string, int32, error) {
66+
// nextFreeSocket finds any existing LoadBalancerIP and the next free port out of the configure port range.
67+
func (m *LBManager) nextFreePort(ctx context.Context) (int32, error) {
7468
// TODO prevent concurrency issues when calculating port / ip.
7569

76-
existingLBIP := ""
77-
70+
// Fetch all services managed by this postgreslet
7871
lbs := &corev1.ServiceList{}
7972
if err := m.List(ctx, lbs, client.MatchingLabels(api.SvcLoadBalancerLabel)); err != nil {
80-
return existingLBIP, 0, fmt.Errorf("failed to fetch the list of services of type LoadBalancer: %w", err)
73+
return 0, fmt.Errorf("failed to fetch the list of services of type LoadBalancer: %w", err)
8174
}
8275

76+
// If there are none, this will be the first (managed) service we create, so start with PortRangeStart and return
8377
if len(lbs.Items) == 0 {
84-
return existingLBIP, m.PortRangeStart, nil
78+
return m.PortRangeStart, nil
8579
}
8680

87-
// Record weather any port is occupied
88-
isOccupied := make([]bool, int(m.PortRangeSize))
81+
// If there are already any managed services, store all the used ports in a slice.
82+
portsInUse := []int32{}
8983
for i := range lbs.Items {
9084
svc := lbs.Items[i]
9185
if len(svc.Spec.Ports) > 0 {
92-
isOccupied[svc.Spec.Ports[0].Port-m.PortRangeStart] = true
93-
}
94-
if svc.Spec.LoadBalancerIP != "" {
95-
existingLBIP = svc.Spec.LoadBalancerIP
86+
portsInUse = append(portsInUse, svc.Spec.Ports[0].Port)
9687
}
9788
}
9889

99-
for i := range isOccupied {
100-
if !isOccupied[i] {
101-
return existingLBIP, m.PortRangeStart + int32(i), nil
90+
// Now try all ports in the configured port range to find a free one.
91+
// While not as effective as other implementations, this allows us to freely change PortRangeStart and PortRangeSize
92+
// retroactively without breaking the implementation.
93+
for port := m.PortRangeStart; port < m.PortRangeStart+m.PortRangeSize; port++ {
94+
if containsElem(portsInUse, port) {
95+
// Port already in use, try the next one
96+
continue
10297
}
98+
// The postgreslet hasn't assigned this port yet, so use it.
99+
return port, nil
103100
}
104101

105-
return existingLBIP, 0, errors.New("no free port")
102+
// If we made it this far, no free port could be found.
103+
return 0, errors.New("no free port in the configured port range found")
104+
}
105+
106+
func containsElem(s []int32, v int32) bool {
107+
for _, elem := range s {
108+
if elem == v {
109+
return true
110+
}
111+
}
112+
return false
106113
}

pkg/lbmanager/lbmanager_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package lbmanager
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
api "github.com/fi-ts/postgreslet/api/v1"
9+
corev1 "k8s.io/api/core/v1"
10+
"k8s.io/apimachinery/pkg/runtime"
11+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
12+
)
13+
14+
func TestLBManager_nextFreePort(t *testing.T) {
15+
portRangeStart := int32(0)
16+
portRangeSize := int32(5)
17+
18+
tests := []struct {
19+
name string
20+
lbMgr *LBManager
21+
want int32
22+
wantErr bool
23+
}{
24+
{
25+
name: "no svc in the cluster",
26+
lbMgr: &LBManager{
27+
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts()).Build(),
28+
LBIP: "0.0.0.0",
29+
PortRangeStart: portRangeStart,
30+
PortRangeSize: portRangeSize,
31+
},
32+
want: 0,
33+
wantErr: false,
34+
},
35+
{
36+
name: "one svc already in the cluster",
37+
lbMgr: &LBManager{
38+
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0)).Build(),
39+
LBIP: "0.0.0.0",
40+
PortRangeStart: portRangeStart,
41+
PortRangeSize: portRangeSize,
42+
},
43+
want: 1,
44+
wantErr: false,
45+
},
46+
{
47+
name: "last free port left",
48+
lbMgr: &LBManager{
49+
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3)).Build(),
50+
LBIP: "0.0.0.0",
51+
PortRangeStart: portRangeStart,
52+
PortRangeSize: portRangeSize,
53+
},
54+
want: 4,
55+
wantErr: false,
56+
},
57+
{
58+
name: "no free port",
59+
lbMgr: &LBManager{
60+
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3, 4)).Build(),
61+
LBIP: "0.0.0.0",
62+
PortRangeStart: portRangeStart,
63+
PortRangeSize: portRangeSize,
64+
},
65+
want: 0,
66+
wantErr: true,
67+
},
68+
{
69+
name: "re-use releaased port",
70+
lbMgr: &LBManager{
71+
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 2, 3)).Build(),
72+
LBIP: "0.0.0.0",
73+
PortRangeStart: portRangeStart,
74+
PortRangeSize: portRangeSize,
75+
},
76+
want: 1,
77+
wantErr: false,
78+
},
79+
}
80+
81+
for _, tt := range tests {
82+
t.Run(tt.name, func(t *testing.T) {
83+
got, err := tt.lbMgr.nextFreePort(context.Background())
84+
if (err != nil) != tt.wantErr {
85+
t.Errorf("LBManager.nextFreePort() error = %v, wantErr %v", err, tt.wantErr)
86+
return
87+
}
88+
if got != tt.want {
89+
t.Errorf("LBManager.nextFreePort() = %v, want %v", got, tt.want)
90+
}
91+
})
92+
}
93+
}
94+
95+
func scheme() *runtime.Scheme {
96+
scheme := runtime.NewScheme()
97+
_ = corev1.AddToScheme(scheme)
98+
99+
return scheme
100+
}
101+
102+
// svcListWithPorts generates a `ServiceList` containing `Service`s with ports respectively
103+
func svcListWithPorts(ports ...int32) *corev1.ServiceList {
104+
svcList := &corev1.ServiceList{}
105+
for _, port := range ports {
106+
svcList.Items = append(svcList.Items, *svcWithPort(port))
107+
}
108+
return svcList
109+
}
110+
111+
func svcWithPort(port int32) *corev1.Service {
112+
svc := corev1.Service{}
113+
svc.Name = fmt.Sprintf("svc-with-port-%d", port)
114+
svc.Labels = api.SvcLoadBalancerLabel
115+
svc.Spec.Ports = []corev1.ServicePort{
116+
{
117+
Port: port,
118+
},
119+
}
120+
return &svc
121+
}

0 commit comments

Comments
 (0)