Skip to content

Commit a5dfb1b

Browse files
authored
fix session open close handling bug (#150)
* handle session open and close failure * handle session open and close failure * amend * amend
1 parent cfa5d41 commit a5dfb1b

File tree

10 files changed

+215
-204
lines changed

10 files changed

+215
-204
lines changed

cmd/fornaxtest/app/test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ var (
127127
"cpu": util.ResourceQuantity(0.5*1000, v1.ResourceCPU),
128128
},
129129
Requests: map[v1.ResourceName]resource.Quantity{
130-
"memory": util.ResourceQuantity(100*1024*1024, v1.ResourceMemory),
130+
"memory": util.ResourceQuantity(50*1024*1024, v1.ResourceMemory),
131131
"cpu": util.ResourceQuantity(0.01*1000, v1.ResourceCPU),
132132
},
133133
},

pkg/fornaxcore/application/application_manager.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -366,30 +366,32 @@ func (am *ApplicationManager) syncApplication(ctx context.Context, applicationKe
366366
// 2, find how many more pods required for remaining pending sessions
367367
if syncErr == nil {
368368
sessionSummary := pool.summarySession()
369-
numOfOccupiedPod, numOfPendingPod, numOfIdlePod := pool.activePodNums()
370-
numOfUnoccupiedPod := numOfPendingPod + numOfIdlePod
369+
numOfAllocatedPod, numOfPendingPod, numOfIdlePod := pool.activePodNums()
370+
numOfUnAllocatedPod := numOfPendingPod + numOfIdlePod
371371
numOfPendingSession := sessionSummary.pendingCount
372-
numOfDesiredUnoccupiedPod := am.calculateDesiredIdlePods(application, numOfOccupiedPod, numOfUnoccupiedPod, numOfPendingSession)
373-
numOfDesiredPod = numOfOccupiedPod + numOfDesiredUnoccupiedPod
374-
klog.InfoS("Syncing application pod", "application", applicationKey, "pending-sessions", numOfPendingSession, "active-pods", numOfOccupiedPod+numOfUnoccupiedPod, "pending-pods", numOfPendingPod, "idle-pods", numOfIdlePod, "desired-pending+idle-pods", numOfDesiredUnoccupiedPod)
375-
if numOfDesiredUnoccupiedPod > numOfUnoccupiedPod {
372+
numOfDesiredUnAllocatedPod := am.calculateDesiredIdlePods(application, numOfAllocatedPod, numOfUnAllocatedPod, numOfPendingSession)
373+
numOfDesiredPod = numOfAllocatedPod + numOfDesiredUnAllocatedPod
374+
klog.InfoS("Syncing application pod", "application", applicationKey, "pending-sessions", numOfPendingSession, "active-pods", numOfAllocatedPod+numOfUnAllocatedPod, "pending-pods", numOfPendingPod, "idle-pods", numOfIdlePod, "desired-pending+idle-pods", numOfDesiredUnAllocatedPod)
375+
if numOfDesiredUnAllocatedPod > numOfUnAllocatedPod {
376376
action = fornaxv1.DeploymentActionCreateInstance
377-
} else if numOfDesiredUnoccupiedPod < numOfUnoccupiedPod {
377+
} else if numOfDesiredUnAllocatedPod < numOfUnAllocatedPod {
378378
action = fornaxv1.DeploymentActionDeleteInstance
379379
}
380380
// pending session will need pods immediately, the rest of pods can be created as a standby pod
381-
desiredAddition := numOfDesiredUnoccupiedPod - numOfUnoccupiedPod
381+
desiredAddition := numOfDesiredUnAllocatedPod - numOfUnAllocatedPod
382382
syncErr = am.deployApplicationPods(pool, application, desiredAddition)
383-
384-
// take care of timeout and deleting pods
385-
am.pruneDeadPods(pool)
386383
}
387384
} else {
388385
numOfDesiredPod = 0
389386
action = fornaxv1.DeploymentActionDeleteInstance
390387
syncErr = am.cleanupDeletedApplication(pool)
388+
// numOfAllocatedPod, numOfPendingPod, numOfIdlePod := pool.activePodNums()
389+
// desiredAddition := 0 - (numOfIdlePod + numOfPendingPod + numOfAllocatedPod)
390+
// syncErr = am.deployApplicationPods(pool, application, desiredAddition)
391391
}
392392

393+
// take care of timeout and deleting pods
394+
am.pruneDeadPods(pool)
393395
newStatus := am.calculateStatus(pool, application, numOfDesiredPod, action, syncErr)
394396
am.applicationStatusManager.UpdateApplicationStatus(application, newStatus)
395397
}

