diff --git a/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst b/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst index 7ec3701e700..58778dc911a 100644 --- a/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst +++ b/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst @@ -100,5 +100,5 @@ It is based on layered architecture and is based on DIRAC architecture: SandboxMetadataDB class is a front-end to the metadata for sandboxes. * JobParametersDB - JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters. - It is used in most of the WMS components and is based on Elastic/OpenSearch. + JobParametersDB class is a front-end to the OpenSearch based index providing Job Parameters. + It is used in most of the WMS components and is based on OpenSearch. diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index e3fd627a4b3..596d2b2c62f 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -35,10 +35,10 @@ from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB class JobCleaningAgent(AgentModule): @@ -293,7 +293,7 @@ def deleteJobOversizedSandbox(self, jobIDList): failed = {} successful = {} - result = JobMonitoringClient().getJobParameters(jobIDList, ["OutputSandboxLFN"]) + result = JobParametersDB().getJobParameters(jobIDList, ["OutputSandboxLFN"]) if not result["OK"]: return result osLFNDict = result["Value"] diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 0d82be10fd6..fb9acc33364 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -110,56 +110,6 @@ def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=N "Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp ) - ############################################################################# - def getJobParameters(self, jobID, paramList=None): - """Get Job Parameters defined for jobID. - Returns a dictionary with the Job Parameters. - If parameterList is empty - all the parameters are returned. - """ - jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID - - resultDict = {} - if paramList: - if isinstance(paramList, str): - paramList = paramList.split(",") - paramNameList = [] - for pn in paramList: - ret = self._escapeString(pn) - if not ret["OK"]: - return ret - paramNameList.append(ret["Value"]) - cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format( - ",".join(str(int(j)) for j in jobIDList), - ",".join(paramNameList), - ) - result = self._query(cmd) - if result["OK"]: - if result["Value"]: - for res_jobID, res_name, res_value in result["Value"]: - try: - res_value = res_value.decode(errors="replace") # account for use of BLOBs - except AttributeError: - pass - resultDict.setdefault(int(res_jobID), {})[res_name] = res_value - - return S_OK(resultDict) # there's a slim chance that this is an empty dictionary - else: - return S_ERROR("JobDB.getJobParameters: failed to retrieve parameters") - - else: - result = self.getFields("JobParameters", ["JobID", "Name", "Value"], {"JobID": jobID}) - if not result["OK"]: - return result - - for res_jobID, res_name, res_value in result["Value"]: - try: - res_value = res_value.decode(errors="replace") # account for use of BLOBs - except AttributeError: - pass - resultDict.setdefault(int(res_jobID), {})[res_name] = res_value - - return S_OK(resultDict) # there's a slim chance that this is an empty dictionary - ############################################################################# def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1): """Get Attic Job Parameters defined for a job with jobID. @@ -274,16 +224,6 @@ def getJobAttribute(self, jobID, attribute): return result return S_OK(result["Value"].get(attribute)) - ############################################################################# - @deprecated("Use JobParametersDB instead") - def getJobParameter(self, jobID, parameter): - """Get the given parameter of a job specified by its jobID""" - - result = self.getJobParameters(jobID, [parameter]) - if not result["OK"]: - return result - return S_OK(result.get("Value", {}).get(int(jobID), {}).get(parameter)) - ############################################################################# def getJobOptParameter(self, jobID, parameter): """Get optimizer parameters for the given job.""" @@ -1023,7 +963,6 @@ def removeJobFromDB(self, jobIDs): for table in [ "InputData", - "JobParameters", "AtticJobParameters", "HeartBeatLoggingInfo", "OptimizerParameters", @@ -1101,10 +1040,6 @@ def rescheduleJob(self, jobID): return ret e_jobID = ret["Value"] - res = self._update(f"DELETE FROM JobParameters WHERE JobID={e_jobID}") - if not res["OK"]: - return res - # Delete optimizer parameters if not self._update(f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID}")["OK"]: return S_ERROR("JobDB.removeJobOptParameter: operation failed.") diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql index 6b6704b364e..da74371cb14 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql @@ -78,16 +78,6 @@ CREATE TABLE `InputData` ( FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS `JobParameters`; -CREATE TABLE `JobParameters` ( - `JobID` INT(11) UNSIGNED NOT NULL, - `Name` VARCHAR(100) NOT NULL, - `Value` TEXT NOT NULL, - PRIMARY KEY (`JobID`,`Name`), - FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - -- ------------------------------------------------------------------------------ DROP TABLE IF EXISTS `OptimizerParameters`; CREATE TABLE `OptimizerParameters` ( diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py index 939acc8b39e..3fa7815f52b 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py @@ -154,10 +154,8 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di :rtype: dict """ from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB - from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB elasticJobParametersDB = JobParametersDB() - jobDB = JobDB() if vo: # a user is connecting, with a proxy res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName) @@ -165,8 +163,10 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di return res parameters = res["Value"] else: # a service is connecting, no proxy, e.g. StalledJobAgent + from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB + q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})" - res = jobDB._query(q) + res = JobDB()._query(q) if not res["OK"]: return res if not res["Value"]: @@ -185,19 +185,4 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di return res parameters.update(res["Value"]) - # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends - res = jobDB.getJobParameters(jobIDs, parName) - if not res["OK"]: - return res - parametersM = res["Value"] - - # and now combine - final = dict(parametersM) - # if job in JobDB, update with parameters from ES if any - for jobID in final: - final[jobID].update(parameters.get(jobID, {})) - # if job in ES and not in JobDB, take ES - for jobID in parameters: - if jobID not in final: - final[jobID] = parameters[jobID] - return S_OK(final) + return S_OK(parameters)