Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/DIRAC/Interfaces/API/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from io import StringIO
from urllib.parse import quote
from pathlib import Path

from DIRAC import S_OK, gLogger
from DIRAC.Core.Base.API import API
Expand Down
58 changes: 55 additions & 3 deletions src/DIRAC/RequestManagementSystem/Client/ReqClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@
import random
import json
import datetime
from functools import cached_property

# # from DIRAC
from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Utilities.List import randomize, fromChar
from DIRAC.Core.Utilities.JEncode import strToIntDict
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise
from DIRAC.ConfigurationSystem.Client import PathFinder
from DIRAC.Core.Base.Client import Client, createClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility


@createClient("RequestManagement/ReqManager")
Expand Down Expand Up @@ -309,8 +313,7 @@ def finalizeRequest(self, requestID, jobID, useCertificates=True):
:param str requestID: request id
:param int jobID: job id
"""

stateServer = JobStateUpdateClient(useCertificates=useCertificates)
stateServer = _JobDBInteraction(useCertificates)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're going to run into the same issue with the JobMonitoringClient below.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know but that is used in multiple places so I thought it was better to focus on one client at a time.


# Checking if to update the job status - we should fail here, so it will be re-tried later
# Checking the state, first
Expand Down Expand Up @@ -688,3 +691,52 @@ def recoverableRequest(request):
return False
return True
return True


class _JobDBInteraction:
"""Class to handle the JobDB interaction.

This will either connect to the DB directly or use the client depending on
if use_certificates is set or not.

WARNING: This is not intended for use outside of ReqClient!
"""

def __init__(self, useCertificates: bool):
self._useCertificates = useCertificates

def setJobParameter(self, jobID: int, key: str, value: str):
if self._useCertificates:
vo = returnValueOrRaise(self._jobStatusUtility.jobDB.getJobAttribute(jobID, "VO"))
return self._elasticJobParametersDB.setJobParameter(int(jobID), key, value, vo=vo)
else:
return self._client.setJobParameter(jobID, key, value)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To draw attention to this, I think JobStateUpdateClient().setJobParameter no longer works with a host certificate as the VO isn't knonw.

cc @fstagni


def setJobStatus(self, jobID: int, newStatus: str, minorStatus: str, source: str):
if self._useCertificates:
return self._jobStatusUtility.setJobStatus(
int(jobID), status=newStatus, minorStatus=minorStatus, source=source
)
else:
return self._client.setJobStatus(jobID, minorStatus, minorStatus, source)

def setJobApplicationStatus(self, jobID: int, appStatus: str, source: str):
if self._useCertificates:
return self._jobStatusUtility.setJobStatus(int(jobID), appStatus=appStatus, source=source)
else:
return self._client.setJobApplicationStatus(jobID, appStatus, source)

@cached_property
def _client(self):
return JobStateUpdateClient(useCertificates=False)

@cached_property
def _jobStatusUtility(self):
return JobStatusUtility()

@cached_property
def _elasticJobParametersDB(self):
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB")
if not result["OK"]:
return result
return result["Value"]()
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.TransformationSystem.Utilities.JobInfo import JobInfo
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility


class TransformationInfo:
Expand All @@ -26,7 +26,7 @@ def __init__(self, transformationID, transInfoDict, enabled, tClient, fcClient,
self.transType = transInfoDict["Type"]
self.author = transInfoDict["Author"]
self.authorGroup = transInfoDict["AuthorGroup"]
self.jobStateClient = JobStateUpdateClient()
self.jobStatusUtility = JobStatusUtility()

def checkTasksStatus(self):
"""Check the status for the task of given transformation and taskID"""
Expand Down Expand Up @@ -97,7 +97,9 @@ def __updateJobStatus(self, jobID, status, minorstatus=""):
"""Update the job status."""
if self.enabled:
source = "DataRecoveryAgent"
result = self.jobStateClient.setJobStatus(jobID, status, minorstatus, source, None, True)
result = self.jobStatusUtility.setJobStatus(
int(jobID), status=status, minorStatus=minorstatus, source=source, dateTime=None, force=True
)
else:
return S_OK("DisabledMode")
if not result["OK"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def tiFixture():
tMock.setFileStatusForTransformation = Mock(name="setFileStat")
fcMock = Mock(name="fcMock", spec=DIRAC.Resources.Catalog.FileCatalogClient.FileCatalogClient)
jmMock = Mock(name="jobMonMock", spec=DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient.JobMonitoringClient)
jsucMock = Mock(name="jsuc", spec=DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient.JobStateUpdateClient)
jsucMock = Mock(name="jsuc", spec=DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility.JobStatusUtility)
transInfoDict = dict(
TransformationID=1234,
TransformationName="TestProd12",
Expand All @@ -66,7 +66,7 @@ def tiFixture():
transformationID=1234, transInfoDict=transInfoDict, enabled=False, tClient=tMock, fcClient=fcMock, jobMon=jmMock
)
tri.log = Mock(name="LogMock")
tri.jobStateClient = jsucMock
tri.jobStatusUtility = jsucMock
return tri


Expand Down
Loading