@@ -413,10 +413,70 @@ def test_workflow_engine_shutdown_with_service_registry_disabled(self):
413413 "get_lock" ,
414414 mock .MagicMock (return_value = coordination_service .NoOpLock (name = "noop" )),
415415 )
416- def test_workflow_engine_concurrent_shutdown_and_start (self ):
416+ def test_workflow_engine_shutdown_first_then_start (self ):
417417 cfg .CONF .set_override (
418418 name = "service_registry" , override = True , group = "coordination"
419419 )
420+ cfg .CONF .set_override (
421+ name = "exit_still_active_check" , override = 0 , group = "workflow_engine"
422+ )
423+ wf_meta = self .get_wf_fixture_meta_data (TEST_PACK_PATH , "sequential.yaml" )
424+ lv_ac_db = lv_db_models .LiveActionDB (action = wf_meta ["name" ])
425+ lv_ac_db , ac_ex_db = action_service .request (lv_ac_db )
426+
427+ # Assert action execution is running.
428+ lv_ac_db = lv_db_access .LiveAction .get_by_id (str (lv_ac_db .id ))
429+ self .assertEqual (lv_ac_db .status , action_constants .LIVEACTION_STATUS_RUNNING )
430+ wf_ex_db = wf_db_access .WorkflowExecution .query (
431+ action_execution = str (ac_ex_db .id )
432+ )[0 ]
433+ self .assertEqual (wf_ex_db .status , action_constants .LIVEACTION_STATUS_RUNNING )
434+ workflow_engine = workflows .get_engine ()
435+
436+ workflow_engine ._delay = 5
437+ # Initiate shutdown first
438+ eventlet .spawn (workflow_engine .shutdown )
439+ eventlet .spawn_after (1 , workflow_engine .start , True )
440+
441+ # Sleep for few seconds to ensure shutdown sequence completes.
442+ eventlet .sleep (2 )
443+ lv_ac_db = lv_db_access .LiveAction .get_by_id (str (lv_ac_db .id ))
444+
445+ # Shutdown routine acquires the lock first
446+ self .assertEqual (lv_ac_db .status , action_constants .LIVEACTION_STATUS_PAUSING )
447+ # Process task1
448+ query_filters = {"workflow_execution" : str (wf_ex_db .id ), "task_id" : "task1" }
449+ t1_ex_db = wf_db_access .TaskExecution .query (** query_filters )[0 ]
450+ t1_ac_ex_db = ex_db_access .ActionExecution .query (
451+ task_execution = str (t1_ex_db .id )
452+ )[0 ]
453+
454+ workflows .get_engine ().process (t1_ac_ex_db )
455+ # Startup sequence won't proceed until shutdown routine completes.
456+ # Assuming shutdown sequence is complete, start up sequence will resume the workflow.
457+ eventlet .sleep (workflow_engine ._delay + 5 )
458+ lv_ac_db = lv_db_access .LiveAction .get_by_id (str (lv_ac_db .id ))
459+ self .assertTrue (
460+ lv_ac_db .status
461+ in [
462+ action_constants .LIVEACTION_STATUS_RESUMING ,
463+ action_constants .LIVEACTION_STATUS_RUNNING ,
464+ action_constants .LIVEACTION_STATUS_SUCCEEDED ,
465+ ]
466+ )
467+
468+ @mock .patch .object (
469+ coordination_service .NoOpDriver ,
470+ "get_lock" ,
471+ mock .MagicMock (return_value = coordination_service .NoOpLock (name = "noop" )),
472+ )
473+ def test_workflow_engine_start_first_then_shutdown (self ):
474+ cfg .CONF .set_override (
475+ name = "service_registry" , override = True , group = "coordination"
476+ )
477+ cfg .CONF .set_override (
478+ name = "exit_still_active_check" , override = 0 , group = "workflow_engine"
479+ )
420480 wf_meta = self .get_wf_fixture_meta_data (TEST_PACK_PATH , "sequential.yaml" )
421481 lv_ac_db = lv_db_models .LiveActionDB (action = wf_meta ["name" ])
422482 lv_ac_db , ac_ex_db = action_service .request (lv_ac_db )
@@ -431,41 +491,17 @@ def test_workflow_engine_concurrent_shutdown_and_start(self):
431491 workflow_engine = workflows .get_engine ()
432492
433493 workflow_engine ._delay = 0
434- # Start and stop WFE simultaneously.
494+ # Initiate start first
495+ eventlet .spawn (workflow_engine .start , True )
435496 eventlet .spawn_after (1 , workflow_engine .shutdown )
436- eventlet .spawn_after (1 , workflow_engine .start , True )
497+
498+ coordination_service .NoOpDriver .get_members = mock .MagicMock (
499+ return_value = coordination_service .NoOpAsyncResult ("member-1" )
500+ )
437501
438502 lv_ac_db = lv_db_access .LiveAction .get_by_id (str (lv_ac_db .id ))
439503
440- # Case 1: Shutdown routine acquires the lock first
441- # Case 2: Startup routine acquires the lock first
442- if lv_ac_db .status == action_constants .LIVEACTION_STATUS_PAUSING :
443- # Process task1
444- query_filters = {"workflow_execution" : str (wf_ex_db .id ), "task_id" : "task1" }
445- t1_ex_db = wf_db_access .TaskExecution .query (** query_filters )[0 ]
446- t1_ac_ex_db = ex_db_access .ActionExecution .query (
447- task_execution = str (t1_ex_db .id )
448- )[0 ]
449-
450- workflows .get_engine ().process (t1_ac_ex_db )
451- # Startup sequence won't proceed until shutdown routine completes.
452- # Assuming shutdown sequence is complete, start up sequence will resume the workflow.
453- eventlet .sleep (workflow_engine ._delay + 5 )
454- lv_ac_db = lv_db_access .LiveAction .get_by_id (str (lv_ac_db .id ))
455- self .assertTrue (
456- lv_ac_db .status
457- in [
458- action_constants .LIVEACTION_STATUS_RESUMING ,
459- action_constants .LIVEACTION_STATUS_RUNNING ,
460- action_constants .LIVEACTION_STATUS_SUCCEEDED ,
461- ]
462- )
463- else :
464- coordination_service .NoOpDriver .get_members = mock .MagicMock (
465- return_value = coordination_service .NoOpAsyncResult ("member-1" )
466- )
467- eventlet .sleep (workflow_engine ._delay + 5 )
468- lv_ac_db = lv_db_access .LiveAction .get_by_id (str (lv_ac_db .id ))
469- self .assertEqual (
470- lv_ac_db .status , action_constants .LIVEACTION_STATUS_RUNNING
471- )
504+ # Startup routine acquires the lock first and shutdown routine sees a new member present in registry.
505+ eventlet .sleep (workflow_engine ._delay + 5 )
506+ lv_ac_db = lv_db_access .LiveAction .get_by_id (str (lv_ac_db .id ))
507+ self .assertEqual (lv_ac_db .status , action_constants .LIVEACTION_STATUS_RUNNING )
0 commit comments