Skip to content

Commit cfa5d41

Browse files
authored
limit nodeagent container runtime mutation operation concurrency (#149)
1 parent d31ffac commit cfa5d41

File tree

6 files changed

+44
-27
lines changed

6 files changed

+44
-27
lines changed

cmd/runtimetest/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
)
2626

2727
func main() {
28-
runtimeService, err := dependency.InitRuntimeService(config.DefaultContainerRuntimeEndpoint)
28+
runtimeService, err := dependency.InitRuntimeService(config.DefaultContainerRuntimeEndpoint, 1)
2929
if err != nil {
3030
klog.ErrorS(err, "Failed to init runtime service")
3131
os.Exit(-1)

pkg/fornaxcore/application/application_session.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (am *ApplicationManager) onApplicationSessionAddEvent(obj interface{}) {
9696

9797
klog.InfoS("Application session created", "session", util.Name(session))
9898
if v := pool.getSession(string(session.GetUID())); v != nil {
99-
am.onApplicationSessionUpdateEvent(v.session, session)
99+
am.onApplicationSessionUpdateEvent(v.session, v)
100100
return
101101
} else {
102102
if !util.SessionInTerminalState(session) {
@@ -119,13 +119,12 @@ func (am *ApplicationManager) onApplicationSessionUpdateEvent(old, cur interface
119119

120120
if v := pool.getSession(string(newCopy.GetUID())); v != nil {
121121
updateSessionPool(pool, newCopy)
122-
am.enqueueApplication(applicationKey)
123122
} else {
124123
if !util.SessionInTerminalState(newCopy) {
125124
updateSessionPool(pool, newCopy)
126125
}
127-
am.enqueueApplication(applicationKey)
128126
}
127+
am.enqueueApplication(applicationKey)
129128
}
130129

131130
// callback from Application informer when ApplicationSession is physically deleted
@@ -143,8 +142,7 @@ func (am *ApplicationManager) onApplicationSessionDeleteEvent(obj interface{}) {
143142
if pool == nil {
144143
return
145144
}
146-
oldCopy := pool.getSession(string(session.GetUID()))
147-
if oldCopy != nil {
145+
if oldCopy := pool.getSession(string(session.GetUID())); oldCopy != nil {
148146
if oldCopy.session.DeletionTimestamp == nil {
149147
oldCopy.session.DeletionTimestamp = util.NewCurrentMetaTime()
150148
}
@@ -168,14 +166,15 @@ func (ps PendingSessions) Swap(i, j int) {
168166
ps[i], ps[j] = ps[j], ps[i]
169167
}
170168

171-
// deployApplicationSessions grab a list of pending session and try to allocate them to pods and call OpenSession on choosen pod.
172-
// session status change in memory to SessionStatusStarting, but do not update etcd to avoid unnecessary resync.
173-
// session status will be changed in etcd until pod report back, if fornax core restart and lost these memory state, it rely on pod to report back.
174-
// It also cleanup session when a session is in Starting or Pending state for more than a timeout duration.
169+
// deployApplicationSessions groups sessions into pending, timeout, and deleting states, and
170+
// 1, assigns pending sessions to idle pods and calls OpenSession on the chosen pod.
171+
// session status changes in memory to SessionStatusStarting; the session is stored on the node and reported back,
172+
// so if fornax core restarts and loses this in-memory state, it relies on pods to report back.
173+
// 2, cleans up timed-out sessions stuck in pending or starting state for more than a timeout duration,
174+
// and call node to close session if session is in starting state which was sent to a pod before.
175175
// session is changed to SessionStatusTimeout, session client need to create a new session.
176-
// It also cleanup session in deletingSessions when a session is in Starting or Pending state for more than a timeout duration.
177-
// session is changed to SessionStatusClosed, session client need to create a new session.
178-
// session timedout and closed are removed from application pool's session list, so, syncApplicationPods do not need to consider these sessions anymore
176+
// 3, if a session is being deleted by the client (aka, close session), it calls the node to close the session;
177+
// timed-out and closed sessions are removed from the application's session pool
179178
func (am *ApplicationManager) deployApplicationSessions(pool *ApplicationPool, application *fornaxv1.Application) error {
180179
pendingSessions, deletingSessions, timeoutSessions := pool.getNonRunningSessions()
181180
// get 5 more in case some pods assigment failed
@@ -306,7 +305,7 @@ func (am *ApplicationManager) bindSessionToPod(pool *ApplicationPool, pod *v1.Po
306305
// cleanupSessionOnDeletedPod handle pod is terminated unexpectedly, e.g. node crash
307306
// in normal cases,session should be closed before pod is terminated and deleted.
308307
// It update open session to closed and pending session to timedout,
309-
// and does not try to call node to close session, as session does not exist at all on node when pod deleted
308+
// and does not try to call node to close session, as session does not exist at all on node when pod terminated on node
310309
func (am *ApplicationManager) cleanupSessionOnDeletedPod(pool *ApplicationPool, podName string) {
311310
podSessions := pool.getPodSessions(podName)
312311
for _, sess := range podSessions {
@@ -315,9 +314,9 @@ func (am *ApplicationManager) cleanupSessionOnDeletedPod(pool *ApplicationPool,
315314
}
316315
}
317316

318-
// cleanupSessionOfApplication if a application is being deleted,
319-
// close all sessions which are still alive and delete sessions from application sessions pool if they are still pending
320-
// when alive session reported as closed by Node Agent, then session can be eventually deleted
317+
// cleanupSessionOfApplication: if an application is being deleted, it
318+
// calls the node to close all sessions which are still open; when a session is reported as closed from the node, the session can be eventually deleted
319+
// if a session is still pending assignment to a pod, delete it from the application's session pool directly
321320
func (am *ApplicationManager) cleanupSessionOfApplication(pool *ApplicationPool) error {
322321
deleteErrors := []error{}
323322

pkg/nodeagent/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const (
7373
KubeletPluginsDirSELinuxLabel = "system_u:object_r:container_file_t:s0"
7474
DefaultPodCgroupName = "containers"
7575
DefaultRuntimeHandler = "runc"
76+
DefaultPodConcurrency = 5
7677
)
7778

7879
type NodeConfiguration struct {
@@ -111,6 +112,7 @@ type NodeConfiguration struct {
111112
SeccompDefault bool
112113
NodePortStartingNo int32
113114
SessionServicePort int32
115+
PodConcurrency int
114116
}
115117

116118
func DefaultNodeConfiguration() (*NodeConfiguration, error) {
@@ -142,6 +144,7 @@ func DefaultNodeConfiguration() (*NodeConfiguration, error) {
142144
NodeAgentCgroupName: DefaultNodeAgentCgroupName,
143145
OOMScoreAdj: -999,
144146
QOSReserved: map[v1.ResourceName]int64{},
147+
PodConcurrency: DefaultPodConcurrency,
145148
PodLogRootPath: DefaultPodLogsRootPath,
146149
PodPidLimits: DefaultPodPidLimits,
147150
PodsPerCore: DefaultPodsPerCore,

pkg/nodeagent/dependency/dependency.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func InitBasicDependencies(ctx context.Context, nodeConfig config.NodeConfigurat
8080
dependencies.NetworkProvider = InitNetworkProvider(nodeConfig.Hostname)
8181

8282
// Runtime
83-
dependencies.RuntimeService, err = InitRuntimeService(nodeConfig.ContainerRuntimeEndpoint)
83+
dependencies.RuntimeService, err = InitRuntimeService(nodeConfig.ContainerRuntimeEndpoint, nodeConfig.PodConcurrency)
8484
if err != nil {
8585
klog.ErrorS(err, "failed to init container runtime client")
8686
return nil, err
@@ -104,8 +104,8 @@ func InitBasicDependencies(ctx context.Context, nodeConfig config.NodeConfigurat
104104
return &dependencies, nil
105105
}
106106

107-
func InitRuntimeService(endpoint string) (runtime.RuntimeService, error) {
108-
return runtime.NewRemoteRuntimeService(endpoint, runtime.DefaultTimeout)
107+
func InitRuntimeService(endpoint string, concurrency int) (runtime.RuntimeService, error) {
108+
return runtime.NewRemoteRuntimeService(endpoint, runtime.DefaultTimeout, concurrency)
109109
}
110110

111111
func InitImageService(endpoint string) (images.ImageManager, error) {
@@ -169,7 +169,7 @@ func (n *Dependencies) Complete(node *v1.Node, nodeConfig config.NodeConfigurati
169169

170170
// CRIRuntime
171171
if n.RuntimeService == nil {
172-
n.RuntimeService, err = InitRuntimeService(nodeConfig.ContainerRuntimeEndpoint)
172+
n.RuntimeService, err = InitRuntimeService(nodeConfig.ContainerRuntimeEndpoint, nodeConfig.PodConcurrency)
173173
if err != nil {
174174
klog.ErrorS(err, "Failed to init runtime service")
175175
return err

pkg/nodeagent/pod/pod_actor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func (a *PodActor) podHouseKeeping() (err error) {
309309
}
310310

311311
switch {
312-
case pod.FornaxPodState == types.PodStateTerminating:
312+
case pod.FornaxPodState == types.PodStateTerminating || pod.FornaxPodState == types.PodStateFailed:
313313
err = a.terminate(true)
314314
case pod.RuntimePod == nil:
315315
// pod create failed to create a sandbox

pkg/nodeagent/runtime/runtime.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
grpc_util "centaurusinfra.io/fornax-serverless/pkg/util"
26+
"golang.org/x/sync/semaphore"
2627

2728
"github.com/containerd/containerd"
2829
"github.com/containerd/containerd/api/services/tasks/v1"
@@ -42,6 +43,7 @@ var _ RuntimeService = &remoteRuntimeManager{}
4243
type remoteRuntimeManager struct {
4344
runtimeService criapi.RuntimeService
4445
containerdService *containerd.Client
46+
podConcurrency *semaphore.Weighted
4547
}
4648

4749
// GetPodSandbox implements RuntimeService
@@ -77,12 +79,14 @@ func (r *remoteRuntimeManager) GetRuntimeStatus() (*criv1.RuntimeStatus, error)
7779
return resp.GetStatus(), nil
7880
}
7981

80-
func (m *remoteRuntimeManager) CreateContainer(podSandboxID string, containerConfig *criv1.ContainerConfig, podSandboxConfig *criv1.PodSandboxConfig) (*Container, error) {
82+
func (r *remoteRuntimeManager) CreateContainer(podSandboxID string, containerConfig *criv1.ContainerConfig, podSandboxConfig *criv1.PodSandboxConfig) (*Container, error) {
83+
r.podConcurrency.Acquire(context.Background(), 1)
84+
defer r.podConcurrency.Release(1)
8185
klog.InfoS("Create container", "PodSandboxID", podSandboxID, "ContainerConfig", containerConfig)
8286

8387
var containerId string
8488
var err error
85-
containerId, err = m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
89+
containerId, err = r.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
8690
if err != nil {
8791
return nil, err
8892
}
@@ -93,7 +97,7 @@ func (m *remoteRuntimeManager) CreateContainer(podSandboxID string, containerCon
9397
}
9498

9599
var containers []*criv1.Container
96-
containers, err = m.runtimeService.ListContainers(&criv1.ContainerFilter{
100+
containers, err = r.runtimeService.ListContainers(&criv1.ContainerFilter{
97101
Id: containerId,
98102
PodSandboxId: podSandboxID,
99103
})
@@ -113,6 +117,8 @@ func (m *remoteRuntimeManager) CreateContainer(podSandboxID string, containerCon
113117
// CreateSandbox implements cri.RuntimeService
114118
// if RunPodSandbox succeeded but failed in following steps, return it still, it will be cleaned by node
115119
func (r *remoteRuntimeManager) CreateSandbox(sandboxConfig *criv1.PodSandboxConfig, runtimeClassName string) (*Pod, error) {
120+
r.podConcurrency.Acquire(context.Background(), 1)
121+
defer r.podConcurrency.Release(1)
116122
klog.InfoS("Run pod sandbox", "SandboxConfig", sandboxConfig)
117123
podSandBoxID, err := r.runtimeService.RunPodSandbox(sandboxConfig, runtimeClassName)
118124
if err != nil {
@@ -268,6 +274,8 @@ func (r *remoteRuntimeManager) GetPods(includeContainers bool) ([]*Pod, error) {
268274

269275
// StopContainer implements RuntimeService
270276
func (r *remoteRuntimeManager) StopContainer(containerID string, timeout time.Duration) error {
277+
r.podConcurrency.Acquire(context.Background(), 1)
278+
defer r.podConcurrency.Release(1)
271279
klog.InfoS("Stop container", "ContainerID", containerID)
272280

273281
err := r.runtimeService.StopContainer(containerID, int64(timeout.Seconds()))
@@ -280,6 +288,8 @@ func (r *remoteRuntimeManager) StopContainer(containerID string, timeout time.Du
280288

281289
// StartContainer implements cri.RuntimeService
282290
func (r *remoteRuntimeManager) StartContainer(containerID string) error {
291+
r.podConcurrency.Acquire(context.Background(), 1)
292+
defer r.podConcurrency.Release(1)
283293
klog.InfoS("Start container", "ContainerID", containerID)
284294

285295
err := r.runtimeService.StartContainer(containerID)
@@ -291,6 +301,8 @@ func (r *remoteRuntimeManager) StartContainer(containerID string) error {
291301

292302
// TerminateContainer implements cri.RuntimeService
293303
func (r *remoteRuntimeManager) TerminateContainer(containerID string) error {
304+
r.podConcurrency.Acquire(context.Background(), 1)
305+
defer r.podConcurrency.Release(1)
294306
klog.InfoS("Terminate container, stop immediately without gracePeriod", "ContainerID", containerID)
295307

296308
status, err := r.GetContainerStatus(containerID)
@@ -317,6 +329,8 @@ func (r *remoteRuntimeManager) TerminateContainer(containerID string) error {
317329

318330
// TerminatePod implements cri.RuntimeService
319331
func (r *remoteRuntimeManager) TerminatePod(podSandboxID string, containerIDs []string) error {
332+
r.podConcurrency.Acquire(context.Background(), 1)
333+
defer r.podConcurrency.Release(1)
320334
klog.InfoS("Terminate pod", "PodSandboxID", podSandboxID)
321335
var err error
322336
if len(containerIDs) == 0 {
@@ -327,7 +341,7 @@ func (r *remoteRuntimeManager) TerminatePod(podSandboxID string, containerIDs []
327341
for _, v := range containers {
328342
if v.State != criv1.ContainerState_CONTAINER_EXITED {
329343
klog.InfoS("Terminate container in pod", "PodSandboxID", podSandboxID, "ContainerID", v)
330-
err = r.TerminateContainer(v.GetId())
344+
err = r.runtimeService.StopContainer(v.GetId(), 0)
331345
if err != nil {
332346
return err
333347
}
@@ -430,7 +444,7 @@ func (r *remoteRuntimeManager) getPodContainers(podSandboxID string) ([]*criv1.C
430444
return containers, nil
431445
}
432446

433-
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (*remoteRuntimeManager, error) {
447+
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, concurrency int) (*remoteRuntimeManager, error) {
434448
klog.InfoS("Connecting to runtime service", "endpoint", endpoint)
435449
remoteService, err := remote.NewRemoteRuntimeService(endpoint, connectionTimeout)
436450
if err != nil {
@@ -446,6 +460,7 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (
446460
service := &remoteRuntimeManager{
447461
runtimeService: remoteService,
448462
containerdService: containerdClient,
463+
podConcurrency: semaphore.NewWeighted(int64(concurrency)),
449464
}
450465

451466
return service, nil

0 commit comments

Comments
 (0)