Skip to content

Commit 61e1b12

Browse files
committed
feat: removed JobDB JobParameters table
1 parent 7781d39 commit 61e1b12

File tree

5 files changed

+8
-98
lines changed

5 files changed

+8
-98
lines changed

docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,5 +100,5 @@ It is based on layered architecture and is based on DIRAC architecture:
100100
SandboxMetadataDB class is a front-end to the metadata for sandboxes.
101101

102102
* JobParametersDB
103-
JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters.
104-
It is used in most of the WMS components and is based on Elastic/OpenSearch.
103+
JobParametersDB class is a front-end to the OpenSearch based index providing Job Parameters.
104+
It is used in most of the WMS components and is based on OpenSearch.

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
3636
from DIRAC.RequestManagementSystem.Client.Request import Request
3737
from DIRAC.WorkloadManagementSystem.Client import JobStatus
38-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3938
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
4039
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
4140
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
41+
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
4242

4343

4444
class JobCleaningAgent(AgentModule):
@@ -293,7 +293,7 @@ def deleteJobOversizedSandbox(self, jobIDList):
293293
failed = {}
294294
successful = {}
295295

296-
result = JobMonitoringClient().getJobParameters(jobIDList, ["OutputSandboxLFN"])
296+
result = JobParametersDB().getJobParameters(jobIDList, ["OutputSandboxLFN"])
297297
if not result["OK"]:
298298
return result
299299
osLFNDict = result["Value"]

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -110,56 +110,6 @@ def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=N
110110
"Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp
111111
)
112112

113-
#############################################################################
114-
def getJobParameters(self, jobID, paramList=None):
115-
"""Get Job Parameters defined for jobID.
116-
Returns a dictionary with the Job Parameters.
117-
If parameterList is empty - all the parameters are returned.
118-
"""
119-
jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID
120-
121-
resultDict = {}
122-
if paramList:
123-
if isinstance(paramList, str):
124-
paramList = paramList.split(",")
125-
paramNameList = []
126-
for pn in paramList:
127-
ret = self._escapeString(pn)
128-
if not ret["OK"]:
129-
return ret
130-
paramNameList.append(ret["Value"])
131-
cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format(
132-
",".join(str(int(j)) for j in jobIDList),
133-
",".join(paramNameList),
134-
)
135-
result = self._query(cmd)
136-
if result["OK"]:
137-
if result["Value"]:
138-
for res_jobID, res_name, res_value in result["Value"]:
139-
try:
140-
res_value = res_value.decode(errors="replace") # account for use of BLOBs
141-
except AttributeError:
142-
pass
143-
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value
144-
145-
return S_OK(resultDict) # there's a slim chance that this is an empty dictionary
146-
else:
147-
return S_ERROR("JobDB.getJobParameters: failed to retrieve parameters")
148-
149-
else:
150-
result = self.getFields("JobParameters", ["JobID", "Name", "Value"], {"JobID": jobID})
151-
if not result["OK"]:
152-
return result
153-
154-
for res_jobID, res_name, res_value in result["Value"]:
155-
try:
156-
res_value = res_value.decode(errors="replace") # account for use of BLOBs
157-
except AttributeError:
158-
pass
159-
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value
160-
161-
return S_OK(resultDict) # there's a slim chance that this is an empty dictionary
162-
163113
#############################################################################
164114
def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
165115
"""Get Attic Job Parameters defined for a job with jobID.
@@ -274,16 +224,6 @@ def getJobAttribute(self, jobID, attribute):
274224
return result
275225
return S_OK(result["Value"].get(attribute))
276226

277-
#############################################################################
278-
@deprecated("Use JobParametersDB instead")
279-
def getJobParameter(self, jobID, parameter):
280-
"""Get the given parameter of a job specified by its jobID"""
281-
282-
result = self.getJobParameters(jobID, [parameter])
283-
if not result["OK"]:
284-
return result
285-
return S_OK(result.get("Value", {}).get(int(jobID), {}).get(parameter))
286-
287227
#############################################################################
288228
def getJobOptParameter(self, jobID, parameter):
289229
"""Get optimizer parameters for the given job."""
@@ -1023,7 +963,6 @@ def removeJobFromDB(self, jobIDs):
1023963

1024964
for table in [
1025965
"InputData",
1026-
"JobParameters",
1027966
"AtticJobParameters",
1028967
"HeartBeatLoggingInfo",
1029968
"OptimizerParameters",
@@ -1101,10 +1040,6 @@ def rescheduleJob(self, jobID):
11011040
return ret
11021041
e_jobID = ret["Value"]
11031042

1104-
res = self._update(f"DELETE FROM JobParameters WHERE JobID={e_jobID}")
1105-
if not res["OK"]:
1106-
return res
1107-
11081043
# Delete optimizer parameters
11091044
if not self._update(f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID}")["OK"]:
11101045
return S_ERROR("JobDB.removeJobOptParameter: operation failed.")

src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,6 @@ CREATE TABLE `InputData` (
7878
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
7979
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
8080

81-
-- ------------------------------------------------------------------------------
82-
DROP TABLE IF EXISTS `JobParameters`;
83-
CREATE TABLE `JobParameters` (
84-
`JobID` INT(11) UNSIGNED NOT NULL,
85-
`Name` VARCHAR(100) NOT NULL,
86-
`Value` TEXT NOT NULL,
87-
PRIMARY KEY (`JobID`,`Name`),
88-
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
89-
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
90-
9181
-- ------------------------------------------------------------------------------
9282
DROP TABLE IF EXISTS `OptimizerParameters`;
9383
CREATE TABLE `OptimizerParameters` (

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -154,19 +154,19 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
154154
:rtype: dict
155155
"""
156156
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
157-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
158157

159158
elasticJobParametersDB = JobParametersDB()
160-
jobDB = JobDB()
161159

162160
if vo: # a user is connecting, with a proxy
163161
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
164162
if not res["OK"]:
165163
return res
166164
parameters = res["Value"]
167165
else: # a service is connecting, no proxy, e.g. StalledJobAgent
166+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
167+
168168
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
169-
res = jobDB._query(q)
169+
res = JobDB()._query(q)
170170
if not res["OK"]:
171171
return res
172172
if not res["Value"]:
@@ -185,19 +185,4 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
185185
return res
186186
parameters.update(res["Value"])
187187

188-
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
189-
res = jobDB.getJobParameters(jobIDs, parName)
190-
if not res["OK"]:
191-
return res
192-
parametersM = res["Value"]
193-
194-
# and now combine
195-
final = dict(parametersM)
196-
# if job in JobDB, update with parameters from ES if any
197-
for jobID in final:
198-
final[jobID].update(parameters.get(jobID, {}))
199-
# if job in ES and not in JobDB, take ES
200-
for jobID in parameters:
201-
if jobID not in final:
202-
final[jobID] = parameters[jobID]
203-
return S_OK(final)
188+
return S_OK(parameters)

0 commit comments

Comments
 (0)