Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ It is based on layered architecture and is based on DIRAC architecture:
SandboxMetadataDB class is a front-end to the metadata for sandboxes.

* JobParametersDB
JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters.
It is used in most of the WMS components and is based on Elastic/OpenSearch.
JobParametersDB class is a front-end to the OpenSearch based index providing Job Parameters.
It is used in most of the WMS components and is based on OpenSearch.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB


class JobCleaningAgent(AgentModule):
Expand Down Expand Up @@ -293,7 +293,7 @@ def deleteJobOversizedSandbox(self, jobIDList):
failed = {}
successful = {}

result = JobMonitoringClient().getJobParameters(jobIDList, ["OutputSandboxLFN"])
result = JobParametersDB().getJobParameters(jobIDList, ["OutputSandboxLFN"])
if not result["OK"]:
return result
osLFNDict = result["Value"]
Expand Down
65 changes: 0 additions & 65 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,56 +110,6 @@ def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=N
"Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp
)

#############################################################################
def getJobParameters(self, jobID, paramList=None):
"""Get Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If parameterList is empty - all the parameters are returned.
"""
jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID

resultDict = {}
if paramList:
if isinstance(paramList, str):
paramList = paramList.split(",")
paramNameList = []
for pn in paramList:
ret = self._escapeString(pn)
if not ret["OK"]:
return ret
paramNameList.append(ret["Value"])
cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format(
",".join(str(int(j)) for j in jobIDList),
",".join(paramNameList),
)
result = self._query(cmd)
if result["OK"]:
if result["Value"]:
for res_jobID, res_name, res_value in result["Value"]:
try:
res_value = res_value.decode(errors="replace") # account for use of BLOBs
except AttributeError:
pass
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value

return S_OK(resultDict) # there's a slim chance that this is an empty dictionary
else:
return S_ERROR("JobDB.getJobParameters: failed to retrieve parameters")

else:
result = self.getFields("JobParameters", ["JobID", "Name", "Value"], {"JobID": jobID})
if not result["OK"]:
return result

for res_jobID, res_name, res_value in result["Value"]:
try:
res_value = res_value.decode(errors="replace") # account for use of BLOBs
except AttributeError:
pass
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value

return S_OK(resultDict) # there's a slim chance that this is an empty dictionary

#############################################################################
def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
"""Get Attic Job Parameters defined for a job with jobID.
Expand Down Expand Up @@ -274,16 +224,6 @@ def getJobAttribute(self, jobID, attribute):
return result
return S_OK(result["Value"].get(attribute))

#############################################################################
@deprecated("Use JobParametersDB instead")
def getJobParameter(self, jobID, parameter):
"""Get the given parameter of a job specified by its jobID"""

result = self.getJobParameters(jobID, [parameter])
if not result["OK"]:
return result
return S_OK(result.get("Value", {}).get(int(jobID), {}).get(parameter))

#############################################################################
def getJobOptParameter(self, jobID, parameter):
"""Get optimizer parameters for the given job."""
Expand Down Expand Up @@ -1023,7 +963,6 @@ def removeJobFromDB(self, jobIDs):

for table in [
"InputData",
"JobParameters",
"AtticJobParameters",
"HeartBeatLoggingInfo",
"OptimizerParameters",
Expand Down Expand Up @@ -1101,10 +1040,6 @@ def rescheduleJob(self, jobID):
return ret
e_jobID = ret["Value"]

res = self._update(f"DELETE FROM JobParameters WHERE JobID={e_jobID}")
if not res["OK"]:
return res

# Delete optimizer parameters
if not self._update(f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID}")["OK"]:
return S_ERROR("JobDB.removeJobOptParameter: operation failed.")
Expand Down
10 changes: 0 additions & 10 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ CREATE TABLE `InputData` (
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ------------------------------------------------------------------------------
DROP TABLE IF EXISTS `JobParameters`;
CREATE TABLE `JobParameters` (
`JobID` INT(11) UNSIGNED NOT NULL,
`Name` VARCHAR(100) NOT NULL,
`Value` TEXT NOT NULL,
PRIMARY KEY (`JobID`,`Name`),
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ------------------------------------------------------------------------------
DROP TABLE IF EXISTS `OptimizerParameters`;
CREATE TABLE `OptimizerParameters` (
Expand Down
23 changes: 4 additions & 19 deletions src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,19 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
:rtype: dict
"""
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB

elasticJobParametersDB = JobParametersDB()
jobDB = JobDB()

if vo: # a user is connecting, with a proxy
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
if not res["OK"]:
return res
parameters = res["Value"]
else: # a service is connecting, no proxy, e.g. StalledJobAgent
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB

q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
res = jobDB._query(q)
res = JobDB()._query(q)
if not res["OK"]:
return res
if not res["Value"]:
Expand All @@ -185,19 +185,4 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
return res
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = jobDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)
return S_OK(parameters)
Loading