77"""
88
99import re
10- import time
1110import threading
12-
11+ import time
1312from errno import ENOENT
1413
15- from DIRAC import gLogger , S_OK , S_ERROR
14+ from DIRAC import S_ERROR , S_OK , gLogger
15+ from DIRAC .ConfigurationSystem .Client .Helpers .Operations import Operations
1616from DIRAC .Core .Base .DB import DB
17- from DIRAC .Core .Utilities .DErrno import cmpError
18- from DIRAC .Resources .Catalog .FileCatalog import FileCatalog
1917from DIRAC .Core .Security .ProxyInfo import getProxyInfo
20- from DIRAC .Core .Utilities .List import stringListToString , intListToString , breakListIntoChunks
18+ from DIRAC .Core .Utilities .DErrno import cmpError
19+ from DIRAC .Core .Utilities .List import breakListIntoChunks , intListToString , stringListToString
2120from DIRAC .Core .Utilities .Shifter import setupShifterProxyInEnv
22- from DIRAC .ConfigurationSystem .Client .Helpers .Operations import Operations
2321from DIRAC .Core .Utilities .Subprocess import pythonCall
2422from DIRAC .DataManagementSystem .Client .MetaQuery import MetaQuery
23+ from DIRAC .Resources .Catalog .FileCatalog import FileCatalog
2524
2625MAX_ERROR_COUNT = 10
2726
@@ -242,7 +241,7 @@ def addTransformation(
242241 gLogger .error ("Could not insert files, now deleting" , res ["Message" ])
243242 return self .deleteTransformation (transID , connection = connection )
244243
245- # Add files to the DataFiles table
244+ # Add files to the TransformationFiles table
246245 catalog = FileCatalog ()
247246 if addFiles and inputMetaQuery :
248247 res = catalog .findFilesByMetadata (inputMetaQuery )
@@ -253,16 +252,7 @@ def addTransformation(
253252 gLogger .notice ("filesToAdd" , filesToAdd )
254253 if filesToAdd :
255254 connection = self .__getConnection (connection )
256- res = self .__addDataFiles (filesToAdd , connection = connection )
257- if not res ["OK" ]:
258- return res
259- lfnFileIDs = res ["Value" ]
260- # Add the files to the transformations
261- fileIDs = []
262- for lfn in filesToAdd :
263- if lfn in lfnFileIDs :
264- fileIDs .append (lfnFileIDs [lfn ])
265- res = self .__addFilesToTransformation (transID , fileIDs , connection = connection )
255+ res = self .__addFilesToTransformation (transID , filesToAdd , connection = connection )
266256 if not res ["OK" ]:
267257 gLogger .error ("Failed to add files to transformation" , f"{ transID } { res ['Message' ]} " )
268258 message = f"Created transformation { transID } "
@@ -558,7 +548,7 @@ def addFilesToTransformation(self, transName, lfns, connection=False):
558548 """Add a list of LFNs to the transformation directly"""
559549 gLogger .info (
560550 "TransformationDB.addFilesToTransformation:"
561- " Attempting to add %s files to transformations: %s" % ( len ( lfns ), transName )
551+ f" Attempting to add { len ( lfns ) } files to transformation { transName } "
562552 )
563553 if not lfns :
564554 return S_ERROR ("Zero length LFN list" )
@@ -567,10 +557,6 @@ def addFilesToTransformation(self, transName, lfns, connection=False):
567557 return res
568558 connection = res ["Value" ]["Connection" ]
569559 transID = res ["Value" ]["TransformationID" ]
570- # Add missing files if necessary (__addDataFiles does the job)
571- res = self .__addDataFiles (lfns , connection = connection )
572- if not res ["OK" ]:
573- return res
574560 fileIDs = {fileID : lfn for lfn , fileID in res ["Value" ].items ()}
575561 # Attach files to transformation
576562 successful = {}
@@ -724,7 +710,7 @@ def __addFilesToTransformation(self, transID, fileIDs, connection=False):
724710 if not fileIDs :
725711 return S_OK ([])
726712 values = [(transID , fileID ) for fileID in fileIDs ]
727- req = "INSERT INTO TransformationFiles (TransformationID,FileID ,LastUpdate,InsertedTime) VALUES (%s, %s, UTC_TIMESTAMP(), UTC_TIMESTAMP())"
713+ req = "INSERT INTO TransformationFiles (TransformationID,LFN ,LastUpdate,InsertedTime) VALUES (%s, %s, UTC_TIMESTAMP(), UTC_TIMESTAMP())"
728714 if not (res := self ._updatemany (req , values , conn = connection ))["OK" ]:
729715 return res
730716 return S_OK (fileIDs )
@@ -782,52 +768,19 @@ def __assignTransformationFile(self, transID, taskID, se, fileIDs, connection=Fa
782768 gLogger .error ("Failed to assign file to task" , res ["Message" ])
783769 return res
784770
785- def __setTransformationFileStatus (self , fileIDs , status , connection = False ):
786- req = f"UPDATE TransformationFiles SET Status = '{ status } ' WHERE FileID IN ({ intListToString (fileIDs )} );"
787- res = self ._update (req , conn = connection )
788- if not res ["OK" ]:
789- gLogger .error ("Failed to update file status" , res ["Message" ])
790- return res
791-
792- def __setTransformationFileUsedSE (self , fileIDs , usedSE , connection = False ):
793- req = f"UPDATE TransformationFiles SET UsedSE = '{ usedSE } ' WHERE FileID IN ({ intListToString (fileIDs )} );"
794- res = self ._update (req , conn = connection )
795- if not res ["OK" ]:
796- gLogger .error ("Failed to update file usedSE" , res ["Message" ])
797- return res
798-
799- def __resetTransformationFile (self , transID , taskID , connection = False ):
800- req = (
801- "UPDATE TransformationFiles SET TaskID=NULL, UsedSE='Unknown', Status='Unused'\
802- WHERE TransformationID = %d AND TaskID=%d;"
803- % (transID , taskID )
771+ def __resetTransformationFile (self , transformationID , taskID , connection = False ):
772+ query = (
773+ f"UPDATE TransformationFiles SET TaskID=NULL, UsedSE='Unknown', Status='Unused' "
774+ f"WHERE TransformationID = { transformationID } AND TaskID={ taskID } ;"
804775 )
805- res = self ._update (req , conn = connection )
806- if not res ["OK" ]:
807- gLogger .error ("Failed to reset transformation file" , res ["Message" ])
808- return res
776+ result = self ._update (query , conn = connection )
777+ if not result ["OK" ]:
778+ gLogger .error ("Failed to reset transformation file" , result ["Message" ])
779+ return result
809780
810781 def __deleteTransformationFiles (self , transID , connection = False ):
811- """Remove the files associated to a transformation.
812- It also tries to remove the associated DataFiles.
813- If these DataFiles are still used by other transformations, they
814- will be kept thanks to the ForeignKey constraint.
815- In the very unlikely event of removing a file that was juuuuuuuust about to be
816- used by another transformation, well, tough luck, but the other transformation
817- will succeed at the next attempt to insert the file.
818- """
819- # The IGNORE keyword will make sure we do not abort the full removal
820- # on a foreign key error
821- # https://dev.mysql.com/doc/refman/5.7/en/sql-mode.html#ignore-strict-comparison
822- req = (
823- "DELETE IGNORE tf, df \
824- FROM TransformationFiles tf \
825- JOIN DataFiles df \
826- ON tf.FileID=df.FileID \
827- WHERE TransformationID = %d;"
828- % transID
829- )
830- res = self ._update (req , conn = connection )
782+ """Remove the files associated to a transformation."""
783+ res = self ._update (f"DELETE FROM TransformationFiles WHERE TransformationID = { transID } ;" , conn = connection )
831784 if not res ["OK" ]:
832785 gLogger .error ("Failed to delete transformation files" , res ["Message" ])
833786 return res
@@ -1269,62 +1222,6 @@ def __deleteTransformationLog(self, transID, connection=False):
12691222 req = f"DELETE FROM TransformationLog WHERE TransformationID={ transID } "
12701223 return self ._update (req , conn = connection )
12711224
1272- ###########################################################################
1273- #
1274- # These methods manipulate the DataFiles table
1275- #
1276-
1277- def __getFileIDsForLfns (self , lfns , connection = False ):
1278- """Get file IDs for the given list of lfns
1279- warning: if the file is not present, we'll see no errors
1280- """
1281- req = f"SELECT LFN,FileID FROM DataFiles WHERE LFN in ({ stringListToString (lfns )} );"
1282- res = self ._query (req , conn = connection )
1283- if not res ["OK" ]:
1284- return res
1285- lfns = dict (res ["Value" ])
1286- # Reverse dictionary
1287- fids = {fileID : lfn for lfn , fileID in lfns .items ()}
1288- return S_OK ((fids , lfns ))
1289-
1290- def __getLfnsForFileIDs (self , fileIDs , connection = False ):
1291- """Get lfns for the given list of fileIDs"""
1292- req = f"SELECT LFN,FileID FROM DataFiles WHERE FileID in ({ stringListToString (fileIDs )} );"
1293- res = self ._query (req , conn = connection )
1294- if not res ["OK" ]:
1295- return res
1296- fids = dict (res ["Value" ])
1297- # Reverse dictionary
1298- lfns = {fileID : lfn for lfn , fileID in fids .items ()}
1299- return S_OK ((fids , lfns ))
1300-
1301- def __addDataFiles (self , lfns , connection = False ):
1302- """Add a file to the DataFiles table and retrieve the FileIDs"""
1303- res = self .__getFileIDsForLfns (lfns , connection = connection )
1304- if not res ["OK" ]:
1305- return res
1306- # Insert only files not found, and assume the LFN is unique in the table
1307- lfnFileIDs = res ["Value" ][1 ]
1308- for lfn in set (lfns ) - set (lfnFileIDs ):
1309- req = f"INSERT INTO DataFiles (LFN,Status) VALUES ('{ lfn } ','New');"
1310- res = self ._update (req , conn = connection )
1311- # If the LFN is duplicate we get an error and ignore it
1312- if res ["OK" ]:
1313- lfnFileIDs [lfn ] = res ["lastRowId" ]
1314- # If two transformations are adding files at the same time we will have missed some LFNs
1315- missedLfns = set (lfns ) - set (lfnFileIDs )
1316- if missedLfns :
1317- res = self .__getFileIDsForLfns (missedLfns , connection = connection )
1318- if not res ["OK" ]:
1319- return res
1320- lfnFileIDs .update (res ["Value" ][1 ])
1321- return S_OK (lfnFileIDs )
1322-
1323- def __setDataFileStatus (self , fileIDs , status , connection = False ):
1324- """Set the status of the supplied files"""
1325- req = f"UPDATE DataFiles SET Status = '{ status } ' WHERE FileID IN ({ intListToString (fileIDs )} );"
1326- return self ._update (req , conn = connection )
1327-
13281225 ###########################################################################
13291226 #
13301227 # These methods manipulate multiple tables
@@ -1514,21 +1411,6 @@ def _getConnectionTransID(self, connection, transName):
15141411 #
15151412 ####################################################################################
15161413
1517- def exists (self , lfns , connection = False ):
1518- """Check the presence of the lfn in the TransformationDB DataFiles table"""
1519- gLogger .info (f"TransformationDB.exists: Attempting to determine existence of { len (lfns )} files." )
1520- res = self .__getFileIDsForLfns (lfns , connection = connection )
1521- if not res ["OK" ]:
1522- return res
1523- fileIDs = res ["Value" ][0 ]
1524- failed = {}
1525- successful = {}
1526- fileIDsValues = set (fileIDs .values ())
1527- for lfn in lfns :
1528- successful [lfn ] = lfn in fileIDsValues
1529- resDict = {"Successful" : successful , "Failed" : failed }
1530- return S_OK (resDict )
1531-
15321414 def addFile (self , fileDicts , force = False , connection = False ):
15331415 """Add the supplied lfn to the Transformations and to the DataFiles table if it passes the filter"""
15341416 gLogger .info (f"TransformationDB.addFile: Attempting to add { len (fileDicts )} files." )
@@ -1575,34 +1457,6 @@ def addFile(self, fileDicts, force=False, connection=False):
15751457 res = S_OK ({"Successful" : successful , "Failed" : failed })
15761458 return res
15771459
1578- def removeFile (self , lfns , connection = False ):
1579- """Remove file specified by lfn from the ProcessingDB"""
1580- gLogger .info (f"TransformationDB.removeFile: Attempting to remove { len (lfns )} files." )
1581- failed = {}
1582- successful = {}
1583- connection = self .__getConnection (connection )
1584- if not lfns :
1585- return S_ERROR ("No LFNs supplied" )
1586- res = self .__getFileIDsForLfns (lfns , connection = connection )
1587- if not res ["OK" ]:
1588- return res
1589- fileIDs , lfnFilesIDs = res ["Value" ]
1590- for lfn in lfns :
1591- if lfn not in lfnFilesIDs :
1592- successful [lfn ] = "File does not exist"
1593- if fileIDs :
1594- res = self .__setTransformationFileStatus (list (fileIDs ), "Deleted" , connection = connection )
1595- if not res ["OK" ]:
1596- return res
1597- res = self .__setDataFileStatus (list (fileIDs ), "Deleted" , connection = connection )
1598- if not res ["OK" ]:
1599- return S_ERROR ("TransformationDB.removeFile: Failed to remove files." )
1600- for lfn in lfnFilesIDs :
1601- if lfn not in failed :
1602- successful [lfn ] = True
1603- resDict = {"Successful" : successful , "Failed" : failed }
1604- return S_OK (resDict )
1605-
16061460 def addDirectory (self , path , force = False ):
16071461 """Adds all the files stored in a given directory in file catalog"""
16081462 gLogger .info (f"TransformationDB.addDirectory: Attempting to populate { path } ." )
0 commit comments