Skip to content

Commit d65555e

Browse files
authored
Merge pull request #5463 from khushboobhatia01/workflow_engine_graceful_shutdown
Workflow engine graceful shutdown
2 parents 2eb6eee + dea5f1e commit d65555e

File tree

8 files changed

+336
-5
lines changed

8 files changed

+336
-5
lines changed

CHANGELOG.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ Fixed
99

1010
* Fix redis SSL problems with sentinel #5660
1111

12+
Added
13+
~~~~~
14+
15+
* Added graceful shutdown for workflow engine. #5463
16+
Contributed by @khushboobhatia01
1217

1318
3.7.0 - May 05, 2022
1419
--------------------

conf/st2.conf.sample

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,8 @@ logging = /etc/st2/logging.timersengine.conf
363363
webui_base_url = https://localhost
364364

365365
[workflow_engine]
366+
# How long to wait (in seconds) for the process to exit after receiving a shutdown signal.
367+
exit_still_active_check = 300
366368
# Max seconds a workflow execution is allowed to be idle before it is identified as orphaned and cancelled by the garbage collector. A value of zero means the feature is disabled. This is disabled by default.
367369
gc_max_idle_sec = 0
368370
# Location of the logging configuration file.
@@ -373,4 +375,6 @@ retry_max_jitter_msec = 1000
373375
retry_stop_max_msec = 60000
374376
# Interval between retries.
375377
retry_wait_fixed_msec = 1000
378+
# Time interval (in seconds) between subsequent checks for executions still being handled by the workflow engine during shutdown.
379+
still_active_check_interval = 2
376380

st2actions/st2actions/cmd/workflow_engine.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
__all__ = ["main"]
3838

3939
LOG = logging.getLogger(__name__)
40-
WORKFLOW_ENGINE = "workflow_engine"
4140

4241

4342
def setup_sigterm_handler(engine):
@@ -53,7 +52,7 @@ def sigterm_handler(signum=None, frame=None):
5352
def setup():
5453
capabilities = {"name": "workflowengine", "type": "passive"}
5554
common_setup(
56-
service=WORKFLOW_ENGINE,
55+
service=workflows.WORKFLOW_ENGINE,
5756
config=config,
5857
setup_db=True,
5958
register_mq_exchanges=True,
@@ -72,7 +71,7 @@ def run_server():
7271
engine.start(wait=True)
7372
except (KeyboardInterrupt, SystemExit):
7473
LOG.info("(PID=%s) Workflow engine stopped.", os.getpid())
75-
deregister_service(service=WORKFLOW_ENGINE)
74+
deregister_service(service=workflows.WORKFLOW_ENGINE)
7675
engine.shutdown()
7776
except:
7877
LOG.exception("(PID=%s) Workflow engine unexpectedly stopped.", os.getpid())

st2actions/st2actions/workflows/workflows.py

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,29 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
from oslo_config import cfg
1718

1819
from orquesta import statuses
19-
20+
from tooz.coordination import GroupNotCreated
21+
from st2common.services import coordination
22+
from eventlet.semaphore import Semaphore
23+
from eventlet import spawn_after
2024
from st2common.constants import action as ac_const
2125
from st2common import log as logging
2226
from st2common.metrics import base as metrics
2327
from st2common.models.db import execution as ex_db_models
2428
from st2common.models.db import workflow as wf_db_models
2529
from st2common.persistence import liveaction as lv_db_access
2630
from st2common.persistence import workflow as wf_db_access
31+
from st2common.persistence import execution as ex_db_access
32+
from st2common.services import action as ac_svc
2733
from st2common.services import policies as pc_svc
2834
from st2common.services import workflows as wf_svc
2935
from st2common.transport import consumers
3036
from st2common.transport import queues
3137
from st2common.transport import utils as txpt_utils
32-
38+
from st2common.util import concurrency
39+
from st2common.util import action_db as action_utils
3340

3441
LOG = logging.getLogger(__name__)
3542

@@ -40,10 +47,17 @@
4047
queues.WORKFLOW_ACTION_EXECUTION_UPDATE_QUEUE,
4148
]
4249

50+
WORKFLOW_ENGINE = "workflow_engine"
51+
WORKFLOW_ENGINE_START_STOP_SEQ = "workflow_engine_start_stop_seq"
52+
4353

4454
class WorkflowExecutionHandler(consumers.VariableMessageHandler):
4555
def __init__(self, connection, queues):
4656
super(WorkflowExecutionHandler, self).__init__(connection, queues)
57+
self._active_messages = 0
58+
self._semaphore = Semaphore()
59+
# This delay is required so that workflows left in the pausing state by a shutdown have transitioned to the paused state before the engine resumes them after startup.
60+
self._delay = 30
4761

4862
def handle_workflow_execution_with_instrumentation(wf_ex_db):
4963
with metrics.CounterWithTimer(key="orquesta.workflow.executions"):
@@ -78,13 +92,69 @@ def process(self, message):
7892
raise ValueError(msg)
7993

