Skip to content

Commit f9a53dc

Browse files
committed
fix: getting cpu work left from a single source of truth
1 parent 7b80cd8 commit f9a53dc

6 files changed

Lines changed: 337 additions & 155 deletions

File tree

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
2828
from DIRAC.RequestManagementSystem.Client.Request import Request
2929
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
30-
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
3130
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
3231
from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus
3332
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
@@ -66,7 +65,7 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
6665

6766
# Agent options
6867
# This is the factor to convert raw CPU to Normalized units (based on the CPU Model)
69-
self.cpuFactor = 0.0
68+
self.cpuPower = 0.0
7069
self.jobSubmissionDelay = 10
7170
self.fillingMode = True
7271
self.minimumTimeLeft = 5000
@@ -80,11 +79,10 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
8079
self.logLevel = "INFO"
8180
self.defaultWrapperLocation = "DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py"
8281

83-
# Timeleft
84-
self.initTimes = os.times()
85-
self.initTimeLeft = 0.0
86-
self.timeLeft = self.initTimeLeft
87-
self.timeLeftUtil = None
82+
# CPU work left (Wall-clock time * CPU Power)
83+
self.initCPUWork = 0.0
84+
self.timeLeft = self.initCPUWork
85+
self.initTime = time.time()
8886
self.pilotInfoReportedFlag = False
8987

9088
# Attributes related to the processed jobs, it should take the following form:
@@ -109,23 +107,18 @@ def initialize(self):
109107
if not result["OK"]:
110108
return result
111109

112-
result = self._getCEDict(self.computingElement)
113-
if not result["OK"]:
114-
return result
115-
ceDict = result["Value"][0]
110+
# Read initial CPU work left from config (seeded by pilot via dirac-wms-get-queue-cpu-time)
111+
self.initCPUWork = gConfig.getValue("/LocalSite/CPUTimeLeft", self.initCPUWork)
112+
self.timeLeft = self.initCPUWork
116113

117-
self.initTimeLeft = ceDict.get("CPUTime", self.initTimeLeft)
118-
self.initTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", self.initTimeLeft)
119-
self.timeLeft = self.initTimeLeft
120-
121-
self.initTimes = os.times()
114+
self.initTime = time.time()
122115
# Localsite options
123116
self.siteName = siteName()
124117
self.pilotReference = gConfig.getValue("/LocalSite/PilotReference", self.pilotReference)
125118
self.defaultProxyLength = gConfig.getValue("/Registry/DefaultProxyLifeTime", self.defaultProxyLength)
126119
# Agent options
127120
# This is the factor to convert raw CPU to Normalized units (based on the CPU Model)
128-
self.cpuFactor = gConfig.getValue("/LocalSite/CPUNormalizationFactor", self.cpuFactor)
121+
self.cpuPower = gConfig.getValue("/LocalSite/CPUNormalizationFactor", self.cpuPower)
129122
self.jobSubmissionDelay = self.am_getOption("SubmissionDelay", self.jobSubmissionDelay)
130123
self.fillingMode = self.am_getOption("FillingModeFlag", self.fillingMode)
131124
self.minimumTimeLeft = self.am_getOption("MinimumTimeLeft", self.minimumTimeLeft)
@@ -136,9 +129,6 @@ def initialize(self):
136129
self.logLevel = self.am_getOption("DefaultLogLevel", self.logLevel)
137130
self.defaultWrapperLocation = self.am_getOption("JobWrapperTemplate", self.defaultWrapperLocation)
138131

139-
# Utilities
140-
self.timeLeftUtil = TimeLeft()
141-
142132
# Some innerCEs may want to make use of CGroup2 support, so we prepare it globally here
143133
res = CG2Manager().setUp()
144134
if res["OK"]:
@@ -180,15 +170,17 @@ def execute(self):
180170
if result["OK"] and result["Value"]:
181171
return result
182172

