diff --git a/Unified/checkor.py b/Unified/checkor.py index 573e2fd1a..4615c5456 100755 --- a/Unified/checkor.py +++ b/Unified/checkor.py @@ -1,11 +1,5 @@ #!/usr/bin/env python -from assignSession import * -from utils import getWorkflows, workflowInfo, getDatasetEventsAndLumis, getDatasetEventsPerLumi, siteInfo, campaignInfo, getWorkflowById, forceComplete, getDatasetSize, sendLog, reqmgr_url, dbs_url, dbs_url_writer, display_time, checkMemory, ThreadHandler, wtcInfo -from utils import componentInfo, unifiedConfiguration, userLock, moduleLock, dataCache, unified_url, getDatasetLumisAndFiles, getDatasetRuns, duplicateAnalyzer, invalidateFiles, findParent, do_html_in_each_module, getDatasetFileArray -import dbs3Client -dbs3Client.dbs3_url = dbs_url -dbs3Client.dbs3_url_writer = dbs_url_writer -import reqMgrClient + import json from collections import defaultdict import optparse @@ -14,20 +8,53 @@ import time import random import math -from RucioClient import RucioClient -from McMClient import McMClient -from JIRAClient import JIRAClient +import threading + +from assignSession import * from htmlor import htmlor -from utils import sendEmail -from utils import closeoutInfo from showError import parse_one, showError_options -import threading -import sys + from RucioClient import RucioClient +from McMClient import McMClient +from JIRAClient import JIRAClient +import dbs3Client +import reqMgrClient + +from utils import getWorkflows,\ + workflowInfo,\ + getDatasetEventsAndLumis, \ + siteInfo, \ + campaignInfo, \ + getWorkflowById, \ + forceComplete, \ + sendLog, \ + reqmgr_url, \ + dbs_url, \ + dbs_url_writer, \ + checkMemory,\ + ThreadHandler,\ + wtcInfo,\ + componentInfo, \ + unifiedConfiguration, \ + userLock, \ + moduleLock, \ + dataCache, \ + unified_url, \ + getDatasetLumisAndFiles,\ + getDatasetRuns,\ + duplicateAnalyzer,\ + invalidateFiles, \ + findParent, \ + do_html_in_each_module, \ + getDatasetFileArray,\ + sendEmail, \ + closeoutInfo + +dbs3Client.dbs3_url = dbs_url +dbs3Client.dbs3_url_writer = dbs_url_writer + def get_campaign(output, wfi): ## this should be a perfect matching of output->task->campaign - campaign = None - era = None wf_campaign = None if 'Campaign' in wfi.request: wf_campaign = wfi.request['Campaign'] try: @@ -43,27 +70,10 @@ def get_campaign(output, wfi): def getDatasetFiles(url, dataset ,without_invalid=True ): - # VK TODO: can be replaced with list of files API - # JRV: done through getDatasetFileArray files = getDatasetFileArray( dataset, validFileOnly=without_invalid, detail=True) dbs_filenames = [f['logical_file_name'] for f in files] - - #conn = make_x509_conn(url) - #conn = httplib.HTTPSConnection(url, cert_file = os.getenv('X509_USER_PROXY'), key_file = os.getenv('X509_USER_PROXY')) - - #r1=conn.request("GET",'/phedex/datasvc/json/prod/filereplicas?dataset=%s'%(dataset)) - #r2=conn.getresponse() - #result = json.loads(r2.read()) - #items=result['phedex']['block'] - #phedex_filenames = [] - #for block in items: - # for f in block['file']: - # phedex_filenames.append(f['name']) - rucioClient = RucioClient() rucio_filenames = rucioClient.getFileNamesDataset(dataset) - - #return dbs_filenames, phedex_filenames, list(set(dbs_filenames) - set(phedex_filenames)), list(set(phedex_filenames)-set(dbs_filenames)) return dbs_filenames, rucio_filenames, list(set(dbs_filenames) - set(rucio_filenames)), list(set(rucio_filenames)-set(dbs_filenames)) def checkor(url, spec=None, options=None): @@ -71,18 +81,13 @@ def checkor(url, spec=None, options=None): mlock = moduleLock(locking=False) ml=mlock() - fDB = closeoutInfo() - UC = unifiedConfiguration() - - use_mcm = True up = componentInfo(soft=['mcm','wtc']) if not up.check(): return use_mcm = up.status['mcm'] - now_s = time.mktime(time.gmtime()) def time_point(label="",sub_lap=False, percent=None): now = time.mktime(time.gmtime()) nows = time.asctime(time.gmtime()) @@ -134,8 +139,6 @@ def time_point(label="",sub_lap=False, percent=None): wfs.extend( these ) - custodials = defaultdict(list) #sites : dataset list - #transfers = defaultdict(list) #sites : dataset list invalidations = [] #a list of files SI = siteInfo() CI = campaignInfo() @@ -208,16 +211,7 @@ def time_point(label="",sub_lap=False, percent=None): ## remove empty entries ... bypasses = filter(None, bypasses) - - #pattern_fraction_pass = UC.get('pattern_fraction_pass') - #cumulative_fraction_pass = UC.get('cumulative_fraction_pass') - #timeout_for_damping_fraction = UC.get('damping_fraction_pass') - #damping_time = UC.get('damping_fraction_pass_rate') - #damping_fraction_pass_max = float(UC.get('damping_fraction_pass_max')/ 100.) - #acdc_rank_for_truncate = UC.get('acdc_rank_for_truncate') - random.shuffle( wfs ) - in_manual = 0 ## now you have a record of what file was invalidated globally from TT @@ -241,19 +235,13 @@ def rank( wfn ): if options.update: random.shuffle( wfs ) wfs = wfs[:max_per_round] - total_running_time = 1.*60. will_do_that_many = len(wfs) - - ## record all evolution - full_picture = defaultdict(dict) - report_created = 0 checkers = [] for iwfo,wfo in enumerate(wfs): ## do the check other one workflow if spec and not (spec in wfo.name): continue - if not spec and ('cmsunified_task_HIG-RunIIFall17wmLHEGS-05036__v1_T_200712_005621_4159'.lower() in (wfo.name).lower() or 'pdmvserv_task_HIG-RunIISummer16NanoAODv7-03979__v1_T_200915_013748_1986'.lower() in (wfo.name).lower()): continue checkers.append( CheckBuster( will_do_that_many = will_do_that_many, url = url, @@ -311,9 +299,6 @@ def rank( wfn ): for pid in to.pids: mcm.delete('/restapi/requests/forcecomplete/%s'%pid) - if to.custodials: - for site,items in to.custodials.items(): - custodials[site].extend( items ) n_wfs = len(run_threads.threads) if n_wfs and float(failed_threads/n_wfs) > 0: sendLog('checkor','%d/%d threads have failed, better check this out'% (failed_threads, n_wfs), level='critical') @@ -346,8 +331,6 @@ def rank( wfn ): for wfo in session.query(Workflow).filter(Workflow.status.startswith('assistance')).all(): count_statuses[wfo.status]+=1 some_details +='\n'.join(['%3d in status %s'%( count_statuses[st], st ) for st in sorted(count_statuses.keys())]) - #sendLog('checkor',"Fresh status are available at %s/assistance.html\n%s"%(unified_url, some_details)) - #sendEmail("fresh assistance status available","Fresh status are available at %s/assistance.html\n%s"%(unified_url, some_details),destination=['katherine.rozo@cern.ch']) pass print "File Invalidation" @@ -359,9 +342,6 @@ def rank( wfn ): sendEmail('checkor','Checkor loop was shortened artificially using .checkor_stop') os.system('rm -f .checkor_stop') - - - class CheckBuster(threading.Thread): def __init__(self, **args): threading.Thread.__init__(self) @@ -375,19 +355,13 @@ def __init__(self, **args): self.force_by_mcm = None self.pids = None self.report_created = 0 - self.custodials = defaultdict(list) self.failed = False ## need to find a way to redirect the printouts - #self.log_file = '%s/%s.checkout'%(cache_dir,self.wfo.name) def run(self): try: - #with open(self.log_file, 'w') as sys.stdout: self.check() except Exception as e: - #print "failed on", self.wfo.name - #print "due to" - #print str(e) ## there should be a warning at this point import traceback sendLog('checkor','failed on %s due to %s and %s'%( self.wfo.name, str(e), traceback.format_exc()), level='critical') @@ -402,7 +376,6 @@ def check(self): UC = self.UC CI = self.CI - SI = self.SI JC = self.JC url = self.url @@ -454,7 +427,6 @@ def time_point(label="",sub_lap=False, percent=None): usage = checkMemory() print "memory so far",usage - ## get info wfi = workflowInfo(url, wfo.name) wfi.sendLog('checkor',"checking on %s %s"%( wfo.name,wfo.status)) ## make sure the wm status is up to date. @@ -486,14 +458,9 @@ def time_point(label="",sub_lap=False, percent=None): wfi.sendLog('checkor',"no need to check on %s in status %s"%(wfo.name, wfo.wm_status)) return - - #session.commit() - #sub_assistance="" # if that string is filled, there will be need for manual assistance existing_assistance_tags = set(wfo.status.split('-')[1:]) #[0] should be assistance assistance_tags = set() - is_closing = True - stop_duplicate_check = False ## get it from somewhere bypass_checks = False @@ -528,8 +495,7 @@ def time_point(label="",sub_lap=False, percent=None): completed_log = filter(lambda change : change["Status"] in ["completed"],wfi.request['RequestTransition']) delay = (now_s - completed_log[-1]['UpdateTime']) / (60.*60.*24.) if completed_log else 0 ## in days completed_delay = delay ## this is for the workflow itself - #onhold_completed_delay = delay - onhold_completed_delay = min_completed_delays ## this is for any workflows (itself, and ACDC) + onhold_completed_delay = min_completed_delays ## this is for any workflows (itself, and ACDC) onhold_timeout = UC.get('onhold_timeout') if '-onhold' in wfo.status: @@ -553,7 +519,6 @@ def time_point(label="",sub_lap=False, percent=None): tiers_with_no_check = copy.deepcopy(UC.get('tiers_with_no_check')) # dqm* vetoed_custodial_tier = copy.deepcopy(UC.get('tiers_with_no_custodial')) if not wfi.isRelval() else [] #no veto for relvals - to_ddm_tier = copy.deepcopy(UC.get('tiers_to_DDM')) campaigns = {} ## this mapping of campaign per output dataset assumes era==campaing, which is not true for relval expected_outputs = copy.deepcopy( wfi.request['OutputDatasets'] ) @@ -602,7 +567,6 @@ def time_point(label="",sub_lap=False, percent=None): if member['RequestName'] == wfo.name: continue if member['RequestDate'] < wfi.request['RequestDate']: continue if member['PrepID'] != wfi.request['PrepID'] : continue - #if 'OriginalRequestName' in member and (not 'ACDC' in member['OriginalRequestName']) and member['OriginalRequestName'] != wfo.name: continue if member['RequestStatus'] == None: continue if not set(member['OutputDatasets']).issubset( set(expected_outputs)): @@ -634,7 +598,6 @@ def time_point(label="",sub_lap=False, percent=None): forced_already=True elif member['RequestStatus'] in ['failed']: acdc_failed.append( member['RequestName'] ) - #set this or not assistance_tags.add('inconsistent') else: acdc_inactive.append( member['RequestName'] ) assistance_tags.add('recovered') @@ -714,19 +677,14 @@ def time_point(label="",sub_lap=False, percent=None): print event_expected_per_task print task_outputs time_point("expected statistics", sub_lap=True) - running_log = filter(lambda change : change["Status"] in ["running-open","running-closed"],wfi.request['RequestTransition']) - running_delay = (now_s - (min(l['UpdateTime'] for l in running_log))) / (60.*60.*24.) if running_log else 0 ## in days - print delay,"since completed" - default_fraction_overdoing = UC.get('default_fraction_overdoing') for output in wfi.request['OutputDatasets']: default_pass = UC.get('default_fraction_pass') fractions_pass[output] = default_pass fractions_announce[output] = 1.0 - #fractions_truncate_recovery[output] = 0.98 ## above this threshold if the request will pass stats check, we close the acdc c = campaigns[output] if c in CI.campaigns and 'earlyannounce' in CI.campaigns[c]: wfi.sendLog('checkor', "Allowed to announce the output %s over %.2f by campaign requirement"%(out, CI.campaigns[c]['earlyannounce'])) @@ -769,7 +727,6 @@ def time_point(label="",sub_lap=False, percent=None): pass_percent_below = fractions_pass[output]-0.02 weight_full = 7. weight_pass = delay - weight_under_pass = 2*delay if int(wfi.request['RequestPriority'])< 80000 else 0. ## allow to drive it below the threshold weight_under_pass = 0. ## otherwise we can end-up having request at 94% waiting for 95% fractions_truncate_recovery[output] = (fractions_pass[output]*weight_pass +1.*weight_full + pass_percent_below*weight_under_pass) / ( weight_pass+weight_full+weight_under_pass) @@ -783,15 +740,9 @@ def time_point(label="",sub_lap=False, percent=None): print "This is not going to end well if you truncate at a lower threshold than passing",fractions_truncate_recovery[output],fractions_pass[output] ## floor truncating fractions_truncate_recovery[output] = fractions_pass[output] - ##### OR - ##wfi.sendLog('checkor', "Lowering the pass bar since recovery is being truncated") - # - #fractions_pass[output] = fractions_truncate_recovery[output] #introduce a reduction factor on very old requests - #1% every damping_time days after > timeout_for_damping_fraction in completed. Not more than damping_fraction_pass_max - #fraction_damping = min(0.01*(max(running_delay - timeout_for_damping_fraction,0)/damping_time),damping_fraction_pass_max) fraction_damping = min(0.01*(max(completed_delay - timeout_for_damping_fraction,0)/damping_time),damping_fraction_pass_max) print "We could reduce the passing fraction by",fraction_damping,"given it's been in for long" long_lasting_choped = False @@ -805,11 +756,8 @@ def time_point(label="",sub_lap=False, percent=None): if long_lasting_choped : msg = 'Reducing pass thresholds by %.3f%% for long lasting workflow %s '%(100*fraction_damping, wfi.request['RequestName']) wfi.sendLog('checkor', msg) - #sendLog('checkor', msg, level='critical') - + ## do something about workflow with high order ACDC - # acdc_order == -1 None - # acdc_order == 0 ACDC0 first round if acdc_order > acdc_rank_for_truncate: ## there is high order acdc on-going. chop the output at the pass fraction wfi.sendLog('checkor','Truncating at pass threshold because of ACDC of rank %d'% acdc_order) @@ -860,7 +808,6 @@ def upward( ns ): percent_completions[output] = lumi_count / float( lumi_expected ) expectedL[output] = lumi_expected - output_event_expected = event_expected_per_task.get(task_outputs.get(output,'NoTaskFound'), event_expected) if output_event_expected: @@ -880,11 +827,9 @@ def upward( ns ): files_per_rl = {} # a dict of dict "run:lumi":[files] fetched = dict([(out,False) for out in pass_stats_check]) - blocks = wfi.getBlockWhiteList() rwl = wfi.getRunWhiteList() lwl = wfi.getLumiWhiteList() - ## need to come to a way to do this "fast" so that it can be done more often if not all(pass_stats_check.values()):# and False: n_runs = 1 @@ -930,13 +875,9 @@ def upward( ns ): pass_stats_check_over_completion = dict([(out, (percent_completions[out] >= default_fraction_overdoing)) for out in percent_completions ]) print "announce checks" - should_announce = False if pass_stats_check_to_announce and all(pass_stats_check_to_announce.values()): wfi.sendLog('checkor',"The output of this workflow are essentially good to be announced while we work on the rest\n%s \n%s"% ( json.dumps( percent_avg_completions , indent =2 ), json.dumps( fractions_announce , indent =2 ))) assistance_tags.add('announced' if 'announced' in wfo.status else 'announce') - - should_announce = True - if not all(pass_stats_check.values()): possible_recoveries = wfi.getRecoveryDoc() @@ -952,10 +893,7 @@ def upward( ns ): ## hook for creating automatically ACDC ? if not bypass_checks: - ############################### assistance_tags.add('recovery' if use_recoveror else 'manual') - #in_manual += 0 if use_recoveror else 1 - ############################### is_closing = False else: wfi.sendLog('checkor','passing stats check \nCurrent stats:\n%s \nRequired stats:\n%s'%( json.dumps(percent_completions, indent=2), json.dumps(fractions_pass, indent=2) )) @@ -970,10 +908,8 @@ def upward( ns ): ## all outputs are over the top ... wfi.sendLog('checkor','Should force-complete the request going over 100%') wfi.sendLog('checkor',json.dumps(percent_completions, indent=2)) - #sendEmail( "dataset over completion", "Please take a look at %s"% wfo.name) assistance_tags.add('over100') ## set to force complete the whole thing - #forceComplete(url, wfi) time_point("checked output size", sub_lap=True) @@ -998,51 +934,23 @@ def upward( ns ): wfi.sendLog('checkor',"%s has very small lumisections\n%s"%( wfo.name, json.dumps(events_per_lumi, indent=2))) assistance_tags.add('smalllumi') is_closing = False - stop_duplicate_check = True if any([ (lumi_upper_limit[out]>0 and events_per_lumi[out] >= lumi_upper_limit[out]) for out in events_per_lumi]): wfi.sendLog('checkor',"%s has large lumisections\n%s"%( wfo.name, json.dumps(events_per_lumi, indent=2))) ## hook for rejecting the request ? assistance_tags.add('biglumi') - is_closing = False - - - #any_presence = {} - #for output in wfi.request['OutputDatasets']: - # any_presence[output] = getDatasetPresence(url, output, vetoes=[]) - - #time_point("checked dataset presence", sub_lap=True) - - ## custodial copy - custodial_locations = {} - custodial_presences = {} - - time_point("checked custodiality", sub_lap=True) + is_closing = False ## presence in rucio rucio_presence ={} rucioClient = RucioClient() for output in wfi.request['OutputDatasets']: _,dsn,process_string,tier = output.split('/') -# if tier in set(UC.get('tiers_to_rucio_relval')) | set(UC.get('tiers_to_rucio_nonrelval')): - - # - creates lists of tuples ot the type: ('blockName', numFiles) - # for all blockNames per Dataset known to both Phedex and Rucio - # - creates the union of the two sets in order to avoid any duplicates - # (files present in both systems) - # - sums the number of files for the union set - # - assigns the value to 'rucio_presence' even though the full sum - # of the files is present in both systems - this way we avoid - # changing the code for the rest of the consistency checks + rucio_filecount_pb = rucioClient.getFileCountPerBlock(output) all_filecount_pb = set(rucio_filecount_pb) all_blocks = set(map(lambda x: x[0], rucio_filecount_pb)) - # bellow we will misscount in case there are same blocks in both - # Rucio and Phedex but with different number of files in the two - # systems - they will enter the sum twice, because the two tuples - # will be concidered as two different blocks from the two subsets - # hence the following check: if len(all_blocks) == len(all_filecount_pb): rucio_presence[output] = sum(map(lambda x: x[1], all_filecount_pb)) else: @@ -1069,11 +977,6 @@ def upward( ns ): dbs_presence[output] = dbs3Client.getFileCountDataset( output ) dbs_invalid[output] = dbs3Client.getFileCountDataset( output, onlyInvalid=True) - ## prepare the check on having a valid subscription to tape - out_worth_checking = [out for out in custodial_locations.keys() if out.split('/')[-1] not in vetoed_custodial_tier] - size_worth_checking = sum([(getDatasetSize(out)/1023. if not wfi.isRelval() else 0.) for out in out_worth_checking ]) ## size in TBs of all outputs - size_worht_going_to_ddm = sum([getDatasetSize(out)/1023. for out in out_worth_checking if out.split('/')[-1] in to_ddm_tier ]) ## size in TBs of all outputs - all_relevant_output_are_going_to_tape = all(map( lambda sites : len(sites)!=0, [custodial_locations[out] for out in out_worth_checking])) show_N_only = 10 ## number of files to include in a report log @@ -1097,7 +1000,6 @@ def upward( ns ): # Corrections to the lists of files present in Phedex for the data Tiers managed by Rucio _,dsn,process_string,tier = output.split('/') -# if tier in set(UC.get('tiers_to_rucio_relval')) | set(UC.get('tiers_to_rucio_nonrelval')): if True: # Here recalculating the filenames as a union of the rucio_files | rucio_files all_filenames = set(rucioClient.getFileNamesDataset(out)) @@ -1122,20 +1024,15 @@ def upward( ns ): if were_invalidated: wfi.sendLog('checkor',"These %d files were invalidated globally,showing %d only\n%s"%(len(were_invalidated),show_N_only, "\n".join(were_invalidated[:show_N_only]))) - #if not bypass_checks: - ## I don't think we can by pass this is_closing = False time_point("checked file count", sub_lap=True) - ## put that heavy part almost at the end ## duplication check duplications = {} - #files_per_rl = {} lumis_with_duplicates = {} for output in wfi.request['OutputDatasets']: duplications[output] = "skiped" - #files_per_rl[output] = "skiped" ignoreduplicates ={} for out,c in campaigns.items(): @@ -1144,9 +1041,9 @@ def upward( ns ): else: ignoreduplicates[out] = options.ignoreduplicates - - ## check for duplicates prior to making the tape subscription ## this is quite expensive and we run it twice for each sample - if (not stop_duplicate_check) and (is_closing or bypass_checks) and (not all_relevant_output_are_going_to_tape): + # Duplicate check is disabled for now + stop_duplicate_check = True + if (not stop_duplicate_check) and (is_closing or bypass_checks): print "starting duplicate checker for",wfo.name for output in wfi.request['OutputDatasets']: if (output in ignoreduplicates) and (ignoreduplicates[output]): @@ -1165,8 +1062,6 @@ def upward( ns ): if is_closing and any(duplications.values()): duplicate_notice = "" duplicate_notice += "%s has duplicates\n"%wfo.name - #duplicate_notice += json.dumps( duplications,indent=2) - #duplicate_notice += '\n' ## TO DO, make the file list invalidation analysis. to find the files with least number of lumis duplicate_notice += "This number of lumis are duplicated\n" duplicate_notice += json.dumps( dict([(o,len(badl)) for o,badl in lumis_with_duplicates.items() ]), indent=2) @@ -1196,138 +1091,6 @@ def upward( ns ): time_point("checked duplicates", sub_lap=True) - - - if is_closing and not all_relevant_output_are_going_to_tape: - print wfo.name,"has not all custodial location" - print json.dumps(custodial_locations, indent=2) - - ########## - ## hook for making a custodial replica ? - custodial = None - ## get from other outputs - for output in out_worth_checking: - if len(custodial_locations[output]): - custodial = custodial_locations[output][0] - if custodial and float(SI.storage[custodial]) < size_worth_checking: - print "cannot use the other output custodial:",custodial,"because of limited space" - custodial = None - - ## try to get it from campaign configuration - force_custodial = False - if not custodial: - for output in out_worth_checking: - campaign = campaigns[output] - if campaign in CI.campaigns and 'custodial' in CI.campaigns[campaign]: - custodial = CI.campaigns[campaign]['custodial'] - print "Setting custodial to",custodial,"from campaign configuration" - force_custodial = True - - group = None - #phedex_group is a name parameter defined in batchor - if campaign in CI.campaigns and 'phedex_group' in CI.campaigns[campaign]: - group = CI.campaigns[campaign]['phedex_group'] - print "using group",group,"for replica" - - if not force_custodial and custodial and float(SI.storage[custodial]) < size_worth_checking: - print "cannot use the campaign configuration custodial:",custodial,"because of limited space" - custodial = None - - ## get from the parent - pick_custodial = True - use_parent_custodial = UC.get('use_parent_custodial') - tape_size_limit = options.tape_size_limit if options.tape_size_limit else UC.get("tape_size_limit") - - _,prim,_,_ = wfi.getIO() -# if not custodial and prim and use_parent_custodial: -# parent_dataset = prim.pop() - ## this is terribly dangerous to assume only -# parents_custodial = phedexClient.getCustodialSubscriptionRequestSite( parent_dataset ) - ###parents_custodial = findCustodialLocation(url, parent_dataset) -# if not parents_custodial: -# parents_custodial = [] - -# if len(parents_custodial): -# custodial = parents_custodial[0] -# else: -# print "the input dataset",parent_dataset,"does not have custodial in the first place. abort" - #sendEmail( "dataset has no custodial location", "Please take a look at %s in the logs of checkor"%parent_dataset) - ## does not work for RAWOADSIM -# sendLog('checkor',"Please take a look at %s for missing custodial location"% parent_dataset) - ## cannot be bypassed, this is an issue to fix -# is_closing = False -# pick_custodial = False -# assistance_tags.add('parentcustodial') - - if not force_custodial and custodial and float(SI.storage[custodial]) < size_worth_checking: - print "cannot use the custodial:",custodial,"because of limited space" - custodial = None - - #if not custodial and pick_custodial and not force_custodial: - ## pick one at random - # custodial = SI.pick_SE(size=size_worth_checking) - - if custodial and size_worth_checking > tape_size_limit: - wfi.sendLog('checkor',"The total output size (%s TB) is too large for the limit set (%s TB)"%( size_worth_checking, tape_size_limit)) - assistance_tags.add('bigoutput') - custodial = None - - if custodial: - for output in out_worth_checking: - print output - if getDatasetSize(output)/1023 > tape_size_limit: - wfi.sendLog('checkor',"%s output size (%s TB) is too large for the limit set (%s TB)"%( output, out_worth_checking[output], tape_size_limit)) - assistance_tags.add('bigoutput') - custodial = None - - - if not custodial: - print "cannot find a custodial for",wfo.name - wfi.sendLog('checkor',"cannot find a custodial for %s probably because of the total output size %d"%( wfo.name, size_worth_checking)) - sendLog('checkor',"cannot find a custodial for %s probably because of the total output size %d"%( wfo.name, size_worth_checking), level='critical') - - picked_a_tape = custodial and (is_closing or bypass_checks) - #cannot be bypassed - is_closing = False - - if picked_a_tape: - print "picked",custodial,"for tape copy" - ## remember how much you added this round already ; this stays locally - SI.storage[custodial] -= size_worth_checking - ## register the custodial request, if there are no other big issues - holding = [] - for output in out_worth_checking: - if not len(custodial_locations[output]): - if rucio_presence[output]>=1: - wfi.sendLog('checkor','Using %s as a tape destination for %s'%(custodial, output)) - self.custodials[custodial].append( output ) - if group: self.custodials[custodial][-1]+='@%s'%group - ## let's wait and see if that's needed - assistance_tags.add('custodial') - holding.append( output ) - elif output in pass_stats_check and pass_stats_check[output]: - ## there is no file in rucio, but the actual stats check is OK, meaning we are good to let this pass along. the dbs/rucio check will pick this up anyways otherwise - wfi.sendLog('checkor','No file in rucio for %s, but statistics check passed'%output) - else: - ## does not look good - wfi.sendLog('checkor','No file in rucio for %s, not good to add to custodial requests'%output) - holding.append( output ) - if not holding: - is_closing = True - - time_point("determined tape location", sub_lap=True) - - ## disk copy - #disk_copies = {} - #for output in wfi.request['OutputDatasets']: - # disk_copies[output] = [s for s in any_presence[output] if (not 'MSS' in s) and (not 'Buffer' in s)] - - #if not all(map( lambda sites : len(sites)!=0, disk_copies.values())): - # print wfo.name,"has not all output on disk" - # print json.dumps(disk_copies, indent=2) - - - fraction_invalid = 0.20 if not all([(dbs_invalid[out] <= int(fraction_invalid*dbs_presence[out])) for out in wfi.request['OutputDatasets']]) and not options.ignoreinvalid: print wfo.name,"has a dbs invalid file level too high" @@ -1337,13 +1100,8 @@ def upward( ns ): ## need to be going and taking an eye assistance_tags.add('invalidfiles') ## no need for holding stuff because of a fraction of invalid files - #if not bypass_checks: - # #sub_assistance+="-invalidfiles" - # is_closing = False - time_point("checked invalidation", sub_lap=True) - time_point("done with %s"%wfo.name) ## for visualization later on @@ -1365,7 +1123,6 @@ def upward( ns ): rec['fractionpass'] = math.floor(fractions_pass.get(output,0)*10000)/100. rec['duplicate'] = duplications[output] if output in duplications else 'N/A' rec['closeOutDataset'] = is_closing - #rec['transPerc'] = float('%.2f'%any_presence[output][ disk_copies[output][0]][1]) if len(disk_copies[output])!=0 else 'N/A' rec['correctLumis'] = int(events_per_lumi[output]) if (events_per_lumi[output] > lumi_upper_limit[output]) else True rec['dbsFiles'] = dbs_presence[output] rec['dbsInvFiles'] = dbs_invalid[output] @@ -1376,21 +1133,17 @@ def upward( ns ): rec['timestamp'] = time.mktime(now) rec['updated'] = time.asctime(now)+' (GMT)' - #fDB.update( wfo.name, put_record) self.put_record = put_record ## make the lumi summary if wfi.request['RequestType'] == 'ReReco': try: - #os.system('python Unified/lumi_summary.py %s 1 > /dev/null'%(wfi.request['PrepID'])) os.system('python Unified/lumi_summary.py %s %d > /dev/null'%(wfi.request['PrepID'], 0 if all(fetched.values()) else 1)) ## no need for fresh fetch if that has been done for all already os.system('python Unified/lumi_plot.py %s > /dev/null'%(wfi.request['PrepID'])) wfi.sendLog('checkor','Lumi summary available at %s/datalumi/lumi.%s.html'%(unified_url,wfi.request['PrepID'])) except Exception as e: print str(e) - ## make the error report - ## and move on if is_closing: @@ -1427,15 +1180,8 @@ def upward( ns ): if res in [None,"None"]: - #wfo.status = 'close' self.to_status = 'close' - #session.commit() - #fDB.pop( wfo.name ) self.force_by_mcm = force_by_mcm - #if use_mcm and force_by_mcm: - ## shoot large on all prepids, on closing the wf - #for pid in pids: - #mcm.delete('/restapi/requests/forcecomplete/%s'%pid) else: print "could not close out",wfo.name,"will try again next time" else: @@ -1445,7 +1191,6 @@ def upward( ns ): for member in acdc+acdc_inactive+[wfo.name]: try: if options and options.no_report: continue - #expose = UC.get('n_error_exposed') if (report_created < 50 and 'manual' in assistance_tags) else 0 expose = UC.get('n_error_exposed') if ('manual' in assistance_tags) else 0 so = showError_options( expose = expose ) parse_one(url, member, so) @@ -1456,17 +1201,6 @@ def upward( ns ): time_point("Done with reports") - ## full known list - #recovering # has active ACDC - ##OUT #recovered #had inactive ACDC - #recovery #not over the pass bar - #over100 # over 100% - #biglumi # has a big lumiblock - #parentcustodial # the parent does not have a valid subscription yet - #custodial # has had the transfer made, is waiting for a valid custodial subscription to appear - #filemismatch # there is a dbs/rucio mismatch - #duplicates #a lumi section is there twice - ## manual is not added yet, and should be so by recoveror print wfo.name,"was tagged with :",list(assistance_tags) if 'recovering' in assistance_tags: @@ -1495,18 +1229,8 @@ def upward( ns ): go_notify=False if assistance_tags and not 'manual' in existing_assistance_tags and existing_assistance_tags != assistance_tags: go_notify=True - if go_notify: - #if wfo.name in already_notified: - # print "double notification" - # sendEmail('double notification','please take a look at %s'%(wfo.name)) - #else: - # already_notified.append( wfo.name ) - - ###detailslink = 'https://cmsweb.cern.ch/reqmgr/view/details/%s' - #detailslink = 'https://cmsweb.cern.ch/reqmgr2/fetch?rid=%s'%(wfo.name) - ###perflink = 'https://cmsweb.cern.ch/couchdb/workloadsummary/_design/WorkloadSummary/_show/histogramByWorkflow/%s'%(wfo.name) perflink = '%s/report/%s'%(unified_url,wfo.name) splitlink = 'https://cmsweb.cern.ch/reqmgr/view/splitting/%s'%(wfo.name) ## notify templates @@ -1514,8 +1238,6 @@ def upward( ns ): 'recovery': 'Samples completed with missing statistics:\n%s\n%s '%( '\n'.join(['%.2f %% complete for %s'%(percent_completions[output]*100, output) for output in wfi.request['OutputDatasets'] ] ), perflink ), 'biglumi': 'Samples completed with large luminosity blocks:\n%s\n%s '%('\n'.join(['%d > %d for %s'%(events_per_lumi[output], lumi_upper_limit[output], output) for output in wfi.request['OutputDatasets'] if (events_per_lumi[output] > lumi_upper_limit[output])]), splitlink), 'duplicates': 'Samples completed with duplicated luminosity blocks:\n%s\n'%( '\n'.join(['%s'%output for output in wfi.request['OutputDatasets'] if output in duplications and duplications[output] ] ) ), - #'filemismatch': 'Samples completed with inconsistency in DBS/Phedex', - #'manual' : 'Workflow completed and requires manual checks by Ops', } content = "The request PREPID (WORKFLOW) is facing issue in production.\n" @@ -1527,13 +1249,11 @@ def upward( ns ): content += "You are invited to check, while this is being taken care of by Comp-Ops.\n" content += "This is an automated message from Comp-Ops.\n" - items_notified = set() if use_mcm and motive: wfi.notifyRequestor( content , mcm = mcm) ######################################### - ## logic to set the status further if assistance_tags: new_status = 'assistance-'+'-'.join(sorted(assistance_tags) ) @@ -1543,10 +1263,8 @@ def upward( ns ): ## case where the workflow was in manual from recoveror if not 'manual' in wfo.status or new_status!='assistance-recovery': - #wfo.status = new_status if not options.test: wfi.sendLog('checkor','setting %s to %s'%(wfo.name, new_status)) - #session.commit() self.to_status = new_status else: print "current status is",wfo.status,"not changing to anything" @@ -1589,15 +1307,12 @@ def upward( ns ): parser = optparse.OptionParser() parser.add_option('-t','--test', help='Only test the checkor', action='store_true', default=False) parser.add_option('--go',help='Does not check on duplicate process', action='store_true', default=False) - #parser.add_option('--wait',help='Wait for another process to clear', action='store_true', default=False) parser.add_option('--strict', help='Only running workflow that reached completed', action='store_true', default=False) parser.add_option('--update', help='Running workflow that have not yet reached completed', action='store_true', default=False) - parser.add_option('--clear', help='Only the workflow that have reached custodial', action ='store_true', default=False) parser.add_option('--review', help='Look at the workflows that have already completed and had required actions', action='store_true', default=False) parser.add_option('--recovering', help='Look at the workflows that already have on-going acdc', action='store_true', default=False) parser.add_option('--manual', help='Look at the workflows in "manual"', action='store_true', default=False) - parser.add_option('--limit',help='The number of workflow to consider for checking', default=0, type=int) parser.add_option('--fractionpass',help='The completion fraction that is permitted', default=0.0,type='float') parser.add_option('--ignorefiles', help='Force ignoring dbs/rucio differences', action='store_true', default=False) @@ -1627,5 +1342,3 @@ def upward( ns ): if (not spec and do_html_in_each_module) or options.html: htmlor() - -