Skip to content

Commit c8cc833

Browse files
author
Khushboo
committed
Address review comments
1 parent be8d2aa commit c8cc833

File tree

6 files changed

+189
-20
lines changed

6 files changed

+189
-20
lines changed

conf/st2.conf.sample

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,8 @@ logging = /etc/st2/logging.timersengine.conf
363363
webui_base_url = https://localhost
364364

365365
[workflow_engine]
366+
# How long to wait for process (in seconds) to exit after receiving shutdown signal.
367+
exit_still_active_check = 300
366368
# Max seconds to allow workflow execution be idled before it is identified as orphaned and cancelled by the garbage collector. A value of zero means the feature is disabled. This is disabled by default.
367369
gc_max_idle_sec = 0
368370
# Location of the logging configuration file.
@@ -373,4 +375,6 @@ retry_max_jitter_msec = 1000
373375
retry_stop_max_msec = 60000
374376
# Interval inbetween retries.
375377
retry_wait_fixed_msec = 1000
378+
# Time interval between subsequent queries to check executions handled by WFE.
379+
still_active_check_interval = 2
376380

st2actions/st2actions/cmd/workflow_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
__all__ = ["main"]
3838

3939
LOG = logging.getLogger(__name__)
40-
WORKFLOW_ENGINE = "workflow_engine"
40+
WORKFLOW_ENGINE = workflows.WORKFLOW_ENGINE
4141

4242

4343
def setup_sigterm_handler(engine):

st2actions/st2actions/workflows/workflows.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,16 @@
4848
]
4949

5050
WORKFLOW_ENGINE = "workflow_engine"
51-
SHUTDOWN_ROUTINE = "shutdown_routine"
51+
WORKFLOW_ENGINE_START_STOP_SEQ = "workflow_engine_start_stop_seq"
5252

5353

5454
class WorkflowExecutionHandler(consumers.VariableMessageHandler):
5555
def __init__(self, connection, queues):
5656
super(WorkflowExecutionHandler, self).__init__(connection, queues)
5757
self._active_messages = 0
5858
self._semaphore = Semaphore()
59+
# This is required to ensure workflows stuck in pausing state after shutdown transition to paused state after engine startup.
60+
self._delay = 30
5961

6062
def handle_workflow_execution_with_instrumentation(wf_ex_db):
6163
with metrics.CounterWithTimer(key="orquesta.workflow.executions"):
@@ -74,10 +76,6 @@ def handle_action_execution_with_instrumentation(ac_ex_db):
7476
ex_db_models.ActionExecutionDB: handle_action_execution_with_instrumentation,
7577
}
7678

