Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions rc/control/bookkeeping.py
Original file line number Diff line number Diff line change
Expand Up @@ -1179,16 +1179,24 @@ def sends_to_via_RootNetOutput(proc1, proc2):

return False

def art_analyzer_count(procinfo):
res = re.search(r"\s*art_analyzer_count\s*:\s*([0-9\.e]+)", procinfo.fhicl_used)
if res:
return int(float(res.group(1)))
return 1

for subsystem_id, subsystem in self.subsystems.items():

init_fragment_counts = {}
broadcast_fragment_counts = {}

for procinfo in [pi for pi in self.procinfos if pi.subsystem == subsystem_id]:

if procinfo.name not in init_fragment_counts:

possible_event_senders = []
init_fragment_count = 0
broadcast_fragment_count = 0

if procinfo.name == "EventBuilder":
for ss_source in subsystem.sources:
Expand All @@ -1201,6 +1209,7 @@ def sends_to_via_RootNetOutput(proc1, proc2):
possible_sender_procinfo, procinfo
):
init_fragment_count += 1
broadcast_fragment_count += art_analyzer_count(possible_sender_procinfo)
elif procinfo.name == "DataLogger":
for possible_sender_procinfo in [
pi
Expand All @@ -1212,6 +1221,7 @@ def sends_to_via_RootNetOutput(proc1, proc2):
possible_sender_procinfo, procinfo
):
init_fragment_count += 1
broadcast_fragment_count += art_analyzer_count(possible_sender_procinfo)
elif procinfo.name == "Dispatcher":
for possible_sender_procinfo in [
pi
Expand All @@ -1223,6 +1233,7 @@ def sends_to_via_RootNetOutput(proc1, proc2):
possible_sender_procinfo, procinfo
):
init_fragment_count += 1
broadcast_fragment_count += art_analyzer_count(possible_sender_procinfo)
if (
init_fragment_count == 0
): # Dispatcher will _always_ receive init Fragments, this probably means we're running without DataLoggers
Expand All @@ -1236,15 +1247,23 @@ def sends_to_via_RootNetOutput(proc1, proc2):
possible_sender_procinfo, procinfo
):
init_fragment_count += 1
broadcast_fragment_count += art_analyzer_count(possible_sender_procinfo)

init_fragment_counts[procinfo.name] = init_fragment_count
broadcast_fragment_counts[procinfo.name] = broadcast_fragment_count

procinfo.fhicl_used = re.sub(
"init_fragment_count\s*:\s*\S+",
"init_fragment_count: %d" % init_fragment_counts[procinfo.name],
procinfo.fhicl_used,
)

procinfo.fhicl_used = re.sub(
"broadcast_fragment_count\s*:\s*\S+",
"broadcast_fragment_count: %d" % broadcast_fragment_counts[procinfo.name],
procinfo.fhicl_used,
)


def bookkeeping_for_fhicl_documents_artdaq_v4_base(self):
pass
163 changes: 48 additions & 115 deletions rc/control/daqinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from rc.control.utilities import reformat_fhicl_documents
from rc.control.utilities import fhicl_writes_root_file
from rc.control.utilities import get_setup_commands
from rc.control.utilities import get_build_info
from rc.control.utilities import kill_tail_f
from rc.control.utilities import obtain_messagefacility_fhicl
from rc.control.utilities import record_directory_info
Expand All @@ -66,8 +67,9 @@