183-
# Check that we are allowed to continue and that time left is sufficient
173+
# Update CPU work left: wall-clock is ticking whether a job is running or not
174+
cpuWorkLeft = self._computeCPUWorkLeft()
175+
result = self._setCPUWorkLeft(cpuWorkLeft)
176+
if not result["OK"]:
177+
return result
178+
179+
# After the first job, check filling mode eligibility
184180
if self.jobCount:
185-
cpuWorkLeft = self._computeCPUWorkLeft()
186181
result = self._checkCPUWorkLeft(cpuWorkLeft)
187182
if not result["OK"]:
188183
return result
189-
result = self._setCPUWorkLeft(cpuWorkLeft)
190-
if not result["OK"]:
191-
return result
192184

193185
# Get environment details and enhance them
194186
result = self._getCEDict(self.computingElement)
@@ -373,7 +365,7 @@ def _setCEDict(self, ceDict):
373365
ceDict["GridCE"] = gridCE
374366
if "PilotReference" not in ceDict:
375367
ceDict["PilotReference"] = str(self.pilotReference)
376-
ceDict["PilotBenchmark"] = self.cpuFactor
368+
ceDict["PilotBenchmark"] = self.cpuPower
377369
ceDict["PilotInfoReportedFlag"] = self.pilotInfoReportedFlag
378370

379371
# Add possible job requirements
@@ -403,28 +395,22 @@ def _checkCEAvailability(self, computingElement):
403395
return S_OK()
404396

405397
#############################################################################
406-
def _computeCPUWorkLeft(self, processors=1):
398+
def _computeCPUWorkLeft(self):
407399
"""
408-
Compute CPU Work Left in hepspec06 seconds
400+
Compute CPU Work Left in hepspec06 seconds.
401+
402+
Uses a simple wall-clock countdown from the initial value (seeded by the pilot
403+
via dirac-wms-get-queue-cpu-time). The elapsed wall-clock time is multiplied by
404+
the CPU normalization factor to get the consumed CPU work.
409405
410-
:param int processors: number of processors available
411406
:return: cpu work left (cpu time left * cpu power of the cpus)
412407
"""
413-
# Sum all times but the last one (elapsed_time) and remove times at init (is this correct?)
414-
cpuTimeConsumed = sum(os.times()[:-1]) - sum(self.initTimes[:-1])
415-
result = self.timeLeftUtil.getTimeLeft(cpuTimeConsumed, processors)
416-
if not result["OK"]:
417-
self.log.warn("There were errors calculating time left using the Timeleft utility", result["Message"])
418-
self.log.warn("The time left will be calculated using os.times() and the info in our possession")
419-
self.log.info(f"Current raw CPU time consumed is {cpuTimeConsumed}")
420-
if self.cpuFactor:
421-
return self.initTimeLeft - cpuTimeConsumed * self.cpuFactor
422-
return self.timeLeft
423-
return result["Value"]
408+
elapsed = time.time() - self.initTime
409+
cpuWorkConsumed = elapsed * self.cpuPower
410+
return self.initCPUWork - cpuWorkConsumed
424411

425412
def _checkCPUWorkLeft(self, cpuWorkLeft):
426413
"""Check that fillingMode is enabled and time left is sufficient to continue the execution"""
427-
# Only call timeLeft utility after a job has been picked up
428414
self.log.info("Attempting to check CPU time left for filling mode")
429415
if not self.fillingMode:
430416
return self._finish("Filling Mode is Disabled")
@@ -435,7 +421,7 @@ def _checkCPUWorkLeft(self, cpuWorkLeft):
435421
return S_OK()
436422

437423
def _setCPUWorkLeft(self, cpuWorkLeft):
438-
"""Update the TimeLeft within the CE and the configuration for next matching request"""
424+
"""Update the CPU work left within the CE and the configuration for next matching request"""
439425
self.timeLeft = cpuWorkLeft
440426

441427
result = self.computingElement.setCPUTimeLeft(cpuTimeLeft=self.timeLeft)

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
1212

1313
from DIRAC import S_ERROR, S_OK, gLogger
14-
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
1514
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
1615
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import badJobScript, jobScript
1716
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
@@ -150,28 +149,27 @@ def test__checkCEAvailability(mocker, ceType, mockCEReply, expectedResult):
150149