77-
# This is required to ensure workflows stuck in pausing state after shutdown transition to paused state after engine startup.
78-
self._delay = 30
79-
spawn_after(self._delay, self._resume_workflows_paused_during_shutdown)
80-
8179
def get_queue_consumer(self, connection, queues):
8280
# We want to use a special ActionsQueueConsumer which uses 2 dispatcher pools
8381
return consumers.VariableMessageQueueConsumer(
@@ -107,27 +105,35 @@ def process(self, message):
107105
with self._semaphore:
108106
self._active_messages -= 1
109107

108+
def start(self, wait):
109+
spawn_after(self._delay, self._resume_workflows_paused_during_shutdown)
110+
super(WorkflowExecutionHandler, self).start(wait=wait)
111+
110112
def shutdown(self):
111113
super(WorkflowExecutionHandler, self).shutdown()
112-
while self._active_messages > 0:
113-
concurrency.sleep(2)
114+
exit_timeout = cfg.CONF.workflow_engine.exit_still_active_check
115+
sleep_delay = cfg.CONF.workflow_engine.still_active_check_interval
116+
timeout = 0
117+
118+
while timeout < exit_timeout and self._active_messages > 0:
119+
concurrency.sleep(sleep_delay)
120+
timeout += sleep_delay
114121

115122
coordinator = coordination.get_coordinator()
116123
member_ids = []
117-
with coordinator.get_lock(SHUTDOWN_ROUTINE):
124+
with coordinator.get_lock(WORKFLOW_ENGINE_START_STOP_SEQ):
118125
try:
119-
member_ids = list(
120-
coordinator.get_members(WORKFLOW_ENGINE.encode("utf-8")).get()
121-
)
126+
group_id = coordination.get_group_id(WORKFLOW_ENGINE)
127+
member_ids = list(coordinator.get_members(group_id).get())
122128
except GroupNotCreated:
123129
pass
124130

125-
# Check if there are other runners in service registry
131+
# Check if there are other WFEs in service registry
126132
if cfg.CONF.coordination.service_registry and not member_ids:
127133
ac_ex_dbs = self._get_running_workflows()
128134
for ac_ex_db in ac_ex_dbs:
129135
lv_ac = action_utils.get_liveaction_by_id(ac_ex_db.liveaction["id"])
130-
ac_svc.request_pause(lv_ac, SHUTDOWN_ROUTINE)
136+
ac_svc.request_pause(lv_ac, WORKFLOW_ENGINE_START_STOP_SEQ)
131137

132138
def _get_running_workflows(self):
133139
query_filters = {
@@ -139,16 +145,16 @@ def _get_running_workflows(self):
139145
def _get_workflows_paused_during_shutdown(self):
140146
query_filters = {
141147
"status": ac_const.LIVEACTION_STATUS_PAUSED,
142-
"context__paused_by": SHUTDOWN_ROUTINE,
148+
"context__paused_by": WORKFLOW_ENGINE_START_STOP_SEQ,
143149
}
144150
return lv_db_access.LiveAction.query(**query_filters)
145151

146152
def _resume_workflows_paused_during_shutdown(self):
147153
coordinator = coordination.get_coordinator()
148-
with coordinator.get_lock(SHUTDOWN_ROUTINE):
154+
with coordinator.get_lock(WORKFLOW_ENGINE_START_STOP_SEQ):
149155
lv_ac_dbs = self._get_workflows_paused_during_shutdown()
150156
for lv_ac_db in lv_ac_dbs:
151-
ac_svc.request_resume(lv_ac_db, SHUTDOWN_ROUTINE)
157+
ac_svc.request_resume(lv_ac_db, WORKFLOW_ENGINE_START_STOP_SEQ)
152158

153159
def fail_workflow_execution(self, message, exception):
154160
# Prepare attributes based on message type.

st2actions/tests/unit/test_workflow_engine.py

Lines changed: 144 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,11 @@ def test_process_error_handling_has_error(self, mock_get_lock):
272272
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
273273
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_CANCELED)
274274

275+
@mock.patch.object(
276+
coordination_service.NoOpDriver,
277+
"get_members",
278+
mock.MagicMock(return_value=coordination_service.NoOpAsyncResult("")),
279+
)
275280
def test_workflow_engine_shutdown(self):
276281
cfg.CONF.set_override(
277282
name="service_registry", override=True, group="coordination"
@@ -289,8 +294,6 @@ def test_workflow_engine_shutdown(self):
289294
self.assertEqual(wf_ex_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
290295
workflow_engine = workflows.get_engine()
291296

292-
# Manually add running workflow
293-
workflow_engine._handling_workflows = [str(ac_ex_db.id)]
294297
eventlet.spawn(workflow_engine.shutdown)
295298

296299
# Sleep for few seconds to ensure execution transitions to pausing.
@@ -318,7 +321,9 @@ def test_workflow_engine_shutdown(self):
318321
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_PAUSED)
319322

320323
workflow_engine = workflows.get_engine()
321-
eventlet.sleep(workflow_engine._delay)
324+
workflow_engine._delay = 0
325+
workflow_engine.start(False)
326+
eventlet.sleep(workflow_engine._delay + 5)
322327
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
323328
self.assertTrue(
324329
lv_ac_db.status
@@ -328,3 +333,139 @@ def test_workflow_engine_shutdown(self):
328333
action_constants.LIVEACTION_STATUS_SUCCEEDED,
329334
]
330335
)
336+
337+
@mock.patch.object(
338+
coordination_service.NoOpDriver,
339+
"get_members",
340+
mock.MagicMock(return_value=coordination_service.NoOpAsyncResult("member-1")),
341+
)
342+
def test_workflow_engine_shutdown_with_multiple_members(self):
343+
cfg.CONF.set_override(
344+
name="service_registry", override=True, group="coordination"
345+
)
346+
wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
347+
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
348+
lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
349+
350+
# Assert action execution is running.
351+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
352+
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
353+
wf_ex_db = wf_db_access.WorkflowExecution.query(
354+
action_execution=str(ac_ex_db.id)
355+
)[0]
356+
self.assertEqual(wf_ex_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
357+
workflow_engine = workflows.get_engine()
358+
359+
eventlet.spawn(workflow_engine.shutdown)
360+
361+
# Sleep for few seconds to ensure shutdown sequence completes.
362+
eventlet.sleep(5)
363+
364+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
365+
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
366+
367+
# Process task1.
368+
query_filters = {"workflow_execution": str(wf_ex_db.id), "task_id": "task1"}
369+
t1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
370+
t1_ac_ex_db = ex_db_access.ActionExecution.query(
371+
task_execution=str(t1_ex_db.id)
372+
)[0]
373+
374+
workflows.get_engine().process(t1_ac_ex_db)
375+
t1_ac_ex_db = ex_db_access.ActionExecution.query(
376+
task_execution=str(t1_ex_db.id)
377+
)[0]
378+
self.assertEqual(
379+
t1_ac_ex_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED
380+
)
381+
382+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
383+
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
384+
385+
def test_workflow_engine_shutdown_with_service_registry_disabled(self):
386+
cfg.CONF.set_override(
387+
name="service_registry", override=False, group="coordination"
388+
)
389+
wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
390+
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
391+
lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
392+
393+
# Assert action execution is running.
394+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
395+
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
396+
wf_ex_db = wf_db_access.WorkflowExecution.query(
397+
action_execution=str(ac_ex_db.id)
398+
)[0]
399+
self.assertEqual(wf_ex_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
400+
workflow_engine = workflows.get_engine()
401+
402+
eventlet.spawn(workflow_engine.shutdown)
403+
404+
# Sleep for few seconds to ensure shutdown sequence completes.
405+
eventlet.sleep(5)
406+
407+
# WFE doesn't pause the workflow, since service registry is disabled.
408+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
409+
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
410+
411+
@mock.patch.object(
412+
coordination_service.NoOpDriver,
413+
"get_lock",
414+
mock.MagicMock(return_value=coordination_service.NoOpLock(name="noop")),
415+
)
416+
def test_workflow_engine_concurrent_shutdown_and_start(self):
417+
cfg.CONF.set_override(
418+
name="service_registry", override=True, group="coordination"
419+
)
420+
wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
421+
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
422+
lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
423+
424+
# Assert action execution is running.
425+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
426+
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
427+
wf_ex_db = wf_db_access.WorkflowExecution.query(
428+
action_execution=str(ac_ex_db.id)
429+
)[0]
430+
self.assertEqual(wf_ex_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
431+
workflow_engine = workflows.get_engine()
432+
433+
workflow_engine._delay = 0
434+
# Start and stop WFE simultaneously.
435+
eventlet.spawn_after(1, workflow_engine.shutdown)
436+
eventlet.spawn_after(1, workflow_engine.start, True)
437+
438+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
439+
440+
# Case 1: Shutdown routine acquires the lock first
441+
# Case 2: Startup routine acquires the lock first
442+
if lv_ac_db.status == action_constants.LIVEACTION_STATUS_PAUSING:
443+
# Process task1
444+
query_filters = {"workflow_execution": str(wf_ex_db.id), "task_id": "task1"}
445+
t1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
446+
t1_ac_ex_db = ex_db_access.ActionExecution.query(
447+
task_execution=str(t1_ex_db.id)
448+
)[0]
449+
450+
workflows.get_engine().process(t1_ac_ex_db)
451+
# Startup sequence won't proceed until shutdown routine completes.
452+
# Assuming shutdown sequence is complete, start up sequence will resume the workflow.
453+
eventlet.sleep(workflow_engine._delay + 5)
454+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
455+
self.assertTrue(
456+
lv_ac_db.status
457+
in [
458+
action_constants.LIVEACTION_STATUS_RESUMING,
459+
action_constants.LIVEACTION_STATUS_RUNNING,
460+
action_constants.LIVEACTION_STATUS_SUCCEEDED,
461+
]
462+
)
463+
else:
464+
coordination_service.NoOpDriver.get_members = mock.MagicMock(
465+
return_value=coordination_service.NoOpAsyncResult("member-1")
466+
)
467+
eventlet.sleep(workflow_engine._delay + 5)
468+
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
469+
self.assertEqual(
470+
lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING
471+
)

st2common/st2common/config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,16 @@ def register_opts(ignore_errors=False):
797797
"orphaned and cancelled by the garbage collector. A value of zero means the "
798798
"feature is disabled. This is disabled by default.",
799799
),
800+
cfg.IntOpt(
801+
"exit_still_active_check",
802+
default=300,
803+
help="How long to wait for process (in seconds) to exit after receiving shutdown signal.",
804+
),
805+
cfg.IntOpt(
806+
"still_active_check_interval",
807+
default=2,
808+
help="Time interval between subsequent queries to check executions handled by WFE.",
809+
),
800810
]
801811

802812
do_register_opts(

st2common/st2common/services/coordination.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,3 +277,11 @@ def get_member_id():
277277
proc_info = system_info.get_process_info()
278278
member_id = six.b("%s_%d" % (proc_info["hostname"], proc_info["pid"]))
279279
return member_id
280+
281+
282+
def get_group_id(service):
283+
if not isinstance(service, six.binary_type):
284+
group_id = service.encode("utf-8")
285+
else:
286+
group_id = service
287+
return group_id

0 commit comments

Comments
 (0)