|
42 | 42 | ENV_POD_USE_IMAGE_ENTRYPOINT, |
43 | 43 | ENV_KUBECTL_IGNORE_ERROR, |
44 | 44 | ENV_DEFAULT_SCHEDULER_QUEUE_TAGS, |
| 45 | + ENV_LOG_POD_STATUS_BEFORE_DELETING, |
45 | 46 | ) |
46 | 47 | from .._vendor import pyyaml as yaml |
47 | 48 |
|
@@ -220,6 +221,7 @@ def __init__( |
220 | 221 | self.ignore_kubectl_errors_re = ( |
221 | 222 | re.compile(ENV_KUBECTL_IGNORE_ERROR.get()) if ENV_KUBECTL_IGNORE_ERROR.get() else None |
222 | 223 | ) |
| 224 | + self.log_pod_status_before_deleting = ENV_LOG_POD_STATUS_BEFORE_DELETING.get() |
223 | 225 |
|
224 | 226 | @property |
225 | 227 | def agent_label(self): |
@@ -431,7 +433,7 @@ def get_pods(self, filters: List[str] = None, debug_msg: str = None): |
431 | 433 | output_config = json.loads(output) |
432 | 434 | except Exception as ex: |
433 | 435 | self.log.warning('Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex)) |
434 | | - return |
| 436 | + return [] |
435 | 437 | return output_config.get('items', []) |
436 | 438 |
|
437 | 439 | def _get_pod_count(self, extra_labels: List[str] = None, msg: str = None): |
@@ -490,7 +492,6 @@ def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_sessio |
490 | 492 | "task": task_id, |
491 | 493 | "queue": self.k8s_pending_queue_id, |
492 | 494 | "status_reason": "k8s pending scheduler", |
493 | | - "update_execution_queue": False, |
494 | 495 | } |
495 | 496 | ) |
496 | 497 | if res.ok: |
@@ -1037,7 +1038,71 @@ def _process_bash_lines_response(self, bash_cmd: str, raise_error=True): |
1037 | 1038 | ] |
1038 | 1039 | return lines |
1039 | 1040 |
|
| 1041 | + def _log_pod_statuses(self, pods: List[Dict]): |
| 1042 | + """ |
| 1043 | + Log pod status and exit codes to the task's console log. |
| 1044 | + :param pods: A list of pod dictionaries to log. |
| 1045 | + :param msg: A message to log. |
| 1046 | + """ |
| 1047 | + if not pods: |
| 1048 | + return |
| 1049 | + for pod in pods: |
| 1050 | + pod_name = get_path(pod, "metadata", "name") |
| 1051 | + task_id = pod_name[len(self.pod_name_prefix) :] |
| 1052 | + |
| 1053 | + # Log pod status and exit codes |
| 1054 | + pod_status = get_path(pod, "status", "phase") |
| 1055 | + ctr_statuses = get_path(pod, "status", "containerStatuses") or [] |
| 1056 | + |
| 1057 | + log_lines = [] |
| 1058 | + log_lines.append( |
| 1059 | + "Pod '{pod_name}' ended with status '{pod_status}'".format( |
| 1060 | + pod_name=pod_name, pod_status=pod_status |
| 1061 | + ) |
| 1062 | + ) |
| 1063 | + |
| 1064 | + for ctr_status in ctr_statuses: |
| 1065 | + ctr_name = ctr_status.get("name") |
| 1066 | + ctr_state = get_path(ctr_status, "state") |
| 1067 | + ctr_reason = get_path(ctr_state, "terminated", "reason") |
| 1068 | + ctr_exit_code = get_path(ctr_state, "terminated", "exitCode") |
| 1069 | + log_lines.append( |
| 1070 | + "Container '{name}' ended with reason '{reason}' and exit code '{exit_code}'".format( |
| 1071 | + name=ctr_name, reason=ctr_reason, exit_code=ctr_exit_code |
| 1072 | + ) |
| 1073 | + ) |
| 1074 | + |
| 1075 | + try: |
| 1076 | + self.send_logs(task_id, [os.linesep.join(log_lines)]) |
| 1077 | + except Exception as ex: |
| 1078 | + self.log.warning(f"Failed sending pod status logs for task {task_id}: {ex}") |
| 1079 | + |
| 1080 | + def _delete_pods_by_names(self, names: List[str], namespace: str, msg: str = None) -> List[str]: |
| 1081 | + if not names: |
| 1082 | + return [] |
| 1083 | + kubectl_cmd = "kubectl delete pod --namespace={ns} {names} --output=name".format( |
| 1084 | + ns=namespace, names=" ".join(names) |
| 1085 | + ) |
| 1086 | + self.log.debug("Deleting pods by name {}: {}".format( |
| 1087 | + msg or "", kubectl_cmd |
| 1088 | + )) |
| 1089 | + lines = self._process_bash_lines_response(kubectl_cmd) |
| 1090 | + self.log.debug(" - deleted pods by name %s", ", ".join(lines)) |
| 1091 | + return lines |
| 1092 | + |
1040 | 1093 | def _delete_pods(self, selectors: List[str], namespace: str, msg: str = None) -> List[str]: |
| 1094 | + if self.log_pod_status_before_deleting: |
| 1095 | + pods_to_delete = self.get_pods( |
| 1096 | + filters=selectors, |
| 1097 | + debug_msg="Getting pods to delete: {cmd}", |
| 1098 | + ) |
| 1099 | + self._log_pod_statuses(pods_to_delete) |
| 1100 | + return self._delete_pods_by_names( |
| 1101 | + [get_path(p, "metadata", "name") for p in pods_to_delete if get_path(p, "metadata", "name")], |
| 1102 | + namespace, |
| 1103 | + msg=msg |
| 1104 | + ) |
| 1105 | + |
1041 | 1106 | kubectl_cmd = \ |
1042 | 1107 | "kubectl delete pod -l={agent_label} " \ |
1043 | 1108 | "--namespace={namespace} --field-selector={selector} --output name".format( |
@@ -1089,6 +1154,8 @@ def _delete_completed_or_failed_pods(self, namespace, msg: str = None): |
1089 | 1154 | debug_msg="Deleting failed pods: {cmd}" |
1090 | 1155 | ) |
1091 | 1156 | if failed_pods: |
| 1157 | + if self.log_pod_status_before_deleting: |
| 1158 | + self._log_pod_statuses(failed_pods) |
1092 | 1159 | job_names_to_delete = { |
1093 | 1160 | get_path(pod, "metadata", "labels", "job-name"): get_path(pod, "metadata", "namespace") |
1094 | 1161 | for pod in failed_pods |
|
0 commit comments