1414# limitations under the License.
1515
1616from __future__ import absolute_import
17+ from oslo_config import cfg
1718
1819from orquesta import statuses
19-
20+ from tooz .coordination import GroupNotCreated
21+ from st2common .services import coordination
22+ from eventlet .semaphore import Semaphore
23+ from eventlet import spawn_after
2024from st2common .constants import action as ac_const
2125from st2common import log as logging
2226from st2common .metrics import base as metrics
2327from st2common .models .db import execution as ex_db_models
2428from st2common .models .db import workflow as wf_db_models
2529from st2common .persistence import liveaction as lv_db_access
2630from st2common .persistence import workflow as wf_db_access
31+ from st2common .persistence import execution as ex_db_access
32+ from st2common .services import action as ac_svc
2733from st2common .services import policies as pc_svc
2834from st2common .services import workflows as wf_svc
2935from st2common .transport import consumers
3036from st2common .transport import queues
3137from st2common .transport import utils as txpt_utils
32-
38+ from st2common .util import concurrency
39+ from st2common .util import action_db as action_utils
3340
3441LOG = logging .getLogger (__name__ )
3542
4047 queues .WORKFLOW_ACTION_EXECUTION_UPDATE_QUEUE ,
4148]
4249
50+ WORKFLOW_ENGINE = "workflow_engine"
51+ SHUTDOWN_ROUTINE = "shutdown_routine"
52+
4353
4454class WorkflowExecutionHandler (consumers .VariableMessageHandler ):
4555 def __init__ (self , connection , queues ):
4656 super (WorkflowExecutionHandler , self ).__init__ (connection , queues )
57+ self ._active_messages = 0
58+ self ._semaphore = Semaphore ()
4759
4860 def handle_workflow_execution_with_instrumentation (wf_ex_db ):
4961 with metrics .CounterWithTimer (key = "orquesta.workflow.executions" ):
@@ -62,6 +74,10 @@ def handle_action_execution_with_instrumentation(ac_ex_db):
6274 ex_db_models .ActionExecutionDB : handle_action_execution_with_instrumentation ,
6375 }
6476
77+ # This is required to ensure workflows stuck in pausing state after shutdown transition to paused state after engine startup.
78+ self ._delay = 30
79+ spawn_after (self ._delay , self ._resume_workflows_paused_during_shutdown )
80+
6581 def get_queue_consumer (self , connection , queues ):
6682 # We want to use a special ActionsQueueConsumer which uses 2 dispatcher pools
6783 return consumers .VariableMessageQueueConsumer (
@@ -78,13 +94,61 @@ def process(self, message):
7894 raise ValueError (msg )
7995
8096 try :
97+ with self ._semaphore :
98+ self ._active_messages += 1
8199 handler_function (message )
82100 except Exception as e :
83101 # If the exception is caused by DB connection error, then the following
84102 # error handling routine will fail as well because it will try to update
85103 # the database and fail the workflow execution gracefully. In this case,
86104 # the garbage collector will find and cancel these workflow executions.
87105 self .fail_workflow_execution (message , e )
106+ finally :
107+ with self ._semaphore :
108+ self ._active_messages -= 1
109+
110+ def shutdown (self ):
111+ super (WorkflowExecutionHandler , self ).shutdown ()
112+ while self ._active_messages > 0 :
113+ concurrency .sleep (2 )
114+
115+ coordinator = coordination .get_coordinator ()
116+ member_ids = []
117+ with coordinator .get_lock (SHUTDOWN_ROUTINE ):
118+ try :
119+ member_ids = list (
120+ coordinator .get_members (WORKFLOW_ENGINE .encode ("utf-8" )).get ()
121+ )
122+ except GroupNotCreated :
123+ pass
124+
125+ # Check if there are other runners in service registry
126+ if cfg .CONF .coordination .service_registry and not member_ids :
127+ ac_ex_dbs = self ._get_running_workflows ()
128+ for ac_ex_db in ac_ex_dbs :
129+ lv_ac = action_utils .get_liveaction_by_id (ac_ex_db .liveaction ["id" ])
130+ ac_svc .request_pause (lv_ac , SHUTDOWN_ROUTINE )
131+
132+ def _get_running_workflows (self ):
133+ query_filters = {
134+ "runner__name" : "orquesta" ,
135+ "status" : ac_const .LIVEACTION_STATUS_RUNNING ,
136+ }
137+ return ex_db_access .ActionExecution .query (** query_filters )
138+
139+ def _get_workflows_paused_during_shutdown (self ):
140+ query_filters = {
141+ "status" : ac_const .LIVEACTION_STATUS_PAUSED ,
142+ "context__paused_by" : SHUTDOWN_ROUTINE ,
143+ }
144+ return lv_db_access .LiveAction .query (** query_filters )
145+
146+ def _resume_workflows_paused_during_shutdown (self ):
147+ coordinator = coordination .get_coordinator ()
148+ with coordinator .get_lock (SHUTDOWN_ROUTINE ):
149+ lv_ac_dbs = self ._get_workflows_paused_during_shutdown ()
150+ for lv_ac_db in lv_ac_dbs :
151+ ac_svc .request_resume (lv_ac_db , SHUTDOWN_ROUTINE )
88152
89153 def fail_workflow_execution (self , message , exception ):
90154 # Prepare attributes based on message type.
0 commit comments