Description
Covalent now supports dispatch cancellation. Executor plugins can opt in to this functionality by registering a job identifier for each task (in this case, the ECS task ID or ARN) and implementing a cancel method. Covalent invokes this method when the user requests that the task be cancelled.
Methods provided by Covalent
All executor plugins automatically inherit the following methods, which can be invoked from the plugin's run() method.
async def set_job_handle(self, job_handle) saves a job identifier (job_handle) in Covalent's database. The job_handle can be any JSON-serializable type, typically a string or integer, and should contain whatever information is needed to cancel the job later. In this case, the ECS stop_task method expects the task ID (or ARN) assigned by ECS.
async def get_cancel_requested(self) -> bool queries Covalent to determine whether cancellation of the task has been requested. It can be called at several points in the run() method if desired.
The run() method should raise a TaskCancelledError exception once it determines that the backend job has been cancelled.
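To make the protocol concrete, here is a toy, in-memory sketch of how run() interacts with the two Covalent-provided helpers. ToyCancellableExecutor, request_cancel() and the local TaskCancelledError class are hypothetical stand-ins so the example runs without Covalent installed; the real helpers talk to the Covalent server and database rather than to instance attributes.

```python
import asyncio
import json
from typing import Any, Dict


class TaskCancelledError(Exception):
    """Local stand-in for covalent._shared_files.exceptions.TaskCancelledError."""


class ToyCancellableExecutor:
    """Hypothetical in-memory imitation of the Covalent-provided helpers."""

    def __init__(self) -> None:
        self._job_handle: Any = None
        self._cancel_requested = False

    def request_cancel(self) -> None:
        # Plays the role of the user asking the Covalent server to cancel the task
        self._cancel_requested = True

    async def set_job_handle(self, job_handle: Any) -> None:
        # Covalent stores the handle in its database; the JSON round trip here
        # only enforces the "JSON-serializable" requirement
        self._job_handle = json.loads(json.dumps(job_handle))

    async def get_cancel_requested(self) -> bool:
        return self._cancel_requested

    async def run(self, task_metadata: Dict) -> str:
        # Checkpoint 1: before any work is submitted to the backend
        if await self.get_cancel_requested():
            raise TaskCancelledError("Cancelled before submission")
        # Pretend the backend returned this identifier on submission (hypothetical)
        job_handle = f"task-{task_metadata['node_id']}"
        await self.set_job_handle(job_handle)
        # Checkpoint 2: after submission, e.g. inside a polling loop
        if await self.get_cancel_requested():
            raise TaskCancelledError(f"Cancelled after submitting {job_handle}")
        return "done"


executor = ToyCancellableExecutor()
print(asyncio.run(executor.run({"dispatch_id": "d1", "node_id": 3})))  # done
```

Checking for cancellation at more than one point lets the plugin skip expensive work (uploads, job submission) when the user cancels early.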
Methods to be implemented by each plugin:
Each plugin defines the following abstract method to be overridden with backend-specific logic:
async def cancel(self, task_metadata: Dict, job_handle: str) -> bool: upon receiving a cancellation request, the Covalent server invokes this method with the following inputs:
task_metadata: currently contains the keys dispatch_id and node_id, as for the run() method.
job_handle: the task's job identifier previously saved using set_job_handle().
The method should return True if the job was cancelled and False otherwise.
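A minimal sketch of cancel() for ECS, assuming the job handle is the ECS task ID or ARN and that the executor holds a boto3 ECS client and a cluster name. ECSCancelSketch, ecs_client and cluster are hypothetical names; stop_task(cluster=..., task=..., reason=...) is the boto3 ECS call. Because boto3 is blocking, the call is moved off the event loop with asyncio.to_thread (Python 3.9+).

```python
import asyncio
from typing import Any, Dict


class ECSCancelSketch:
    """Hypothetical executor fragment; only the cancel() logic is shown."""

    def __init__(self, ecs_client: Any, cluster: str) -> None:
        # ecs_client is assumed to be a boto3 ECS client, e.g. boto3.client("ecs")
        self.ecs_client = ecs_client
        self.cluster = cluster

    async def cancel(self, task_metadata: Dict, job_handle: str) -> bool:
        # Record who stopped the task; ECS surfaces this as the task's stoppedReason
        reason = (
            f"Cancelled by Covalent: dispatch {task_metadata['dispatch_id']}, "
            f"node {task_metadata['node_id']}"
        )
        try:
            # boto3 calls block, so run them in a worker thread
            await asyncio.to_thread(
                self.ecs_client.stop_task,
                cluster=self.cluster,
                task=job_handle,
                reason=reason,
            )
        except Exception:
            # e.g. a botocore ClientError for an unknown task or missing permissions
            return False
        return True
```

Returning False instead of raising lets Covalent report that the job could not be cancelled without crashing the server-side handler.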
In addition to querying Covalent for cancellation requests, the run() method may need to query the compute backend to determine whether the job is in fact cancelled and raise TaskCancelledError if that is the case.
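Querying the backend might look like the polling sketch below. It assumes the boto3 ECS describe_tasks(cluster=..., tasks=[...]) response shape and treats a STOPPED task whose stopCode is "UserInitiated" (the code ECS reports for StopTask) as cancelled. The function name, poll_freq and timeout defaults are hypothetical, and TaskCancelledError is a local stand-in for Covalent's exception.

```python
import asyncio
from typing import Any


class TaskCancelledError(Exception):
    """Local stand-in for covalent._shared_files.exceptions.TaskCancelledError."""


async def poll_until_stopped(ecs_client: Any, cluster: str, task_arn: str,
                             poll_freq: float = 5.0, timeout: float = 3600.0) -> dict:
    """Poll an ECS task until it stops; raise if it was stopped by a user."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        # boto3 calls block, so run them in a worker thread
        response = await asyncio.to_thread(
            ecs_client.describe_tasks, cluster=cluster, tasks=[task_arn]
        )
        if not response["tasks"]:
            # Unknown tasks are reported under "failures" instead of "tasks"
            raise RuntimeError(f"ECS task {task_arn} not found: {response.get('failures')}")
        task = response["tasks"][0]
        if task["lastStatus"] == "STOPPED":
            if task.get("stopCode") == "UserInitiated":
                raise TaskCancelledError(
                    f"ECS task {task_arn} was stopped: {task.get('stoppedReason', '')}"
                )
            return task
        await asyncio.sleep(poll_freq)
    raise TimeoutError(f"ECS task {task_arn} did not stop within {timeout} s")
```

Note that "UserInitiated" also covers tasks stopped manually outside Covalent (for example from the console), which is usually the desired behaviour.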
The code below provides a rough guide to using the above methods:
from typing import Any, Callable, Dict, List

from covalent._shared_files.exceptions import TaskCancelledError
...
async def proceed_if_task_not_cancelled(self):
    # Ask the Covalent server whether the user has requested cancellation
    if await self.get_cancel_requested():
        self._debug_log("Task cancelled")
        raise TaskCancelledError("Task cancelled at user request")

async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
    ...
    await self.proceed_if_task_not_cancelled()
    # upload pickled assets
    ...
    await self.proceed_if_task_not_cancelled()
    # invoke job/task; job_handle is the identifier returned by the backend
    # persist it so that cancel() can locate the job later
    await self.set_job_handle(job_handle)
    ...
    await self.poll_job_status(task_metadata, job_handle)

async def poll_job_status(self, task_metadata, job_id):
    # Boto3 client invocations to check the job status
    # while timeout_not_exceeded:
    #     job_state = client.describe_job(job_id)
    #     if job_state == "CANCELLED":
    #         raise TaskCancelledError
    #     ...
    #     await asyncio.sleep(poll_freq)
    ...

async def cancel(self, task_metadata: Dict, job_handle: str) -> bool:
    """
    Cancel the job on the compute backend

    Arg(s)
        task_metadata: Dictionary with the task's dispatch_id and node_id
        job_handle: Unique job handle assigned to the task by the backend

    Return(s)
        True/False indicating if the job was cancelled
    """
    # boto client invocations to cancel the task
    ...
    if job_cancelled:
        return True
    else:
        # Job cannot be cancelled for one reason or another
        return False
Acceptance Criteria
- In the run() method:
  - Save the job handle for the task once it is determined.
  - Check whether the job has been cancelled at various junctures.
- Implement the cancel method.
- Ensure that a workflow is tested with cancellation to confirm that the cancel functionality is correctly integrated.