Skip to content

Commit 2828495

Browse files
authored
Merge pull request #8244 from fstagni/90_logic_JMX
[9.0] Move the logic out of the server (cleaning and deleting jobs)
2 parents a488b43 + 9fe25df commit 2828495

File tree

15 files changed

+231
-336
lines changed

15 files changed

+231
-336
lines changed

setup.cfg

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -292,13 +292,10 @@ console_scripts =
292292
# TransformationSystem
293293
dirac-production-runjoblocal = DIRAC.TransformationSystem.scripts.dirac_production_runjoblocal:main
294294
dirac-transformation-add-files = DIRAC.TransformationSystem.scripts.dirac_transformation_add_files:main [admin]
295-
dirac-transformation-archive = DIRAC.TransformationSystem.scripts.dirac_transformation_archive:main [admin]
296-
dirac-transformation-clean = DIRAC.TransformationSystem.scripts.dirac_transformation_clean:main [admin]
297295
dirac-transformation-cli = DIRAC.TransformationSystem.scripts.dirac_transformation_cli:main [admin]
298296
dirac-transformation-get-files = DIRAC.TransformationSystem.scripts.dirac_transformation_get_files:main [admin]
299297
dirac-transformation-information = DIRAC.TransformationSystem.scripts.dirac_transformation_information:main [admin]
300298
dirac-transformation-recover-data = DIRAC.TransformationSystem.scripts.dirac_transformation_recover_data:main [admin]
301-
dirac-transformation-remove-output = DIRAC.TransformationSystem.scripts.dirac_transformation_remove_output:main [admin]
302299
dirac-transformation-replication = DIRAC.TransformationSystem.scripts.dirac_transformation_replication:main [admin]
303300
dirac-transformation-verify-outputdata = DIRAC.TransformationSystem.scripts.dirac_transformation_verify_outputdata:main [admin]
304301
dirac-transformation-update-derived = DIRAC.TransformationSystem.scripts.dirac_transformation_update_derived:main [admin]

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 9 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -32,9 +32,12 @@
3232
from DIRAC.Resources.Storage.StorageElement import StorageElement
3333
from DIRAC.TransformationSystem.Client import TransformationStatus
3434
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
35-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
36-
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
3735
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
36+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
37+
RIGHT_DELETE,
38+
RIGHT_KILL,
39+
)
40+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
3841

3942
# # agent's name
4043
AGENT_NAME = "Transformation/TransformationCleaningAgent"
@@ -58,8 +61,6 @@ def __init__(self, *args, **kwargs):
5861

5962
# # transformation client
6063
self.transClient = None
61-
# # wms client
62-
self.wmsClient = None
6364
# # request client
6465
self.reqClient = None
6566
# # file catalog client
@@ -120,14 +121,10 @@ def initialize(self):
120121

121122
# # transformation client
122123
self.transClient = TransformationClient()
123-
# # wms client
124-
self.wmsClient = WMSClient()
125124
# # request client
126125
self.reqClient = ReqClient()
127126
# # file catalog client
128127
self.metadataClient = FileCatalogClient()
129-
# # job monitoring client
130-
self.jobMonitoringClient = JobMonitoringClient()
131128
# # job DB
132129
self.jobDB = JobDB()
133130

@@ -271,7 +268,7 @@ def finalize(self):
271268

272269
# Remove JobIDs that were unknown to the TransformationSystem
273270
jobGroupsToCheck = [str(transDict["TransformationID"]).zfill(8) for transDict in toClean + toArchive]
274-
res = self.jobMonitoringClient.getJobs({"JobGroup": jobGroupsToCheck})
271+
res = self.jobDB.selectJobs({"JobGroup": jobGroupsToCheck})
275272
if not res["OK"]:
276273
return res
277274
jobIDsToRemove = [int(jobID) for jobID in res["Value"]]
@@ -613,8 +610,8 @@ def __removeWMSTasks(self, transJobIDs):
613610
# Prevent 0 job IDs
614611
jobIDs = [int(j) for j in transJobIDs if int(j)]
615612
allRemove = True
616-
for jobList in breakListIntoChunks(jobIDs, 500):
617-
res = self.wmsClient.killJob(jobList, force=True)
613+
for jobList in breakListIntoChunks(jobIDs, 1000):
614+
res = kill_delete_jobs(RIGHT_KILL, jobList, force=True)
618615
if res["OK"]:
619616
self.log.info(f"Successfully killed {len(jobList)} jobs from WMS")
620617
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):
@@ -626,7 +623,7 @@ def __removeWMSTasks(self, transJobIDs):
626623
self.log.error("Failed to kill jobs", f"(n={len(res['FailedJobIDs'])})")
627624
allRemove = False
628625

