|
| 1 | +Server-side background task execution system |
| 2 | +============================================ |
| 3 | + |
| 4 | +Introduction |
| 5 | +------------ |
| 6 | + |
| 7 | +In order to facilitate the quick release of _API handler_ processes (so the server can reply to new _Thrift API_ requests), CodeChecker's **`server`** package implements support for the creation of **background tasks**. |
| 8 | +A generic execution library deals with the **driver** aspects of background tasks, including the database handling (for cross-service synchronisation of task statuses) and memory management. |
| 9 | + |
| 10 | +Each task is associated with a **`token`**, which is a randomly generated identifier, corresponding to the `PRIMARY KEY` in the database. |
| 11 | +This token is used to query information about a task, and to execute administrative actions against the task. |
| 12 | +Ancillary data, stored in the server's storage space on disk, is also keyed by the _`token`_. |
| 13 | + |
| 14 | +The most important property of a _Task_ is its **status**, which can be: |
| 15 | + |
| 16 | +* _`ALLOCATED`_: The task has its identifying token, and is under preparation. |
| 17 | +* _`ENQUEUED`_: The task has been prepared and is waiting for an executor to start the work. |
| 18 | +* _`RUNNING`_: The task is currently being executed. |
| 19 | +* _`COMPLETED`_: The task's execution finished successfully. |
| 20 | +* _`FAILED`_: The task's execution "structurally" failed due to a cause internal to the execution: an uncaught `Exception` escaped the executor's _"main"_ method. |
| 21 | +* _`CANCELLED`_: An administrator (**`SUPERUSER`**, see [the Permission system](permissions.md)) cancelled the execution of the task, and the task gracefully terminated itself. |
| 22 | +* _`DROPPED`_: External influence resulted in the executing server's shutdown, and the task did not complete in a graceful way. |
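
As an illustration only, the statuses listed above can be modelled as an enumeration together with a helper that tells the final states apart from the in-flight ones. The names `TaskStatus`, `TERMINAL_STATUSES` and `is_terminal` are hypothetical and are not taken from CodeChecker's code; treating the last four statuses as final is an assumption based on their descriptions.

```python
from enum import Enum


class TaskStatus(Enum):
    """Hypothetical model of the task statuses described in the list."""
    ALLOCATED = "ALLOCATED"
    ENQUEUED = "ENQUEUED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    DROPPED = "DROPPED"


# Assumption: a task never leaves these states once it has reached one.
TERMINAL_STATUSES = frozenset({
    TaskStatus.COMPLETED,
    TaskStatus.FAILED,
    TaskStatus.CANCELLED,
    TaskStatus.DROPPED,
})


def is_terminal(status: TaskStatus) -> bool:
    """Return True if no further status change is expected."""
    return status in TERMINAL_STATUSES
```

A client that waits for a task would keep waiting while `is_terminal()` is false for the status it last observed.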
| 23 | + |
| 24 | +Task lifecycle |
| 25 | +-------------- |
| 26 | + |
| 27 | +The workflow of a task's lifecycle is as follows: |
| 28 | + |
| 29 | +### "Foreground" logic |
| 30 | + |
| 31 | +Tasks are generally spawned by API handlers, executed in the control flow of a Thrift RPC function. |
| 32 | + |
| 33 | +1. An **API** request arrives (later, this might be extended with a _`cron`_-like scheduler) which exercises an endpoint that results in the need for a task. |
| 34 | +2. _(Optionally)_ some conformance checks are executed on the input, so that no task is created at all if the input is ill-formed. |
| 35 | +3. A task **`token`** is _`ALLOCATED`_: the record is written into the database, and now we have a unique identifier for the task. |
| 36 | +4. The task is **pushed** to the _task queue_ of the CodeChecker server, resulting in the _`ENQUEUED`_ status. |
| 37 | +5. The task's identifier **`token`** is returned to the user. |
| 38 | +6. The API handler exits and the Thrift RPC connection is terminated. |
| 39 | + |
| 40 | +The API request dispatching of the CodeChecker server has a **`TaskManager`** instance which should be passed to the API handler implementation, if not already available. |
| 41 | +Then, you can use this _`TaskManager`_ object to perform the necessary actions to enqueue the execution of a task: |
| 42 | + |
| 43 | + |
| 44 | +```py3 |
| 45 | +from pathlib import Path |
| 46 | +from ..profiler import timeit |
| 47 | +from ..task_executors.task_manager import TaskManager |
| 48 | +from .common import exc_to_thrift_reqfail |
| 49 | + |
| 50 | +class MyThriftEndpointHandler: |
| 51 | +    def __init__(self, task_manager: TaskManager): |
| 52 | +        self._task_manager = task_manager |
| 53 | + |
| 54 | +    @exc_to_thrift_reqfail |
| 55 | +    @timeit |
| 56 | +    def apiRequestThatResultsInATask(self, arg1, arg2, large_arg: str, ...) -> str:  # Return the task token! |
| 57 | +        # Conformance checks and assertions on the input's validity. |
| 58 | +        if invalid_input(arg1, arg2): |
| 59 | +            raise ValueError("Bad request!") |
| 60 | + |
| 61 | +        # Allocate the task token. |
| 62 | +        tok: str = self._task_manager.allocate_task_record( |
| 63 | +            # The task's "Kind": a simple string identifier which should NOT |
| 64 | +            # depend on user input! |
| 65 | +            # Used in filters and to quickly identify the "type" for a task |
| 66 | +            # record. |
| 67 | +            "MyThriftEndpointHandler::apiRequestThatResultsInATask()", |
| 68 | + |
| 69 | +            # The task's "Summary": an arbitrary string that is used visually |
| 70 | +            # to describe the executing task. This can be anything, even |
| 71 | +            # spliced together from user input. |
| 72 | +            # This is not used in the filters. |
| 73 | +            "This is a task that was spawned from the API!", |
| 74 | + |
| 75 | +            # The task's "User": the name of the user who is the actor which |
| 76 | +            # caused the execution of the task. |
| 77 | +            # The status of the task may only be queried by the relevant actor, |
| 78 | +            # a PRODUCT_ADMIN (if the task is associated with a product) or |
| 79 | +            # SUPERUSERs. |
| 80 | +            "user", |
| 81 | + |
| 82 | +            # If the task is associated with a product, pass the ORM `Product` |
| 83 | +            # object here. Otherwise, pass `None`. |
| 84 | +            current_product_obj or None) |
| 85 | + |
| 86 | +        # Large inputs to the task **MUST** be passed through the file system |
| 87 | +        # in order not to crash the server. |
| 88 | +        # **If** the task needs large inputs, they must go into a temporary |
| 89 | +        # directory appropriately created by the task manager. |
| 90 | +        data_dir: Path = self._task_manager.create_task_data(tok) |
| 91 | + |
| 92 | +        # Create the files under `data_dir` ... |
| 93 | +        with open(data_dir / "large.txt", 'w') as f: |
| 94 | +            f.write(large_arg) |
| 95 | + |
| 96 | +        # Instantiate the `MyTask` class (see later) which contains the |
| 97 | +        # actual business logic of the task. |
| 98 | +        # |
| 99 | +        # Small input fields that are of trivial serialisable Python types |
| 100 | +        # (scalars, string, etc., but not file descriptors or network |
| 101 | +        # connections) can be passed directly to the task object's constructor. |
| 102 | +        task = MyTask(tok, data_dir, arg1, arg2) |
| 103 | + |
| 104 | +        # Enqueue the task, at which point it may start immediately executing, |
| 105 | +        # depending on server load. |
| 106 | +        self._task_manager.push_task(task) |
| 107 | + |
| 108 | +        return tok |
| 109 | +``` |
| 110 | + |
| 111 | + |
| 112 | +### "Background" logic |
| 113 | + |
| 114 | +The business logic of tasks is implemented by subclassing the _`AbstractTask`_ class, providing an appropriate constructor, and overriding the **`_implementation()`** method. |
| 115 | + |
| 116 | +1. Once a _`Task`_ instance is pushed into the server's task queue by _`TaskManager::push_task()`_, one of the background workers of the server will awaken and pop the task from the queue. The size of the queue is limited, which is why only **small** arguments may be present in the state of the _`Task`_ object! |
| 117 | +2. This popped object is reconstructed by the standard Python library _`pickle`_, which is why only **trivial** scalar-like objects may be present in the state of the _`Task`_ object! |
| 118 | +3. The executor starts running the custom **`_implementation()`** method, after setting the task's status to _`RUNNING`_. |
| 119 | +4. The implementation does its work, periodically calling _`task_manager.heartbeat()`_ to update the progress timestamp of the task, and, if appropriate, checking with _`task_manager.should_cancel()`_ whether the admins requested the task to cancel or the server is shutting down. |
| 120 | +5. If _`should_cancel()`_ returned `True`, the task does some appropriate clean-up, and exits by raising the special _`TaskCancelHonoured`_ exception, indicating that it responded to the request. (At this point, the status becomes either _`CANCELLED`_ or _`DROPPED`_, depending on the circumstances of the service.) |
| 121 | +6. Otherwise, or if the task is for some reason not cancellable without causing damage, the task executes its logic. |
| 122 | +7. If the task's _`_implementation()`_ method exits cleanly, it reaches the _`COMPLETED`_ status; otherwise, if any exception escapes from the _`_implementation()`_ method, the task becomes _`FAILED`_. |
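
The outcome handling in steps 5 to 7 can be pictured with the following minimal sketch. It is not CodeChecker's executor: `run_and_classify` and its `server_shutting_down` flag are hypothetical, the local `TaskCancelHonoured` class is only a stand-in for the real exception, and mapping a shutdown to _`DROPPED`_ is an assumption derived from the status descriptions.

```python
from typing import Callable


class TaskCancelHonoured(Exception):
    """Stand-in for the exception a task raises after honouring a cancel."""


def run_and_classify(implementation: Callable[[], None],
                     server_shutting_down: bool = False) -> str:
    """Run a task body and return the status it would end up in."""
    try:
        implementation()
    except TaskCancelHonoured:
        # The task noticed the cancel request and stopped on its own.
        # Assumption: a server shutdown yields DROPPED, an explicit
        # administrator request yields CANCELLED.
        return "DROPPED" if server_shutting_down else "CANCELLED"
    except Exception:
        # Any other exception escaping the task body is a failure.
        return "FAILED"
    return "COMPLETED"
```

Note that the order of the `except` clauses matters in such a design: the cancellation exception has to be handled before the generic `Exception` handler, otherwise every honoured cancellation would be reported as _`FAILED`_.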
| 123 | + |
| 124 | +**Caution!** Tasks execute in a separate background process, one of the many processes spawned by a CodeChecker server, and thus no longer have the ability to communicate synchronously with the user! |
| 125 | +This also means that a task cannot "return" a value: tasks **only exercise side-effects**, but do not calculate a "result". |
| 126 | + |
| 127 | + |
| 128 | +```py3 |
| 129 | +from ..task_executors.abstract_task import AbstractTask, TaskCancelHonoured |
| 130 | +from ..task_executors.task_manager import TaskManager |
| 131 | + |
| 132 | +class MyTask(AbstractTask): |
| 133 | +    def __init__(self, token: str, data_dir: Path, arg1, arg2):  # Note: No large_arg! |
| 134 | +        # If the task does not use a temporary data directory, `data_dir` can |
| 135 | +        # be omitted, and `None` may be passed instead! |
| 136 | +        super().__init__(token, data_dir) |
| 137 | +        self.arg1 = arg1 |
| 138 | +        self.arg2 = arg2 |
| 139 | + |
| 140 | +    def _implementation(self, tm: TaskManager) -> None:  # Tasks do not have a result value! |
| 141 | +        # First, obtain the rest of the input (e.g., `large_arg`), |
| 142 | +        # if any is needed. |
| 143 | +        with open(self.data_path / "large.txt", 'r') as f: |
| 144 | +            large_arg: str = f.read() |
| 145 | + |
| 146 | +        # Exceptions raised above, e.g., the lack of the file, automatically |
| 147 | +        # turn the task into the FAILED state. |
| 148 | + |
| 149 | +        # Let's assume the task does something in an iteration... |
| 150 | +        for i in range(0, int(self.arg1) + int(self.arg2)): |
| 151 | +            tm.heartbeat()  # Indicate some progress ... |
| 152 | +            element = large_arg.split('\n')[i] |
| 153 | + |
| 154 | +            if tm.should_cancel(self): |
| 155 | +                # A shutdown was requested of the running task. |
| 156 | + |
| 157 | +                # Perform some cleanup logic ... |
| 158 | + |
| 159 | +                # Maybe have some customised log? |
| 160 | +                tm.add_comment(self, |
| 161 | +                               # The body of the comment. |
| 162 | +                               "Oh no, we are shutting down ...!\n" |
| 163 | +                               f"But only processed {i} entries!", |
| 164 | + |
| 165 | +                               # The actor entity associated with the comment. |
| 166 | +                               "SYSTEM?") |
| 167 | + |
| 168 | +                raise TaskCancelHonoured(self) |
| 169 | + |
| 170 | +            # Actually process the step ... |
| 171 | +            foo(element) |
| 172 | +``` |
| 173 | + |
| 174 | +Client-side handling |
| 175 | +-------------------- |
| 176 | + |
| 177 | +In a client, call the task-generating API endpoint normally. |
| 178 | +It should return a `str`, the **`token`** identifier of the task. |
| 179 | + |
| 180 | +This _token_ can be awaited (polled) programmatically using a library function: |
| 181 | + |
| 182 | + |
| 183 | +```py3 |
| 184 | +from codechecker_client import client as libclient |
| 185 | +from codechecker_client.task_client import await_task_termination |
| 186 | + |
| 187 | +def main(...) -> int: |
| 188 | +    client = setup_client(server_url or product_url) |
| 189 | +    tok: str = client.apiRequestThatResultsInATask(16, 32, large_arg_str) |
| 190 | + |
| 191 | +    prot, host, port = split_server_url(server_url) |
| 192 | +    task_client = libclient.setup_task_client(prot, host, port) |
| 193 | +    status: str = await_task_termination(LOG, tok, |
| 194 | +                                         task_api_client=task_client) |
| 195 | + |
| 196 | +    if status == "COMPLETED": |
| 197 | +        return 0 |
| 198 | +    LOG.error("The execution of the task failed!\n%s", |
| 199 | +              task_client.getTaskInfo(tok).comments) |
| 200 | +    return 1 |
| 201 | +``` |
| 202 | + |
| 203 | +In simpler wrapper scripts, alternatively, |
| 204 | +`CodeChecker cmd serverside-tasks --token TOK --await` may be used to block |
| 205 | +execution until a task terminates (one way or another). |
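
Conceptually, awaiting a token boils down to a polling loop such as the sketch below. This is not the implementation of `await_task_termination`: the function `poll_until_terminated`, its parameters, and the `get_status` callable (standing in for a query of the task's status through the API) are hypothetical, and the set of final statuses is assumed from the status list in the introduction.

```python
import time
from typing import Callable, Optional

# Assumption: these are the statuses after which a task no longer changes.
TERMINAL = frozenset({"COMPLETED", "FAILED", "CANCELLED", "DROPPED"})


def poll_until_terminated(get_status: Callable[[], str],
                          interval_sec: float = 1.0,
                          max_polls: Optional[int] = None,
                          sleep: Callable[[float], None] = time.sleep) -> str:
    """Query the status repeatedly until the task reaches a final state."""
    polls = 0
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        polls += 1
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError("The task did not terminate in time.")
        # Wait a little before asking the server again.
        sleep(interval_sec)
```

The `sleep` parameter is injectable only so that the loop can be exercised without real waiting, e.g. `poll_until_terminated(fetch, sleep=lambda _: None)` with some `fetch` function of your own.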