8094
try:
95+
with self._semaphore:
96+
self._active_messages += 1
8197
handler_function(message)
8298
except Exception as e:
8399
# If the exception is caused by DB connection error, then the following
84100
# error handling routine will fail as well because it will try to update
85101
# the database and fail the workflow execution gracefully. In this case,
86102
# the garbage collector will find and cancel these workflow executions.
87103
self.fail_workflow_execution(message, e)
104+
finally:
105+
with self._semaphore:
106+
self._active_messages -= 1
107+
108+
def start(self, wait):
    """Start the handler and schedule the post-startup resume pass.

    A deferred call to ``_resume_workflows_paused_during_shutdown`` is
    scheduled to fire after ``self._delay``; the parent handler is then
    started with the caller's ``wait`` flag.
    """
    resume_paused_workflows = self._resume_workflows_paused_during_shutdown
    spawn_after(self._delay, resume_paused_workflows)

    super(WorkflowExecutionHandler, self).start(wait=wait)
111+
112+
def shutdown(self):
    """Stop the handler, drain in-flight messages, then pause orphaned workflows.

    After the parent handler is shut down, wait up to
    ``workflow_engine.exit_still_active_check`` seconds for messages that are
    still being processed, re-checking every
    ``workflow_engine.still_active_check_interval`` seconds. Afterwards, if the
    service registry is enabled and no workflow engine member is registered,
    request a pause for every running orquesta workflow, tagging the request
    with ``WORKFLOW_ENGINE_START_STOP_SEQ`` so it can be found again at startup.
    """
    # Local import: only the bounded wait below needs it.
    import time

    super(WorkflowExecutionHandler, self).shutdown()
    exit_timeout = cfg.CONF.workflow_engine.exit_still_active_check
    sleep_delay = cfg.CONF.workflow_engine.still_active_check_interval

    # Bound the wait with a monotonic deadline instead of summing the sleep
    # interval: with a zero or negative interval the summed counter never
    # reaches the timeout, so shutdown would hang for as long as any message
    # stays active.
    deadline = time.monotonic() + exit_timeout
    while self._active_messages > 0 and time.monotonic() < deadline:
        concurrency.sleep(sleep_delay)

    coordinator = coordination.get_coordinator()
    member_ids = []
    # NOTE(review): the pause step is assumed to run while the start/stop lock
    # is held; nesting was reconstructed from a flattened diff - confirm.
    with coordinator.get_lock(WORKFLOW_ENGINE_START_STOP_SEQ):
        try:
            group_id = coordination.get_group_id(WORKFLOW_ENGINE)
            member_ids = list(coordinator.get_members(group_id).get())
        except GroupNotCreated:
            # No workflow engine group exists, so there are no members.
            pass

        # Check if there are other WFEs in service registry
        if cfg.CONF.coordination.service_registry and not member_ids:
            for ac_ex_db in self._get_running_workflows():
                lv_ac_id = ac_ex_db.liveaction["id"]
                try:
                    lv_ac = action_utils.get_liveaction_by_id(lv_ac_id)
                    ac_svc.request_pause(lv_ac, WORKFLOW_ENGINE_START_STOP_SEQ)
                except Exception:
                    # Best effort: one workflow that cannot be paused must
                    # not prevent the remaining ones from being paused.
                    LOG.exception(
                        'Unable to request pause for liveaction "%s" during '
                        "workflow engine shutdown.",
                        lv_ac_id,
                    )
137+
138+
def _get_running_workflows(self):
    """Return action executions of the orquesta runner that are in running status."""
    return ex_db_access.ActionExecution.query(
        runner__name="orquesta",
        status=ac_const.LIVEACTION_STATUS_RUNNING,
    )
144+
145+
def _get_workflows_paused_during_shutdown(self):
    """Return paused liveactions whose context marks the engine start/stop sequence as the pauser."""
    return lv_db_access.LiveAction.query(
        status=ac_const.LIVEACTION_STATUS_PAUSED,
        context__paused_by=WORKFLOW_ENGINE_START_STOP_SEQ,
    )
151+
152+
def _resume_workflows_paused_during_shutdown(self):
    """Request a resume for every workflow paused by an engine shutdown.

    While holding the ``WORKFLOW_ENGINE_START_STOP_SEQ`` coordination lock,
    look up the liveactions returned by
    ``_get_workflows_paused_during_shutdown`` and request a resume for each.
    A failure on one liveaction is logged and does not stop the others from
    being resumed.
    """
    coordinator = coordination.get_coordinator()
    with coordinator.get_lock(WORKFLOW_ENGINE_START_STOP_SEQ):
        for lv_ac_db in self._get_workflows_paused_during_shutdown():
            try:
                ac_svc.request_resume(lv_ac_db, WORKFLOW_ENGINE_START_STOP_SEQ)
            except Exception:
                # Best effort: keep going so one bad record does not leave
                # every remaining workflow paused.
                LOG.exception(
                    'Unable to request resume for liveaction "%s" after '
                    "workflow engine startup.",
                    getattr(lv_ac_db, "id", None),
                )
88158

89159
def fail_workflow_execution(self, message, exception):
90160
# Prepare attributes based on message type.

0 commit comments

Comments
 (0)