Skip to content

Commit c4cd6ae

Browse files
authored
Merge pull request #1574 from mvdbeek/wait_for_upload_via_staging
Wait for upload job and make sure uploads are ok
2 parents 976df2a + e0c609c commit c4cd6ae

File tree

1 file changed

+28
-13
lines changed

1 file changed

+28
-13
lines changed

planemo/galaxy/activity.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import (
99
Any,
1010
Dict,
11+
List,
1112
Optional,
1213
Tuple,
1314
Type,
@@ -125,6 +126,7 @@ def __init__(
125126
self._runnable = runnable
126127
self._version_major = version_major
127128
self._simultaneous_uploads = simultaneous_uploads
129+
self._upload_jobs: List[Dict[str, Any]] = []
128130

129131
def _post(self, api_path: str, payload: Dict[str, Any], files_attached: bool = False) -> Dict[str, Any]:
130132
# Keep the files_attached argument because StagingInterface._post() had
@@ -140,11 +142,35 @@ def _post(self, api_path: str, payload: Dict[str, Any], files_attached: bool = F
140142
def _attach_file(self, path):
141143
return attach_file(path)
142144

143-
def _handle_job(self, job_response):
145+
def _handle_job(self, job_response: Dict[str, Any]) -> None:
146+
# Track upload jobs for later waiting
147+
self._upload_jobs.append(job_response)
144148
if not self._simultaneous_uploads:
145149
job_id = job_response["id"]
146150
_wait_for_job(self._user_gi, job_id)
147151

152+
def wait_for_uploads(self, check_ok: bool = True) -> None:
153+
for upload_job in self._upload_jobs:
154+
job_id = upload_job["id"]
155+
final_state = _wait_for_job(self._user_gi, job_id)
156+
if check_ok:
157+
job_response = self._user_gi.jobs.show_job(job_id, full_details=True)
158+
if final_state != "ok":
159+
stderr = job_response["stderr"]
160+
raise Exception(f"Upload job [{job_id}] failed with state [{final_state}]: {stderr}")
161+
for output in job_response["outputs"].values():
162+
hda = self._user_gi.datasets.show_dataset(output["id"])
163+
if hda["state"] not in ("ok", "deferred"):
164+
raise Exception(
165+
f"Upload job [{job_id}] produced output [{hda['hid']}: {hda['name']}] in state [{hda['state']}]"
166+
)
167+
for output in job_response["output_collections"].values():
168+
hdca = self._user_gi.histories.show_dataset_collection(job_response["history_id"], output["id"])
169+
if hdca["state"] not in ("ok",):
170+
raise Exception(
171+
f"Upload job [{job_id}] produced output collection [{hdca['hid']}: {hdca['name']}] in state [{hdca['state']}]"
172+
)
173+
148174
@property
149175
def use_fetch_api(self):
150176
# hack around this not working for galaxy_tools - why is that :(
@@ -328,18 +354,7 @@ def stage_in(
328354
to_posix_lines=to_posix_lines,
329355
)
330356

331-
if datasets and kwds.get("check_uploads_ok", True):
332-
ctx.vlog(f"Uploaded datasets [{datasets}] for activity, checking history state")
333-
final_state = _wait_for_history(ctx, user_gi, history_id)
334-
else:
335-
# Mark uploads as ok because nothing to do.
336-
final_state = "ok"
337-
338-
ctx.vlog(f"Final state is {final_state}")
339-
if final_state != "ok":
340-
msg = "Failed to upload data, upload state is [%s]." % final_state
341-
summarize_history(ctx, user_gi, history_id)
342-
raise Exception(msg)
357+
psi.wait_for_uploads(kwds.get("check_uploads_ok", True))
343358
return job_dict, history_id
344359

345360

0 commit comments

Comments
 (0)