151150

152151
@pytest.mark.parametrize(
153-
"initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft",
152+
"initCPUWork, cpuPower, elapsedSeconds, expectedTimeLeft",
154153
[
155-
(100000, 75000, None, {"OK": False, "Message": "Error"}, 75000),
156-
(100000, 75000, 10, {"OK": False, "Message": "Error"}, 100000),
157-
(100000, 75000, 10, {"OK": True, "Value": 25000}, 25000),
154+
# No CPU power: no work consumed, time left equals initial
155+
(100000, 0, 100, 100000),
156+
# With CPU power: elapsed * cpuPower is subtracted from initCPUWork
157+
(100000, 10, 100, 99000),
158+
# Longer elapsed time
159+
(100000, 10, 5000, 50000),
158160
],
159161
)
160-
def test__computeCPUWorkLeft(mocker, initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft):
162+
def test__computeCPUWorkLeft(mocker, initCPUWork, cpuPower, elapsedSeconds, expectedTimeLeft):
161163
"""Test JobAgent()._computeCPUWorkLeft()"""
162164
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
163-
mocker.patch(
164-
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.TimeLeft.getTimeLeft", return_value=mockTimeLeftReply
165-
)
166165

167166
jobAgent = JobAgent("Test", "Test1")
168167
jobAgent.log = gLogger
169168
jobAgent.log.setLevel("DEBUG")
170-
jobAgent.timeLeftUtil = TimeLeft()
171169

172-
jobAgent.initTimeLeft = initTimeLeft
173-
jobAgent.timeLeft = timeLeft
174-
jobAgent.cpuFactor = cpuFactor
170+
jobAgent.initCPUWork = initCPUWork
171+
jobAgent.cpuPower = cpuPower
172+
jobAgent.initTime = time.time() - elapsedSeconds
175173
result = jobAgent._computeCPUWorkLeft()
176174

177175
assert abs(result - expectedTimeLeft) < 10

src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py

Lines changed: 50 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,11 @@
1414

1515