pkg/fornaxcore/application/application_pod.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ func (am *ApplicationManager) onPodEventFromNode(podEvent *ie.PodEvent) {
8181
}
8282

8383
// When a pod is created or updated, add this pod reference to app pods pool
84+
// set pod state from pending => idle if pod is running successfully on node
85+
// set pod state from idle => allocated if pod has a session on node
86+
// request node to terminate pod if application not found
87+
// retry deleting pod if pod already moved to deleting queue, in this case, node did not received termination request somehow
8488
func (am *ApplicationManager) handlePodAddUpdateFromNode(pod *v1.Pod) {
8589
podName := util.Name(pod)
8690
applicationKey, err := am.getPodApplicationKey(pod)
@@ -92,20 +96,17 @@ func (am *ApplicationManager) handlePodAddUpdateFromNode(pod *v1.Pod) {
9296
}
9397

9498
if len(applicationKey) == 0 {
95-
klog.InfoS("Pod does not belong to any application, terminated it", "pod", podName, "labels", pod.GetLabels())
99+
klog.InfoS("Pod does not belong to any application, terminate it", "pod", podName, "labels", pod.GetLabels())
96100
am.podManager.TerminatePod(podName)
97101
return
98102
} else {
99103
pool := am.getOrCreateApplicationPool(applicationKey)
100104
ap := pool.getPod(podName)
101105
if ap != nil && ap.state == PodStateDeleting {
102-
// this pod was requested to terminate, and node did not receive termination or failed to do it, try it again
103106
am.deleteApplicationPod(pool, ap.podName)
104107
return
105108
}
106109
if ap != nil && ap.state == PodStateAllocated {
107-
// this pod is assigned to session by FornaxCore, but node have not report back yet, or message lost, skip
108-
// after session setup timeout, this pod will be released
109110
return
110111
}
111112
if util.PodIsPending(pod) {
@@ -127,7 +128,8 @@ func (am *ApplicationManager) handlePodAddUpdateFromNode(pod *v1.Pod) {
127128
am.enqueueApplication(applicationKey)
128129
}
129130

130-
// When a pod is deleted, find application that manages it and remove pod reference from its pod pool
131+
// When a pod is deleted on node, find application that manages it and remove pod reference from its pod pool
132+
// if pod has a session associated, cleanup sessions
131133
func (am *ApplicationManager) handlePodDeleteFromNode(pod *v1.Pod) {
132134
podName := util.Name(pod)
133135
if pod.DeletionTimestamp == nil {
@@ -155,27 +157,28 @@ func (am *ApplicationManager) handlePodDeleteFromNode(pod *v1.Pod) {
155157
am.cleanupSessionOnDeletedPod(pool, podName)
156158
pool.deletePod(podName)
157159
}
158-
// enqueue application to evaluate application status
159160
am.enqueueApplication(applicationKey)
160161
}
161162

163+
// move pod to deleting state and request node to terminate pod
162164
func (am *ApplicationManager) deleteApplicationPod(pool *ApplicationPool, podName string) error {
163165
podState := pool.getPod(podName)
164166
if podState == nil {
165167
return nil
166168
}
167169

170+
// reset pod deletiontimestamp and retry if deletion timeout
168171
if podState.state == PodStateDeleting {
169172
pod := am.podManager.FindPod(podName)
170173
if pod != nil && pod.DeletionTimestamp != nil && pod.DeletionTimestamp.Time.Before(time.Now().Add(-1*DefaultPodDeletingTimeoutDuration)) {
171-
// reset pod deletiontimestamp and retry if deletion timeout
172174
pod.DeletionTimestamp = nil
173175
} else {
174176
return nil
175177
}
178+
} else {
179+
pool.addOrUpdatePod(podName, PodStateDeleting, []string{})
176180
}
177181

178-
pool.addOrUpdatePod(podName, PodStateDeleting, []string{})
179182
err := am.podManager.TerminatePod(podName)
180183
if err != nil {
181184
if err == fornaxpod.PodNotFoundError {

pkg/fornaxcore/application/application_pool.go

Lines changed: 64 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -77,47 +77,57 @@ func (pool *ApplicationPool) _getPodNoLock(podName string) *ApplicationPod {
7777
return nil
7878
}
7979

80+
func (pool *ApplicationPool) podStateTransitionAllowed(oldState, newState ApplicationPodState) bool {
81+
if oldState == newState {
82+
return true
83+
} else if oldState == PodStatePending {
84+
return true
85+
} else if oldState == PodStateIdle && newState != PodStatePending {
86+
return true
87+
} else if oldState == PodStateAllocated && newState != PodStatePending {
88+
return true
89+
} else if oldState == PodStateDeleting && newState == PodStateDeleting {
90+
return true
91+
}
92+
return false
93+
}
94+
8095
// find pod in a state map, move it to different state map and add session bundle on it
81-
func (pool *ApplicationPool) addOrUpdatePod(podName string, podState ApplicationPodState, sessionIds []string) *ApplicationPod {
96+
func (pool *ApplicationPool) addOrUpdatePod(podName string, podState ApplicationPodState, sessionNames []string) *ApplicationPod {
8297
pool.mu.Lock()
8398
defer pool.mu.Unlock()
8499
if p := pool._getPodNoLock(podName); p != nil {
85-
// pod state should not be reverted to avoid race condition
86-
if p.state == PodStateDeleting && podState != PodStateDeleting {
87-
// do not change a pod state already marked as deleting pods
88-
return p
89-
}
90-
if p.state != PodStatePending && podState == PodStatePending {
91-
// do not change a pod state to pending if pod is not pending anymore
100+
if !pool.podStateTransitionAllowed(p.state, podState) {
92101
return p
93102
}
94103
}
95-
return pool._addOrUpdatePodNoLock(podName, podState, sessionIds)
104+
return pool._addOrUpdatePodNoLock(podName, podState, sessionNames)
96105
}
97106

98-
func (pool *ApplicationPool) _addOrUpdatePodNoLock(podName string, podState ApplicationPodState, sessionIds []string) *ApplicationPod {
107+
// move pod from a state bucket to new state bucket and update its session map
108+
func (pool *ApplicationPool) _addOrUpdatePodNoLock(podName string, podNewState ApplicationPodState, sessionNames []string) *ApplicationPod {
99109
for _, pods := range pool.podsByState {
100110
if p, f := pods[podName]; f {
101-
for _, v := range sessionIds {
111+
for _, v := range sessionNames {
102112
p.sessions[v] = true
103113
}
104-
if p.state == podState {
114+
if p.state == podNewState {
105115
return p
106116
} else {
107-
p.state = podState
108-
pool.podsByState[podState][podName] = p
117+
p.state = podNewState
118+
pool.podsByState[podNewState][podName] = p
109119
delete(pods, podName)
110120
return p
111121
}
112122
}
113123
}
114124

115125
// not found, add it
116-
p := NewApplicationPod(podName, podState)
117-
for _, v := range sessionIds {
126+
p := NewApplicationPod(podName, podNewState)
127+
for _, v := range sessionNames {
118128
p.sessions[v] = true
119129
}
120-
pool.podsByState[podState][podName] = p
130+
pool.podsByState[podNewState][podName] = p
121131
return p
122132
}
123133

@@ -220,32 +230,8 @@ func (pool *ApplicationPool) summarySession() ApplicationSessionSummary {
220230
summary.deletingCount = len(pool.sessions[SessionStateDeleting])
221231
summary.runningCount = len(pool.sessions[SessionStateRunning])
222232
summary.startingCount = len(pool.sessions[SessionStateStarting])
233+
summary.pendingCount = len(pool.sessions[SessionStatePending])
223234

224-
for _, s := range pool.sessions[SessionStatePending] {
225-
timeoutDuration := DefaultSessionOpenTimeoutDuration
226-
if s.session.Spec.OpenTimeoutSeconds > 0 {
227-
timeoutDuration = time.Duration(s.session.Spec.OpenTimeoutSeconds) * time.Second
228-
}
229-
pendingTimeoutTimeStamp := time.Now().Add(-1 * timeoutDuration)
230-
if s.session.CreationTimestamp.Time.Before(pendingTimeoutTimeStamp) {
231-
summary.timeoutCount += 1
232-
} else {
233-
summary.pendingCount += 1
234-
}
235-
}
236-
237-
for _, s := range pool.sessions[SessionStateStarting] {
238-
timeoutDuration := DefaultSessionOpenTimeoutDuration
239-
if s.session.Spec.OpenTimeoutSeconds > 0 {
240-
timeoutDuration = time.Duration(s.session.Spec.OpenTimeoutSeconds) * time.Second
241-
}
242-
pendingTimeoutTimeStamp := time.Now().Add(-1 * timeoutDuration)
243-
if s.session.CreationTimestamp.Time.Before(pendingTimeoutTimeStamp) {
244-
summary.timeoutCount += 1
245-
} else {
246-
summary.startingCount += 1
247-
}
248-
}
249235
return summary
250236
}
251237

@@ -265,59 +251,74 @@ func (pool *ApplicationPool) _getSessionNoLock(key string) *ApplicationSession {
265251
return nil
266252
}
267253

268-
func (pool *ApplicationPool) addSession(sessionId string, session *fornaxv1.ApplicationSession) {
254+
func (pool *ApplicationPool) addSession(sessionName string, session *fornaxv1.ApplicationSession) {
269255
pool.mu.Lock()
270256
defer pool.mu.Unlock()
271257
newState := SessionStatePending
272-
if session.DeletionTimestamp != nil {
258+
if session.DeletionTimestamp != nil || util.SessionIsClosing(session) {
273259
newState = SessionStateDeleting
274-
} else if util.SessionIsOpen(session) {
275-
newState = SessionStateRunning
276260
} else if util.SessionIsStarting(session) {
277261
newState = SessionStateStarting
278262
} else if util.SessionIsPending(session) {
279263
newState = SessionStatePending
280-
} else if util.SessionIsClosing(session) {
281-
newState = SessionStateDeleting
264+
} else if util.SessionIsOpen(session) {
265+
newState = SessionStateRunning
282266
} else {
283-
// do not add a terminal state session, instead of deleting and return
284267
pool._deleteSessionNoLock(session)
285268
return
286269
}
287270

288-
s := pool._getSessionNoLock(sessionId)
271+
s := pool._getSessionNoLock(sessionName)
289272
if s != nil {
290-
if newState != s.state {
291-
delete(pool.sessions[s.state], sessionId)
273+
if pool.sessionStateTransitionAllowed(s.state, newState) {
274+
delete(pool.sessions[s.state], sessionName)
275+
} else {
276+
return
292277
}
293278
}
294279

295280
// update pool with new state
296-
pool.sessions[newState][sessionId] = &ApplicationSession{
281+
pool.sessions[newState][sessionName] = &ApplicationSession{
297282
session: session,
298283
state: newState,
299284
}
300285
if session.Status.PodReference != nil {
301286
podName := session.Status.PodReference.Name
302-
pool._addOrUpdatePodNoLock(podName, PodStateAllocated, []string{string(session.GetUID())})
287+
pool._addOrUpdatePodNoLock(podName, PodStateAllocated, []string{sessionName})
303288
}
304289
}
305290

291+
func (pool *ApplicationPool) sessionStateTransitionAllowed(oldState, newState ApplicationSessionState) bool {
292+
if oldState == newState {
293+
return true
294+
} else if oldState == SessionStatePending {
295+
return true
296+
} else if oldState == SessionStateStarting && newState != SessionStatePending {
297+
return true
298+
} else if oldState == SessionStateRunning && newState != SessionStatePending && newState != SessionStateStarting {
299+
return true
300+
} else if oldState == SessionStateDeleting && newState != SessionStatePending && newState != SessionStateStarting && newState != SessionStateRunning {
301+
return true
302+
}
303+
return false
304+
}
305+
306306
func (pool *ApplicationPool) deleteSession(session *fornaxv1.ApplicationSession) {
307307
pool.mu.Lock()
308308
defer pool.mu.Unlock()
309309
pool._deleteSessionNoLock(session)
310310
}
311311

312+
// delete a session from application pool, and delete it from referenced pod's session map, and change pod state back to idle state,
313+
// only allow from allocated => idle when delete a session from this pod, pod is in pending/deleting state should keep its state
312314
func (pool *ApplicationPool) _deleteSessionNoLock(session *fornaxv1.ApplicationSession) {
313-
sessionId := string(session.GetUID())
315+
sessionName := util.Name(session)
314316
if session.Status.PodReference != nil {
315317
podName := session.Status.PodReference.Name
316318
for _, podsOfState := range pool.podsByState {
317319
if pod, found := podsOfState[podName]; found {
318-
delete(pod.sessions, sessionId)
320+
delete(pod.sessions, sessionName)
319321
if len(pod.sessions) == 0 && pod.state == PodStateAllocated {
320-
// only allow from allocated => idle when delete a session from this pod, pod is in pending/deleting state should keep its state
321322
delete(podsOfState, podName)
322323
pod.state = PodStateIdle
323324
pool.podsByState[PodStateIdle][podName] = pod
@@ -327,14 +328,14 @@ func (pool *ApplicationPool) _deleteSessionNoLock(session *fornaxv1.ApplicationS
327328
}
328329
}
329330
for _, v := range pool.sessions {
330-
delete(v, sessionId)
331+
delete(v, sessionName)
331332
}
332333
}
333334

334335
// getNonRunningSessions return a list of session of different states,
335-
// pending, not assigned to pod yet
336-
// deleting, delete requested
337-
// timeout, session timedout to get a pod, or session assigned to node, but timeout to get session state from node
336+
// 1/ pending, not assigned to pod yet
337+
// 2/ deleting, delete requested
338+
// 3/ timeout, session timedout to get a pod, or session assigned to node, but timeout to get session state from node
338339
func (pool *ApplicationPool) getNonRunningSessions() (pendingSessions, deletingSessions, timeoutSessions []*ApplicationSession) {
339340
pool.mu.RLock()
340341
defer pool.mu.RUnlock()
@@ -360,8 +361,6 @@ func (pool *ApplicationPool) getNonRunningSessions() (pendingSessions, deletingS
360361
pendingTimeoutTimeStamp := time.Now().Add(-1 * timeoutDuration)
361362
if s.session.CreationTimestamp.Time.Before(pendingTimeoutTimeStamp) {
362363
timeoutSessions = append(timeoutSessions, s)
363-
} else {
364-
pendingSessions = append(pendingSessions, s)
365364
}
366365
}
367366

@@ -375,13 +374,13 @@ func (pool *ApplicationPool) getNonRunningSessions() (pendingSessions, deletingS
375374
// add active session into application's session pool and delete terminal session from pool
376375
// add session/delete session will update pod state according pod's session usage
377376
func updateSessionPool(pool *ApplicationPool, session *fornaxv1.ApplicationSession) {
378-
sessionId := string(session.GetUID())
377+
sessionName := util.Name(session)
379378
if util.SessionInTerminalState(session) {
380379
pool.deleteSession(session)
381380
} else {
382381
// a trick to make sure pending session are sorted using micro second, api server truncate creation timestamp to second
383382
session.CreationTimestamp = *util.NewCurrentMetaTime()
384-
pool.addSession(sessionId, session)
383+
pool.addSession(sessionName, session)
385384
}
386385
}
387386

0 commit comments

Comments
 (0)