From 68bd24b6ff558b866a8a10183ec21f12575c0caa Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 31 Oct 2025 15:09:28 +0100 Subject: [PATCH] feat: SSHCE export inputs/import outputs as a JSON file to handle more jobs in parallel --- .../Computing/BatchSystems/Condor.py | 3 - .../Computing/BatchSystems/executeBatch.py | 22 +++-- .../Computing/LocalComputingElement.py | 2 - .../Computing/SSHComputingElement.py | 99 ++++++++++++------- 4 files changed, 76 insertions(+), 50 deletions(-) diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py index 388d112bdc1..5757a7f2cfc 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py @@ -203,9 +203,6 @@ def submitJob(self, **kwargs): resultDict["Jobs"] = [] for i in range(submittedJobs): resultDict["Jobs"].append(".".join([cluster, str(i)])) - # Executable is transferred afterward - # Inform the caller that Condor cannot delete it before the end of the execution - resultDict["ExecutableToKeep"] = executable else: resultDict["Status"] = status resultDict["Message"] = error diff --git a/src/DIRAC/Resources/Computing/BatchSystems/executeBatch.py b/src/DIRAC/Resources/Computing/BatchSystems/executeBatch.py index 4f721349d51..34f30958b10 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/executeBatch.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/executeBatch.py @@ -35,8 +35,10 @@ from urllib.parse import unquote as urlunquote - arguments = sys.argv[1] - inputDict = json.loads(urlunquote(arguments)) + # Read options from JSON file + optionsFilePath = sys.argv[1] + with open(optionsFilePath, 'r') as f: + inputDict = json.load(f) method = inputDict.pop('Method') batchSystem = inputDict.pop('BatchSystem') @@ -45,9 +47,15 @@ try: result = getattr(batch, method)(**inputDict) except Exception: - result = traceback.format_exc() - - resultJson = urlquote(json.dumps(result)) - print("============= Start output ===============") - print(resultJson) + # Wrap the traceback in a proper error structure + result = { + 'Status': -1, + 'Message': 'Exception during batch method execution', + 'Traceback': traceback.format_exc() + } + + # Write result to JSON file + resultFilePath = optionsFilePath.replace('.json', '_result.json') + with open(resultFilePath, 'w') as f: + json.dump(result, f) """ diff --git a/src/DIRAC/Resources/Computing/LocalComputingElement.py b/src/DIRAC/Resources/Computing/LocalComputingElement.py index 5f23f3e5e0c..064482cf074 100644 --- a/src/DIRAC/Resources/Computing/LocalComputingElement.py +++ b/src/DIRAC/Resources/Computing/LocalComputingElement.py @@ -182,8 +182,6 @@ def submitJob(self, executableFile, proxy=None, numberOfJobs=1): batchSystemName = self.batchSystem.__class__.__name__.lower() jobIDs = ["ssh" + batchSystemName + "://" + self.ceName + "/" + _id for _id in resultSubmit["Jobs"]] result = S_OK(jobIDs) - if "ExecutableToKeep" in resultSubmit: - result["ExecutableToKeep"] = resultSubmit["ExecutableToKeep"] else: result = S_ERROR(resultSubmit["Message"]) diff --git a/src/DIRAC/Resources/Computing/SSHComputingElement.py b/src/DIRAC/Resources/Computing/SSHComputingElement.py index dc353bbdcac..25668e62b57 100644 --- a/src/DIRAC/Resources/Computing/SSHComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHComputingElement.py @@ -67,9 +67,10 @@ import os import shutil import stat +import tempfile import uuid from shlex import quote as shlex_quote -from urllib.parse import quote, unquote, urlparse +from urllib.parse import urlparse import pexpect @@ -484,47 +485,69 @@ def __executeHostCommand(self, command, options, ssh=None, host=None): options["User"] = self.user options["Queue"] = self.queue - options = json.dumps(options) - options = quote(options) + localOptionsFile = None + remoteOptionsFile = None + localResultFile = None + remoteResultFile = None + try: + # Write options to a local temporary file + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(options, f) + localOptionsFile = f.name + + # Upload the options file to the remote host + remoteOptionsFile = f"{self.sharedArea}/batch_options_{uuid.uuid4().hex}.json" + result = ssh.scpCall(30, localOptionsFile, remoteOptionsFile) + if not result["OK"]: + return result - cmd = ( - "bash --login -c 'python3 %s/execute_batch %s || python %s/execute_batch %s || python2 %s/execute_batch %s'" - % (self.sharedArea, options, self.sharedArea, options, self.sharedArea, options) - ) + # Execute the batch command with the options file path + cmd = ( + f"bash --login -c 'python3 {self.sharedArea}/execute_batch {remoteOptionsFile} || " + f"python {self.sharedArea}/execute_batch {remoteOptionsFile} || " + f"python2 {self.sharedArea}/execute_batch {remoteOptionsFile}'" + ) - self.log.verbose(f"CE submission command: {cmd}") + self.log.verbose(f"CE submission command: {cmd}") - result = ssh.sshCall(120, cmd) - if not result["OK"]: - self.log.error(f"{self.ceType} CE job submission failed", result["Message"]) - return result + result = ssh.sshCall(120, cmd) + if not result["OK"]: + self.log.error(f"{self.ceType} CE job submission failed", result["Message"]) + return result - sshStatus = result["Value"][0] - sshStdout = result["Value"][1] - sshStderr = result["Value"][2] - - # Examine results of the job submission - if sshStatus == 0: - output = sshStdout.strip().replace("\r", "").strip() - if not output: - return S_ERROR("No output from remote command") - - try: - index = output.index("============= Start output ===============") - output = output[index + 42 :] - except ValueError: - return S_ERROR(f"Invalid output from remote command: {output}") - - try: - output = unquote(output) - result = json.loads(output) - if isinstance(result, str) and result.startswith("Exception:"): - return S_ERROR(result) - return S_OK(result) - except Exception: - return S_ERROR("Invalid return structure from job submission") - else: - return S_ERROR("\n".join([sshStdout, sshStderr])) + sshStatus = result["Value"][0] + if sshStatus != 0: + sshStdout = result["Value"][1] + sshStderr = result["Value"][2] + return S_ERROR(f"CE job submission command failed with status {sshStatus}: {sshStdout} {sshStderr}") + + # The result should be written to a JSON file by execute_batch + # Compute the expected result file path + remoteResultFile = remoteOptionsFile.replace(".json", "_result.json") + + # Try to download the result file + with tempfile.NamedTemporaryFile(mode="r", suffix=".json", delete=False) as f: + localResultFile = f.name + + result = ssh.scpCall(30, localResultFile, remoteResultFile, upload=False) + if not result["OK"]: + return result + + # Read the result from the downloaded file + with open(localResultFile) as f: + result = json.load(f) + return S_OK(result) + finally: + # Clean up local temporary file + if localOptionsFile and os.path.exists(localOptionsFile): + os.remove(localOptionsFile) + if localResultFile and os.path.exists(localResultFile): + os.remove(localResultFile) + # Clean up remote temporary files + if remoteOptionsFile: + ssh.sshCall(30, f"rm -f {remoteOptionsFile}") + if remoteResultFile: + ssh.sshCall(30, f"rm -f {remoteResultFile}") def submitJob(self, executableFile, proxy, numberOfJobs=1): # self.log.verbose( "Executable file path: %s" % executableFile )