1616
def getCPUTime(cpuNormalizationFactor):
17-
"""Trying to get CPUTime left for execution (in seconds).
17+
"""Compute the initial CPUTime left for execution (in seconds).
1818
19-
It will first look to get the work left looking for batch system information useing the TimeLeft utility.
20-
If it succeeds, it will convert it in real second, and return it.
21-
22-
If it fails, it tries to get it from the static info found in CS.
23-
If it fails, it returns the default, which is a large 9999999, that we may consider as "Infinite".
24-
25-
This is a generic method, independent from the middleware of the resource if TimeLeft doesn't return a value
19+
This is called at pilot bootstrap (via dirac-wms-get-queue-cpu-time) to seed
20+
the initial CPUTimeLeft value. It queries the batch system first, then falls
21+
back to static CS configuration.
2622
2723
args:
2824
cpuNormalizationFactor (float): the CPU power of the current Worker Node.
@@ -31,55 +27,58 @@ def getCPUTime(cpuNormalizationFactor):
3127
returns:
3228
cpuTimeLeft (int): the CPU time left, in seconds
3329
"""
34-
cpuTimeLeft = 0.0
35-
cpuWorkLeft = gConfig.getValue("/LocalSite/CPUTimeLeft", 0)
36-
37-
if not cpuWorkLeft:
38-
# Try and get the information from the CPU left utility
39-
result = TimeLeft().getTimeLeft()
40-
if result["OK"]:
41-
cpuWorkLeft = result["Value"]
42-
43-
if cpuWorkLeft > 0:
44-
# This is in HS06sseconds
45-
# We need to convert in real seconds
46-
if not cpuNormalizationFactor: # if cpuNormalizationFactor passed in is 0, try get it from the local cfg
30+
31+
# 1. Try to compute time left from the batch system (sacct, qstat, etc.)
32+
result = TimeLeft().getTimeLeft()
33+
if result["OK"]:
34+
cpuWorkLeft = result["Value"]
35+
# Batch system answered — trust it, even if 0
36+
if not cpuNormalizationFactor:
4737
cpuNormalizationFactor = gConfig.getValue("/LocalSite/CPUNormalizationFactor", 0.0)
4838
if cpuNormalizationFactor:
49-
cpuTimeLeft = cpuWorkLeft / cpuNormalizationFactor
39+
return int(cpuWorkLeft / cpuNormalizationFactor)
40+
return 0
5041

51-
if not cpuTimeLeft:
52-
# now we know that we have to find the CPUTimeLeft by looking in the CS
53-
# this is not granted to be correct as the CS units may not be real seconds
54-
gridCE = gConfig.getValue("/LocalSite/GridCE")
55-
ceQueue = gConfig.getValue("/LocalSite/CEQueue")
56-
if not ceQueue:
57-
# we have to look for a ceQueue in the CS
58-
# A bit hacky. We should better profit from something generic
59-
gLogger.warn("No CEQueue in local configuration, looking to find one in CS")
60-
siteName = DIRAC.siteName()
61-
queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues"
62-
res = gConfig.getSections(queueSection)
63-
if not res["OK"]:
64-
raise RuntimeError(res["Message"])
65-
queues = res["Value"]
66-
cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 9999999.0) for queue in queues]
67-
# These are (real, wall clock) minutes - damn BDII!
42+
cpuTimeLeft = 0.0
43+
44+
# 2. Fall back to queue configuration in the CS.
45+
# These values are wall-clock minutes from BDII, so we convert to seconds.
46+
gridCE = gConfig.getValue("/LocalSite/GridCE")
47+
ceQueue = gConfig.getValue("/LocalSite/CEQueue")
48+
if not ceQueue:
49+
# we have to look for a ceQueue in the CS
50+
# A bit hacky. We should better profit from something generic
51+
gLogger.warn("No CEQueue in local configuration, looking to find one in CS")
52+
siteName = DIRAC.siteName()
53+
queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues"
54+
res = gConfig.getSections(queueSection)
55+
if not res["OK"]:
56+
raise RuntimeError(res["Message"])
57+
queues = res["Value"]
58+
cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 0.0) for queue in queues]
59+
cpuTimes = [t for t in cpuTimes if t > 0]
60+
if cpuTimes:
6861
cpuTimeLeft = min(cpuTimes) * 60
62+
else:
63+
queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}")
64+
if not queueInfo["OK"] or not queueInfo["Value"]:
65+
gLogger.warn("Can't find a CE/queue in CS")
6966
else:
70-
queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}")
71-
cpuTimeLeft = 9999999.0
72-
if not queueInfo["OK"] or not queueInfo["Value"]:
73-
gLogger.warn("Can't find a CE/queue, defaulting CPUTime to %d" % cpuTimeLeft)
67+
queueCSSection = queueInfo["Value"]["QueueCSSection"]
68+
cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0)
69+
if cpuTimeInMinutes:
70+
cpuTimeLeft = cpuTimeInMinutes * 60.0
71+
gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}")
7472
else:
75-
queueCSSection = queueInfo["Value"]["QueueCSSection"]
76-
# These are (real, wall clock) minutes - damn BDII!
77-
cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0)
78-
if cpuTimeInMinutes:
79-
cpuTimeLeft = cpuTimeInMinutes * 60.0
80-
gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}")
81-
else:
82-
gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}, defaulting CPUTime to {cpuTimeLeft:f}")
73+
gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}")
74+
75+
if not cpuTimeLeft:
76+
# 3. Last resort: global default from CS, or 0 (fail safe: match no more jobs)
77+
cpuTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", 0)
78+
if cpuTimeLeft:
79+
gLogger.warn(f"Using fallback MaxCPUTime: {cpuTimeLeft}")
80+
else:
81+
gLogger.warn("Could not determine CPUTime left")
8382

8483
return int(cpuTimeLeft)
8584

0 commit comments

Comments
 (0)