messagefacility_fhicl_filename = obtain_messagefacility_fhicl(True)
if (
not "ARTDAQ_LOG_FHICL" in os.environ
#or os.environ["ARTDAQ_LOG_FHICL"] != messagefacility_fhicl_filename
not "ARTDAQ_LOG_FHICL"
in os.environ
# or os.environ["ARTDAQ_LOG_FHICL"] != messagefacility_fhicl_filename
):
raise Exception(
make_paragraph(
Expand Down Expand Up @@ -868,7 +870,6 @@ def read_settings(self):
self.record_directory = None
self.package_hashes_to_save = []
self.package_versions = {}
self.productsdir_for_bash_scripts = None
self.max_fragment_size_bytes = None

self.boardreader_timeout = 30
Expand All @@ -892,7 +893,6 @@ def read_settings(self):
self.max_num_launch_procs_checks = 20
self.launch_procs_wait_time = 40

self.productsdir = None
self.spackdir = None

for line in inf.readlines():
Expand All @@ -912,11 +912,6 @@ def read_settings(self):
self.log_directory = line.split()[-1].strip()
elif "record_directory" in line or "record directory" in line:
self.record_directory = line.split()[-1].strip()
elif (
"productsdir_for_bash_scripts" in line
or "productsdir for bash scripts" in line
):
self.productsdir = line.split()[-1].strip()
elif (
"spack_root_for_bash_scripts" in line
or "spack root for bash scripts" in line
Expand Down Expand Up @@ -1106,11 +1101,17 @@ def read_settings(self):
self.data_directory_override = self.data_directory_override + "/"
elif "transfer_plugin_to_use" in line or "transfer plugin to use" in line:
self.default_transfer = line.split()[-1].strip()
elif "transfer_plugin_from_brs" in line or "transfer plugin from brs" in line:
elif (
"transfer_plugin_from_brs" in line or "transfer plugin from brs" in line
):
self.br_transfer = line.split()[-1].strip()
elif "transfer_plugin_from_ebs" in line or "transfer plugin from ebs" in line:
elif (
"transfer_plugin_from_ebs" in line or "transfer plugin from ebs" in line
):
self.eb_transfer = line.split()[-1].strip()
elif "transfer_plugin_from_dls" in line or "transfer plugin from dls" in line:
elif (
"transfer_plugin_from_dls" in line or "transfer plugin from dls" in line
):
self.dl_transfer = line.split()[-1].strip()
elif "allowed_processors" in line or "allowed processors" in line:
self.allowed_processors = line.split()[-1].strip()
Expand Down Expand Up @@ -1138,8 +1139,8 @@ def read_settings(self):
if self.record_directory is None:
missing_vars.append("record_directory")

if self.productsdir is None and self.spackdir is None:
missing_vars.append("productsdir_for_bash_scripts or spack_root_for_bash_scripts")
if self.spackdir is None:
missing_vars.append("spack_root_for_bash_scripts")

if not self.advanced_memory_usage and self.max_fragment_size_bytes is None:
missing_vars.append("max_fragment_size_bytes")
Expand Down Expand Up @@ -1303,11 +1304,9 @@ def have_artdaq_mfextensions(self):
return self.artdaq_mfextensions_booleans[self.daq_setup_script]

cmds = []
cmds += get_setup_commands(self.productsdir, self.spackdir)
cmds += get_setup_commands(self.spackdir)
cmds.append(". %s for_running" % (self.daq_setup_script))
cmds.append(
'type -P "msgviewer" && true || false'
)
cmds.append('type -P "msgviewer" && true || false')

checked_cmd = construct_checked_command(cmds)

Expand All @@ -1326,33 +1325,6 @@ def have_artdaq_mfextensions(self):

return self.artdaq_mfextensions_booleans[self.daq_setup_script]

def artdaq_mfextensions_info(self):

assert self.have_artdaq_mfextensions()

cmds = []
cmds += get_setup_commands(self.productsdir, self.spackdir)
cmds.append(". %s for_running" % (self.daq_setup_script))
cmds.append(
'if [ -n "$SETUP_ARTDAQ_MFEXTENSIONS" ]; then printenv SETUP_ARTDAQ_MFEXTENSIONS; else echo "artdaq_mfextensions $ARTDAQ_MFEXTENSIONS_VERSION $MRB_QUALS";fi'
)

proc = Popen(
";".join(cmds),
executable="/bin/bash",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)

proclines = proc.stdout.readlines()

printenv_line = proclines[-1].decode("utf-8")
version = printenv_line.split()[1]
qualifiers = printenv_line.split()[-1]

return (version, qualifiers)

# WK 8/31/21
# refactor out launching the message viewer into a function
# and make that function run in the background
Expand All @@ -1364,11 +1336,12 @@ def launch_msgviewer(self):
self.partition_number,
os.environ["USER"],
)
cmds += get_setup_commands(self.productsdir, self.spackdir)
cmds += get_setup_commands(self.spackdir)
cmds.append(". %s for_running" % (self.daq_setup_script))
cmds.append("which msgviewer")
cmds.append(
"cp `find ${FHICL_FILE_PATH//:/\/ } -name 'msgviewer.fcl' 2>/dev/null|head -1` %s" % (msgviewer_fhicl)
"cp `find ${FHICL_FILE_PATH//:/\/ } -name 'msgviewer.fcl' 2>/dev/null|head -1` %s"
% (msgviewer_fhicl)
)
cmds.append(
'res=$( grep -l "port: %d" %s )' % (port_to_replace, msgviewer_fhicl)
Expand Down Expand Up @@ -2020,7 +1993,10 @@ def softlink_logfiles(self):
link_logfile_cmd = "; ".join(softlink_commands_to_run_on_host[host])

if not host_is_local(host):
link_logfile_cmd = "ssh -o BatchMode=yes %s '%s'" % (host, link_logfile_cmd)
link_logfile_cmd = "ssh -o BatchMode=yes %s '%s'" % (
host,
link_logfile_cmd,
)

proc = Popen(
link_logfile_cmd,
Expand Down Expand Up @@ -2056,8 +2032,9 @@ def fill_package_versions(self, packages):

if self.spackdir != None:
cmd = (
"%s ; . %s; spack find --loaded | sed -r -n 's/^(%s)@(\\S+).*/\\1 \\2/p'" % (
";".join(get_setup_commands(self.productsdir, self.spackdir)),
"%s ; . %s; spack find | sed -r -n 's/^(%s)@(\\S+).*/\\1 \\2/p'"
% (
";".join(get_setup_commands(self.spackdir)),
self.daq_setup_script,
"|".join(needed_packages),
)
Expand Down Expand Up @@ -2618,12 +2595,10 @@ def archive_documents(self, labeled_fhicl_documents):
3,
)
try:
self.procinfos[
procinfo_index
].lastreturned = self.procinfos[
procinfo_index
].server.daq.add_config_archive_entry(
label, contents
self.procinfos[procinfo_index].lastreturned = (
self.procinfos[
procinfo_index
].server.daq.add_config_archive_entry(label, contents)
)
except:
self.print_log("d", traceback.format_exc(), 2)
Expand Down Expand Up @@ -2775,37 +2750,9 @@ def create_setup_fhiclcpp_if_needed(self):
),
)
with open(os.environ["DAQINTERFACE_SETUP_FHICLCPP"], "w") as outf:
outf.write("\n".join(get_setup_commands(self.productsdir, self.spackdir)))
outf.write("\n".join(get_setup_commands(self.spackdir)))
outf.write("\n\n")
if self.productsdir != None:
lines = Popen(
'%s;ups list -aK+ fhiclcpp | sort -n'
% (";".join(get_setup_commands(self.productsdir, self.spackdir))),
executable="/bin/bash",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
).stdout.readlines()
if len(lines) > 0:
fhiclcpp_to_setup_line = lines[-1].decode("utf-8")
else:
os.unlink(os.environ["DAQINTERFACE_SETUP_FHICLCPP"])
raise Exception(
make_paragraph(
'Unable to find fhiclcpp ups product in products directory "%s" provided in the DAQInterface settings file, "%s"'
% (self.productsdir, os.environ["DAQINTERFACE_SETTINGS"])
)
)

outf.write(
"setup %s %s -q %s\n"
% (
fhiclcpp_to_setup_line.split()[0],
fhiclcpp_to_setup_line.split()[1],
fhiclcpp_to_setup_line.split()[3],
)
)
elif self.spackdir != None:
if self.spackdir != None:
outf.write("spack load --first fhicl-cpp")

if os.path.exists(os.environ["DAQINTERFACE_SETUP_FHICLCPP"]):
Expand Down Expand Up @@ -3050,15 +2997,13 @@ def revert_failed_boot(failed_action):
)

elif self.have_artdaq_mfextensions():
version, qualifiers = self.artdaq_mfextensions_info()

self.print_log(
"i",
make_paragraph(
"artdaq_mfextensions %s, %s, appears to be available; "
"artdaq_mfextensions appears to be available; "
"if windowing is supported on your host you should see the "
"messageviewer window pop up momentarily"
% (version, qualifiers)
),
)

Expand Down Expand Up @@ -3116,7 +3061,7 @@ def revert_failed_boot(failed_action):
# self.print_log("d", "\n", random_node_source_debug_level)

cmd = "%s ; . %s for_running" % (
";".join(get_setup_commands(self.productsdir, self.spackdir)),
";".join(get_setup_commands(self.spackdir)),
self.daq_setup_script,
)

Expand Down Expand Up @@ -3250,7 +3195,10 @@ def revert_failed_boot(failed_action):
cmds = []
cmds.append(
"if [[ -z $( command -v fhicl-dump ) ]]; then %s; source %s; fi"
% (";".join(get_setup_commands(self.productsdir, self.spackdir)), os.environ["DAQINTERFACE_SETUP_FHICLCPP"])
% (
";".join(get_setup_commands(self.spackdir)),
os.environ["DAQINTERFACE_SETUP_FHICLCPP"],
)
)
cmds.append(
"if [[ $FHICLCPP_VERSION =~ v4_1[01]|v4_0|v[0123] ]]; then dump_arg=0;else dump_arg=none;fi"
Expand Down Expand Up @@ -4302,9 +4250,9 @@ def send_recover_command(command):

for procinfo in self.procinfos:
if name in procinfo.name:
priorities_used[
procinfo.priority
] = "We only care about the key in the dict"
priorities_used[procinfo.priority] = (
"We only care about the key in the dict"
)

for priority in sorted(priorities_used.keys(), reverse=True):
for procinfo in self.procinfos:
Expand Down Expand Up @@ -4345,27 +4293,13 @@ def send_recover_command(command):
# transition "in the queue" despite DAQInterface being in the
# Stopped state after we've finished this recover

self.__do_boot = (
self.__do_shutdown
) = (
self.__do_config
) = (
self.__do_recover
) = (
self.__do_boot = self.__do_shutdown = self.__do_config = self.__do_recover = (
self.__do_start_running
) = (
self.__do_stop_running
) = (
self.__do_terminate
) = (
self.__do_pause_running
) = (
) = self.__do_stop_running = self.__do_terminate = self.__do_pause_running = (
self.__do_resume_running
) = (
self.__do_enable
) = (
self.__do_disable
) = self.do_trace_get_boolean = self.do_trace_set_boolean = False
) = self.__do_enable = self.__do_disable = self.do_trace_get_boolean = (
self.do_trace_set_boolean
) = False

self.complete_state_change(self.name, "recovering")

Expand Down Expand Up @@ -4414,7 +4348,6 @@ def artdaq_process_info(self, name, quiet=False):
# 5/30/14, called every 1s by control.py

def runner(self):

"""
Component "ops" loop. Called at threading hearbeat frequency,
currently 1/sec.
Expand Down Expand Up @@ -4625,7 +4558,7 @@ def main(): # no-coverage
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="UTF-8"
encoding="UTF-8",
)
.stdout.readlines()[0]
.strip()
Expand Down
Loading
Loading