Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7026.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Log (re)commencement of xtrigger calls, for old xtriggers.
4 changes: 4 additions & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ def call_xtriggers_async(self, itask: 'TaskProxy'):
# General case: potentially slow asynchronous function call.
if sig in self.sat_xtrig:
# Already satisfied, just update the task
LOG.info(f"[{itask}] satisfying xtrigger prerequisite: {sig}")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should log this as INFO, as confirmation that an xtrigger prerequisite is being satisfied by an existing xtrigger result.

if not itask.state.xtriggers[label]:
itask.state.xtriggers[label] = True
res = {}
Expand Down Expand Up @@ -760,7 +761,10 @@ def housekeep(self, itasks):
itask, sigs_only=True, unsat_only=True)
for sig in list(self.sat_xtrig):
if sig not in all_xtrig:
LOG.debug(f"Housekeeping xtrigger result: {sig}")
del self.sat_xtrig[sig]
with suppress(KeyError):
del self.t_next_call[sig]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the bug fix.

self.do_housekeeping = False

def all_task_seq_xtriggers_satisfied(self, itask: 'TaskProxy') -> bool:
Expand Down
155 changes: 150 additions & 5 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.scheduler import Scheduler
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED


async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
Expand Down Expand Up @@ -95,10 +96,12 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch):

async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker):
"""
If multiple tasks depend on the same satisfied xtrigger, the DB mgr method
put_xtriggers should only be called once - when the xtrigger gets satisfied
If two tasks depend on the same satisfied xtrigger, put_xtriggers should
only be called once - when the xtrigger gets satisfied.
- https://github.com/cylc/cylc-flow/pull/5908

See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908)
So long as both tasks are active at once:
- https://github.com/cylc/cylc-flow/issues/7027

"""
id_ = flow({
Expand All @@ -120,8 +123,8 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker):
# (For clock triggers this is synchronous)
schd.xtrigger_mgr.call_xtriggers_async(task)

# It should now be satisfied.
assert task.state.xtriggers == {'wall_clock': True}
# It should now be satisfied.
assert task.state.xtriggers == {'wall_clock': True}

# Check one put_xtriggers call only, not two.
assert spy.call_count == 1
Expand All @@ -133,6 +136,148 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker):
# loop doesn't run in this test.


async def test_1_xtrigger_2_tasks_async(
flow, start, scheduler, mocker, caplog
):
"""
Like test_1_xtrigger_2_tasks but for async (not clock) xtriggers.

If two tasks depend on the same satisfied xtrigger, put_xtriggers should
only be called once - when the xtrigger gets satisfied (so long as both
are active at once - https://github.com/cylc/cylc-flow/issues/7027

"""
id_ = flow({
'scheduling': {
'cycling mode': 'integer',
'xtriggers': {
'echo': 'echo("whatever", succeed=False)',
},
'graph': {
'R1': '''
@echo => foo & bar
'''
},
},
})

schd = scheduler(id_)
spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers')

async with start(schd):

foo = schd.pool._get_task_by_id('1/foo')
bar = schd.pool._get_task_by_id('1/bar')

schd.xtrigger_mgr.call_xtriggers_async(foo)
assert "Commencing xtrigger" in caplog.text
caplog.clear()

schd.xtrigger_mgr.call_xtriggers_async(bar)
assert "Commencing xtrigger" not in caplog.text
caplog.clear()

satisfy_xtrigger_functions(schd) # mock results
assert "xtrigger succeeded" in caplog.text
caplog.clear()

schd.xtrigger_mgr.call_xtriggers_async(foo) # process callbacks
assert "satisfying xtrigger" in caplog.text
caplog.clear()

schd.xtrigger_mgr.call_xtriggers_async(bar) # process callbacks
assert "satisfying xtrigger" in caplog.text
caplog.clear()

# It should now be satisfied.
assert foo.state.xtriggers == {'echo': True}
assert bar.state.xtriggers == {'echo': True}

# Check put_xtriggers called once, not twice.
assert spy.call_count == 1


async def test_1_xtrigger_2_tasks_later(
flow, start, scheduler, mocker, caplog
):
"""
If two tasks depend on the same satisfied xtrigger, but are not
both active at the same time, the xtrigger will need to be
satisfied twice - https://github.com/cylc/cylc-flow/issues/7027

"""
id_ = flow({
'scheduling': {
'cycling mode': 'integer',
'xtriggers': {
'echo': 'echo("whatever", succeed=False)',
},
'graph': {
'R1': '''
@echo => foo & bar
# spawn bar after @echo is housekept post satisfying foo
foo => bar
'''
},
},
})

schd = scheduler(id_)
spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers')

async with start(schd):

# Get foo at startup
foo = schd.pool._get_task_by_id('1/foo')

# call the xtrigger
schd.xtrigger_mgr.call_xtriggers_async(foo)
assert "Commencing xtrigger" in caplog.text
caplog.clear()

# satisfy it
satisfy_xtrigger_functions(schd) # mock results
assert "xtrigger succeeded" in caplog.text
caplog.clear()

# process callback
schd.xtrigger_mgr.call_xtriggers_async(foo)
assert "satisfying xtrigger" in caplog.text
caplog.clear()

# foo should now be satisfied.
assert foo.state.xtriggers == {'echo': True}

# this will delete the xtrigger - nothing else depends on it
schd.xtrigger_mgr.housekeep([foo])

# Spawn bar and remove foo
schd.pool.spawn_on_output(foo, TASK_OUTPUT_SUCCEEDED)

bar = schd.pool._get_task_by_id('1/bar')

# the xtrigger should be called again for bar
schd.xtrigger_mgr.call_xtriggers_async(bar)
assert "Commencing xtrigger" in caplog.text
caplog.clear()

# satisfy it
satisfy_xtrigger_functions(schd) # mock results
assert "xtrigger succeeded" in caplog.text
caplog.clear()

# process callback
schd.xtrigger_mgr.call_xtriggers_async(bar)
assert "satisfying xtrigger" in caplog.text
caplog.clear()

# bar should now be satisfied.
assert bar.state.xtriggers == {'echo': True}

# Check put_xtriggers called twice, not once.
assert spy.call_count == 2


async def test_xtriggers_restart(flow, start, scheduler, db_select):
"""It should write satisfied xtriggers to the DB and load on restart.

Expand Down
Loading