Skip to content

Commit a28b3fc

Browse files
author
Robin VAN DE MERGHEL
committed
feat: Modified legacy adaptor for PilotManager to make it work.
1 parent 8946a65 commit a28b3fc

File tree

5 files changed

+239
-121
lines changed

5 files changed

+239
-121
lines changed

src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,17 @@
44
from DIRAC.Core.Base.Client import Client, createClient
55

66

7+
from DIRAC.WorkloadManagementSystem.FutureClient.PilotManagerClient import (
8+
PilotManagerClient as futurePilotManagerClient,
9+
)
10+
11+
712
@createClient("WorkloadManagement/PilotManager")
813
class PilotManagerClient(Client):
914
"""PilotManagerClient sets url for the PilotManagerHandler."""
1015

16+
diracxClient = futurePilotManagerClient
17+
1118
def __init__(self, url=None, **kwargs):
1219
"""
1320
Sets URL for PilotManager handler
Lines changed: 97 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,20 @@
11
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
2+
from DIRAC.Core.Security.DiracX import DiracXClient, FutureClient
23

3-
from DIRAC.Core.Security.DiracX import DiracXClient
44

5-
6-
class PilotManagerClient:
5+
class PilotManagerClient(FutureClient):
76
@convertToReturnValue
87
def addPilotReferences(self, pilot_stamps, VO, gridType="DIRAC", pilot_references={}):
98
with DiracXClient() as api:
109
# We will move toward a stamp as identifier for the pilot
1110
return api.pilots.add_pilot_stamps(
12-
{"pilot_stamps": pilot_stamps, "vo": VO, "grid_type": gridType, "pilot_references": pilot_references}
11+
{"pilot_stamps": pilot_stamps, "vo": VO, "grid_type": gridType, "pilot_references": pilot_references} # type: ignore
1312
)
1413

1514
def set_pilot_field(self, pilot_stamp, values_dict):
1615
with DiracXClient() as api:
1716
values_dict["PilotStamp"] = pilot_stamp
18-
return api.pilots.update_pilot_fields(values_dict)
19-
20-
@convertToReturnValue
21-
def setPilotBenchmark(self, pilotStamp, mark):
22-
return self.set_pilot_field(pilotStamp, {"BenchMark": mark})
23-
24-
@convertToReturnValue
25-
def setAccountingFlag(self, pilotStamp, flag):
26-
return self.set_pilot_field(pilotStamp, {"AccountingSent": flag})
17+
return api.pilots.update_pilot_fields({"pilot_stamps_to_fields_mapping": [values_dict]}) # type: ignore
2718

2819
@convertToReturnValue
2920
def setPilotStatus(self, pilot_stamp, status, destination=None, reason=None, grid_site=None, queue=None):
@@ -44,13 +35,74 @@ def clearPilots(self, interval=30, aborted_interval=7):
4435
api.pilots.delete_pilots(age_in_days=interval, delete_only_aborted=False)
4536
api.pilots.delete_pilots(age_in_days=aborted_interval, delete_only_aborted=True)
4637

38+
@convertToReturnValue
39+
def deletePilot(self, pilot_stamp):
40+
with DiracXClient() as api:
41+
pilot_stamps = [pilot_stamp]
42+
return api.pilots.delete_pilots(pilot_stamps=pilot_stamps)
43+
44+
@convertToReturnValue
45+
def getJobsForPilotByStamp(self, pilotStamp):
46+
with DiracXClient() as api:
47+
return api.pilots.get_pilot_jobs(pilot_stamp=pilotStamp)
48+
49+
@convertToReturnValue
50+
def getPilots(self, job_id):
51+
with DiracXClient() as api:
52+
pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id)
53+
search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
54+
return api.pilots.search(parameters=[], search=search, sort=[]) # type: ignore
55+
56+
@convertToReturnValue
57+
def getPilotInfo(self, pilot_reference):
58+
"""Important: We assume that to one stamp is mapped one pilot."""
59+
with DiracXClient() as api:
60+
search = [{"parameter": "PilotJobReference", "operator": "eq", "value": pilot_reference}]
61+
pilot = api.pilots.search(parameters=[], search=search, sort=[])[0] # type: ignore
62+
63+
if not pilot:
64+
# Return an error as in the legacy code
65+
return []
66+
67+
# Convert all bools in pilot to str
68+
for k, v in pilot.items():
69+
if isinstance(v, bool):
70+
pilot[k] = str(v)
71+
72+
# Transform the list of pilots into a dict keyed by PilotJobReference
73+
resDict = {}
74+
75+
pilotRef = pilot.get("PilotJobReference", None)
76+
assert pilot_reference == pilotRef
77+
pilotStamp = pilot.get("PilotStamp", None)
78+
79+
if pilotRef is not None:
80+
resDict[pilotRef] = pilot
81+
else:
82+
# Fallback: use PilotStamp or another key if PilotJobReference is missing
83+
resDict[pilotStamp] = pilot
84+
85+
jobIDs = self.getJobsForPilotByStamp(pilotStamp)
86+
if jobIDs: # Only add if jobs exist
87+
for pilotRef, pilotInfo in resDict.items():
88+
pilotInfo["Jobs"] = jobIDs # Attach the entire list
89+
90+
return resDict
91+
92+
@convertToReturnValue
93+
def getGroupedPilotSummary(self, column_list):
94+
with DiracXClient() as api:
95+
return api.pilots.summary(grouping=column_list)
96+
4797
@convertToReturnValue
4898
def deletePilots(self, pilot_stamps):
99+
# Used by no one, but we won't raise `UnimplementedError` because we still use it in tests.
49100
with DiracXClient() as api:
50101
pilot_ids = None
51-
if isinstance(pilot_stamps, list[int]):
52-
# Multiple elements (int)
53-
pilot_ids = pilot_stamps # Semantic
102+
if pilot_stamps and isinstance(pilot_stamps, list):
103+
if isinstance(pilot_stamps[0], int):
104+
# Multiple elements (int)
105+
pilot_ids = pilot_stamps # Semantic
54106
elif isinstance(pilot_stamps, int):
55107
# Only one element (int)
56108
pilot_ids = [pilot_stamps]
@@ -61,37 +113,45 @@ def deletePilots(self, pilot_stamps):
61113

62114
if pilot_ids:
63115
# If we have defined pilot_ids, then we have to change them to pilot_stamps
64-
query = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
116+
search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
65117

66-
pilots = api.pilots.search(parameters=["PilotStamp"], search=query, sort=[])
118+
pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore
67119
pilot_stamps = [pilot["PilotStamp"] for pilot in pilots]
68120

69-
api.pilots.delete_pilots(pilot_stamps=pilot_stamps)
121+
return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore
70122

71123
@convertToReturnValue
72124
def setJobForPilot(self, job_id, pilot_stamp, destination=None):
73-
with DiracXClient() as api:
74-
api.pilots.add_jobs_to_pilot({"pilot_stamp": pilot_stamp, "job_ids": [job_id]})
75-
76-
self.set_pilot_field(
77-
pilot_stamp,
78-
{
79-
"DestinationSite": destination,
80-
},
81-
)
125+
raise NotImplementedError(
126+
"This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
127+
)
82128

83129
@convertToReturnValue
84-
def getPilots(self, job_id):
85-
with DiracXClient() as api:
86-
pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id)
130+
def countPilots(self, condDict, older=None, newer=None):
131+
raise NotImplementedError(
132+
"This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
133+
)
87134

88-
query = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
135+
@convertToReturnValue
136+
def selectPilots(self, condDict):
137+
raise NotImplementedError(
138+
"This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
139+
)
89140

90-
return api.pilots.search(parameters=[], search=query, sort=[])
141+
@convertToReturnValue
142+
def getCurrentPilotCounters(self, attrDict={}):
143+
raise NotImplementedError(
144+
"This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
145+
)
91146

92147
@convertToReturnValue
93-
def getPilotInfo(self, pilot_stamp):
94-
with DiracXClient() as api:
95-
query = [{"parameter": "PilotStamp", "operator": "eq", "value": pilot_stamp}]
148+
def setPilotBenchmark(self, pilotRef, mark):
149+
raise NotImplementedError(
150+
"This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
151+
)
96152

97-
return api.pilots.search(parameters=[], search=query, sort=[])
153+
@convertToReturnValue
154+
def setAccountingFlag(self, pilotRef, flag="True"):
155+
raise NotImplementedError(
156+
"This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
157+
)

src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -38,38 +38,7 @@ def initializeHandler(cls, serviceInfoDict):
3838
return S_OK()
3939

4040
##############################################################################
41-
types_getCurrentPilotCounters = [dict]
42-
43-
@classmethod
44-
def export_getCurrentPilotCounters(cls, attrDict={}):
45-
"""Get pilot counters per Status with attrDict selection. Final statuses are given for
46-
the last day.
47-
"""
48-
49-
result = cls.pilotAgentsDB.getCounters("PilotAgents", ["Status"], attrDict, timeStamp="LastUpdateTime")
50-
if not result["OK"]:
51-
return result
52-
last_update = datetime.datetime.utcnow() - TimeUtilities.day
53-
resultDay = cls.pilotAgentsDB.getCounters(
54-
"PilotAgents", ["Status"], attrDict, newer=last_update, timeStamp="LastUpdateTime"
55-
)
56-
if not resultDay["OK"]:
57-
return resultDay
58-
59-
resultDict = {}
60-
for statusDict, count in result["Value"]:
61-
status = statusDict["Status"]
62-
resultDict[status] = count
63-
if status in PilotStatus.PILOT_FINAL_STATES:
64-
resultDict[status] = 0
65-
for statusDayDict, ccount in resultDay["Value"]:
66-
if status == statusDayDict["Status"]:
67-
resultDict[status] = ccount
68-
break
69-
70-
return S_OK(resultDict)
7141

72-
##############################################################################
7342
types_getPilotOutput = [str]
7443

7544
def export_getPilotOutput(self, pilotReference):
@@ -198,15 +167,7 @@ def _getRemotePilotOutput(self, pilotReference, pilotDict):
198167
return res
199168

200169
##############################################################################
201-
types_selectPilots = [dict]
202170

203-
# Won't be moved to DiracX: not used at all anywhere.
204-
@classmethod
205-
def export_selectPilots(cls, condDict):
206-
"""Select pilots given the selection conditions"""
207-
return cls.pilotAgentsDB.selectPilots(condDict)
208-
209-
##############################################################################
210171
types_storePilotOutput = [str, str, str]
211172

212173
@classmethod
@@ -263,6 +224,19 @@ def export_getPilotSummary(cls, startdate="", enddate=""):
263224
return cls.pilotAgentsDB.getPilotSummary(startdate, enddate)
264225

265226
##############################################################################
227+
228+
# --------------- Moved to DiracX ---------------
229+
230+
types_selectPilots = [dict]
231+
232+
@classmethod
233+
def export_selectPilots(cls, condDict):
234+
"""Select pilots given the selection conditions"""
235+
# Used by no one
236+
return cls.pilotAgentsDB.selectPilots(condDict)
237+
238+
#############################################
239+
266240
types_getGroupedPilotSummary = [list]
267241

268242
@classmethod
@@ -280,11 +254,41 @@ def export_getGroupedPilotSummary(cls, columnList):
280254

281255
@classmethod
282256
def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"):
283-
"""Set the pilot agent status"""
284-
257+
"""Count pilots"""
258+
# Used by no one
285259
return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp)
286260

287-
# --------------- Moved to DiracX ---------------
261+
types_getCurrentPilotCounters = [dict]
262+
263+
@classmethod
264+
def export_getCurrentPilotCounters(cls, attrDict={}):
265+
"""Get pilot counters per Status with attrDict selection. Final statuses are given for
266+
the last day.
267+
"""
268+
# Used by no one
269+
270+
result = cls.pilotAgentsDB.getCounters("PilotAgents", ["Status"], attrDict, timeStamp="LastUpdateTime")
271+
if not result["OK"]:
272+
return result
273+
last_update = datetime.datetime.utcnow() - TimeUtilities.day
274+
resultDay = cls.pilotAgentsDB.getCounters(
275+
"PilotAgents", ["Status"], attrDict, newer=last_update, timeStamp="LastUpdateTime"
276+
)
277+
if not resultDay["OK"]:
278+
return resultDay
279+
280+
resultDict = {}
281+
for statusDict, count in result["Value"]:
282+
status = statusDict["Status"]
283+
resultDict[status] = count
284+
if status in PilotStatus.PILOT_FINAL_STATES:
285+
resultDict[status] = 0
286+
for statusDayDict, ccount in resultDay["Value"]:
287+
if status == statusDayDict["Status"]:
288+
resultDict[status] = ccount
289+
break
290+
291+
return S_OK(resultDict)
288292

289293
#############################################
290294
types_addPilotReferences = [list, str]
@@ -304,6 +308,7 @@ def export_addPilotReferences(cls, pilotStamps, VO, gridType="DIRAC", pilotRefDi
304308
@classmethod
305309
def export_setJobForPilot(cls, jobID, pilotRef, destination=None):
306310
"""Report the DIRAC job ID which is executed by the given pilot job"""
311+
# Used by no one.
307312

308313
result = cls.pilotAgentsDB.setJobForPilot(int(jobID), pilotRef)
309314
if not result["OK"]:
@@ -322,14 +327,16 @@ def export_setJobForPilot(cls, jobID, pilotRef, destination=None):
322327
@classmethod
323328
def export_setPilotBenchmark(cls, pilotRef, mark):
324329
"""Set the pilot agent benchmark"""
330+
# Used by no one.
325331
return cls.pilotAgentsDB.setPilotBenchmark(pilotRef, mark)
326332

327333
#############################################
328334
types_setAccountingFlag = [str]
329335

330336
@classmethod
331-
def export_setAccountingFlag(cls, pilotRef, mark="True"):
337+
def export_setAccountingFlag(cls, pilotRef, flag="True"):
332338
"""Set the pilot AccountingSent flag"""
339+
# Used by no one
333340
return cls.pilotAgentsDB.setAccountingFlag(pilotRef, mark)
334341

335342
#############################################
@@ -348,6 +355,7 @@ def export_setPilotStatus(cls, pilotRef, status, destination=None, reason=None,
348355

349356
@classmethod
350357
def export_deletePilots(cls, pilotIDs):
358+
# Used by no one.
351359
if isinstance(pilotIDs, str):
352360
return cls.pilotAgentsDB.deletePilot(pilotIDs)
353361

tests/.dirac-ci-config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ config:
3939
}"
4040
PILOT_DOWNLOAD_COMMAND: "git clone --single-branch --branch master https://github.com/DIRACGrid/Pilot.git && mv Pilot/Pilot/*.py . && rm -rf Pilot"
4141

42+
# ^^^^^^^ TO MODIFY AHHAAHAH
43+
4244
# List of feature variables which must be passed when preparing
4345
required-feature-flags: []
4446

0 commit comments

Comments
 (0)