629-
res = self.wmsClient.deleteJob(jobList)
626+
res = kill_delete_jobs(RIGHT_DELETE, jobList, force=True)
630627
if res["OK"]:
631628
self.log.info("Successfully deleted jobs from WMS", f"(n={len(jobList)})")
632629
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):

src/DIRAC/TransformationSystem/scripts/dirac_transformation_archive.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/DIRAC/TransformationSystem/scripts/dirac_transformation_clean.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/DIRAC/TransformationSystem/scripts/dirac_transformation_remove_output.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -35,10 +35,12 @@
3535
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
3636
from DIRAC.RequestManagementSystem.Client.Request import Request
3737
from DIRAC.WorkloadManagementSystem.Client import JobStatus
38-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3938
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
4039
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
4140
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
41+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
42+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
43+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
4244

4345

4446
class JobCleaningAgent(AgentModule):
@@ -230,11 +232,11 @@ def _deleteRemoveJobs(self, jobList, remove=False):
230232
if not res["OK"]:
231233
self.log.error("No DN found", f"for {user}")
232234
return res
233-
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
234235
if remove:
236+
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
235237
result = wmsClient.removeJob(jobsList)
236238
else:
237-
result = wmsClient.deleteJob(jobsList)
239+
result = kill_delete_jobs(RIGHT_DELETE, jobsList)
238240
if not result["OK"]:
239241
self.log.error(
240242
f"Could not {'remove' if remove else 'delete'} jobs",
@@ -294,7 +296,8 @@ def deleteJobOversizedSandbox(self, jobIDList):
294296
failed = {}
295297
successful = {}
296298

297-
result = JobMonitoringClient().getJobParameters(jobIDList, ["OutputSandboxLFN"])
299+
jobIDs = [int(jobID) for jobID in jobIDList]
300+
result = getJobParameters(jobIDs, "OutputSandboxLFN")
298301
if not result["OK"]:
299302
return result
300303
osLFNDict = result["Value"]

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 3 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -14,16 +14,16 @@
1414
from DIRAC import S_ERROR, S_OK, gConfig
1515
from DIRAC.AccountingSystem.Client.Types.Job import Job
1616
from DIRAC.ConfigurationSystem.Client.Helpers import cfgPath
17-
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
1817
from DIRAC.Core.Base.AgentModule import AgentModule
1918
from DIRAC.Core.Utilities import DErrno
2019
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2120
from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch
2221
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
23-
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2422
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2523
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2624
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
25+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_KILL
26+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
2727
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
2828
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
2929

@@ -235,7 +235,7 @@ def _failStalledJobs(self, jobID):
235235
# Set the jobs Failed, send them a kill signal in case they are not really dead
236236
# and send accounting info
237237
if setFailed:
238-
res = self._sendKillCommand(jobID)
238+
res = kill_delete_jobs(RIGHT_KILL, [jobID], nonauthJobList=[], force=True)
239239
if not res["OK"]:
240240
self.log.error("Failed to kill job", jobID)
241241

@@ -574,26 +574,3 @@ def _failSubmittingJobs(self):
574574
continue
575575

576576
return S_OK()
577-
578-
def _sendKillCommand(self, job):
579-
"""Send a kill signal to the job such that it cannot continue running.
580-
581-
:param int job: ID of job to send kill command
582-
"""
583-
584-
res = self.jobDB.getJobAttribute(job, "Owner")
585-
if not res["OK"]:
586-
return res
587-
owner = res["Value"]
588-
589-
res = self.jobDB.getJobAttribute(job, "OwnerGroup")
590-
if not res["OK"]:
591-
return res
592-
ownerGroup = res["Value"]
593-
594-
wmsClient = WMSClient(
595-
useCertificates=True,
596-
delegatedDN=getDNForUsername(owner)["Value"][0] if owner else None,
597-
delegatedGroup=ownerGroup,
598-
)
599-
return wmsClient.killJob(job)

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py

Lines changed: 7 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,11 @@
11
""" Test class for Job Cleaning Agent
22
"""
3-
import pytest
43
from unittest.mock import MagicMock
54

5+
import pytest
6+
67
# DIRAC Components
7-
from DIRAC import gLogger, S_OK
8+
from DIRAC import S_OK, gLogger
89
from DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent import JobCleaningAgent
910

1011
gLogger.setLevel("DEBUG")
@@ -32,7 +33,6 @@ def jca(mocker):
3233
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
3334
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
3435
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
35-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobMonitoringClient", return_value=mockJMC)
3636

3737
jca = JobCleaningAgent()
3838
jca.log = gLogger
@@ -98,7 +98,7 @@ def test_deleteJobsByStatus(jca, conditions, mockReplyInput, expected):
9898
"inputs, params, expected",
9999
[
100100
([], {"OK": True, "Value": {}}, {"OK": True, "Value": {"Failed": {}, "Successful": {}}}),
101-
(["a", "b"], {"OK": True, "Value": {}}, {"OK": True, "Value": {"Failed": {}, "Successful": {}}}),
101+
(["123", "456"], {"OK": True, "Value": {}}, {"OK": True, "Value": {"Failed": {}, "Successful": {}}}),
102102
(
103103
[],
104104
{"OK": True, "Value": {1: {"OutputSandboxLFN": "/some/lfn/1.txt"}}},
@@ -113,11 +113,11 @@ def test_deleteJobsByStatus(jca, conditions, mockReplyInput, expected):
113113
{"OK": True, "Value": {"Failed": {}, "Successful": {1: "/some/lfn/1.txt", 2: "/some/other/lfn/2.txt"}}},
114114
),
115115
(
116-
["a", "b"],
116+
["123", "456"],
117117
{"OK": True, "Value": {1: {"OutputSandboxLFN": "/some/lfn/1.txt"}}},
118118
{"OK": True, "Value": {"Failed": {}, "Successful": {1: "/some/lfn/1.txt"}}},
119119
),
120-
(["a", "b"], {"OK": False}, {"OK": False}),
120+
(["123", "456"], {"OK": False}, {"OK": False}),
121121
],
122122
)
123123
def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):
@@ -127,19 +127,17 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):
127127
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
128128
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
129129
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
130-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobMonitoringClient", return_value=mockJMC)
131130
mocker.patch(
132131
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
133132
)
133+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params)
134134

135135
jobCleaningAgent = JobCleaningAgent()
136136
jobCleaningAgent.log = gLogger
137137
jobCleaningAgent.log.setLevel("DEBUG")
138138
jobCleaningAgent._AgentModule__configDefaults = mockAM
139139
jobCleaningAgent.initialize()
140140

141-
mockJMC.getJobParameters.return_value = params
142-
143141
result = jobCleaningAgent.deleteJobOversizedSandbox(inputs)
144142

145143
assert result == expected

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,6 @@ def sja(mocker):
2828
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.rescheduleJobs", return_value=MagicMock())
2929
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock())
3030
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getJobParameters", return_value=MagicMock())
31-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock())
32-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock())
3331

3432
stalledJobAgent = StalledJobAgent()
3533
stalledJobAgent._AgentModule__configDefaults = mockAM

src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2,11 +2,10 @@
22
methods necessary to communicate with the Workload Management System
33
"""
44
import os
5-
from io import StringIO
65
import time
6+
from io import StringIO
77

8-
from DIRAC import S_OK, S_ERROR, gLogger
9-
8+
from DIRAC import S_ERROR, S_OK, gLogger
109
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1110
from DIRAC.Core.Utilities import File
1211
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd

0 commit comments

Comments
 (0)