Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,693 changes: 2,360 additions & 1,333 deletions bin/rgang

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions docs/daqinterface_overrides_for_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@
# then set DAQINTERFACE_OVERRIDES_FOR_EXPERIMENT_MODULE_DIR to the directory containing
# that file in $DAQINTERFACE_USER_SOURCEFILE .


def perform_periodic_action_base(self):
pass


def start_datataking_base(self):
pass


def stop_datataking_base(self):
pass


def do_enable_base(self):
pass


def do_disable_base(self):
pass


def check_config_base(self):
pass
110 changes: 66 additions & 44 deletions rc/InhibitManager/InhibitManager.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,70 @@
import sys
import zmq
import thismoduledoesnotexist
import datetime,time
import datetime, time

STATUS_MSG_MARKER = "STATUSMSG"
if isinstance(STATUS_MSG_MARKER, bytes):
STATUS_MSG_MARKER = STATUS_MSG_MARKER.decode('ascii')
STATUS_MSG_MARKER = STATUS_MSG_MARKER.decode("ascii")

INHIBIT_MSG_MARKER = "INHIBITMSG"
if isinstance(INHIBIT_MSG_MARKER, bytes):
INHIBIT_MSG_MARKER = INHIBIT_MSG_MARKER.decode('ascii')
INHIBIT_MSG_MARKER = INHIBIT_MSG_MARKER.decode("ascii")


class StatusMsg:
def __init__(self,msg):
self.raw_msg = str(msg)
def __init__(self, msg):
self.raw_msg = str(msg)
self.msg_list = msg.split("_")
if len(self.msg_list) != 5:
print ("MESSAGE is malformed! raw_msg=%s\n" % self.raw_msg)
print("MESSAGE is malformed! raw_msg=%s\n" % self.raw_msg)
self.msg_ok = False
else:
self.msg_ok = True

def raw_msg(self):
s = "%s"%(self.raw_msg)
s = "%s" % (self.raw_msg)
return s

def type(self):
return self.msg_list[0]

def process(self):
return self.msg_list[1]

def marker(self):
return self.msg_list[2]

def status(self):
return self.msg_list[3]

def time(self):
return self.msg_list[4]

def CreateStatusMsg(process,marker,status):
return "%s_%s_%s_%s_%s" % (STATUS_MSG_MARKER,str(process),str(marker),str(status),datetime.datetime.now())

def CreateStatusMsg(process, marker, status):
return "%s_%s_%s_%s_%s" % (
STATUS_MSG_MARKER,
str(process),
str(marker),
str(status),
datetime.datetime.now(),
)


class StatusSUBNode:
def __init__(self,zmq_context):
def __init__(self, zmq_context):
self.socket = zmq_context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE,STATUS_MSG_MARKER)
self.socket.setsockopt_string(zmq.SUBSCRIBE, STATUS_MSG_MARKER)

def connect(self,address):
def connect(self, address):
self.socket.connect(str(address))

def recv_status_msg_timeout(self,timeout=0.5): #timeout in seconds
def recv_status_msg_timeout(self, timeout=0.5): # timeout in seconds
timeinit = datetime.datetime.now()
timenow = timeinit
diff = timenow - timeinit
while diff.total_seconds()<timeout:
while diff.total_seconds() < timeout:
try:
msg = self.socket.recv_string(flags=zmq.NOBLOCK)
return msg
Expand All @@ -58,36 +73,42 @@ def recv_status_msg_timeout(self,timeout=0.5): #timeout in seconds
diff = timenow - timeinit
return "TIMEOUT"


class StatusPUBNode:
def __init__(self,zmq_context,address):
def __init__(self, zmq_context, address):
self.socket = zmq_context.socket(zmq.PUB)
self.socket.bind(address)

def send_status_msg(self,process,marker,status):
msg = CreateStatusMsg(process,marker,status)
def send_status_msg(self, process, marker, status):
msg = CreateStatusMsg(process, marker, status)
self.socket.send_string(msg)


class InhibitPUBNode:
def __init__(self,zmq_context,address):
def __init__(self, zmq_context, address):
self.socket = zmq_context.socket(zmq.PUB)
self.socket.bind(address)

def send_inhibit_msg(self,msg,details):
self.socket.send_string("%s_%s_%s (%s)" % (INHIBIT_MSG_MARKER,str(msg),datetime.datetime.now(),str(details)))
def send_inhibit_msg(self, msg, details):
self.socket.send_string(
"%s_%s_%s (%s)"
% (INHIBIT_MSG_MARKER, str(msg), datetime.datetime.now(), str(details))
)


class InhibitSUBNode:
def __init__(self,zmq_context):
def __init__(self, zmq_context):
self.socket = zmq_context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE,INHIBIT_MSG_MARKER)
self.socket.setsockopt_string(zmq.SUBSCRIBE, INHIBIT_MSG_MARKER)

def connect(self,address):
def connect(self, address):
self.socket.connect(str(address))

def recv_status_msg_timeout(self,timeout=5.0): #timeout in seconds
def recv_status_msg_timeout(self, timeout=5.0): # timeout in seconds
timeinit = datetime.datetime.now()
timenow = timeinit
diff = timenow - timeinit
while diff.total_seconds()<timeout:
while diff.total_seconds() < timeout:
try:
msg = self.socket.recv_string(flags=zmq.NOBLOCK)
return msg
Expand All @@ -96,48 +117,49 @@ def recv_status_msg_timeout(self,timeout=5.0): #timeout in seconds
diff = timenow - timeinit
return "TIMEOUT"


class InhibitManager:

def __init__(self,update_freq=1.0,verbose=False): #timeout in seconds
self.frontend_dict = { 'INHIBITMANAGER':"OK" }
def __init__(self, update_freq=1.0, verbose=False): # timeout in seconds
self.frontend_dict = {"INHIBITMANAGER": "OK"}
self.current_status = "XON"
self.update_freq = update_freq
self.verbose = verbose

def print_status():
print ("Current Status = %s" % self.current_status)
print("Current Status = %s" % self.current_status)
for fe in self.frontend_dict:
print ("\t%s : %s" % (fe,frontend_dict[fe]))
print("\t%s : %s" % (fe, frontend_dict[fe]))

def status(self):
return self.current_status

def update_status(self):
tmp_status="XON"
tmp_status = "XON"
for stat in self.frontend_dict.values():
if stat!="OK":
tmp_status="XOFF"
if stat != "OK":
tmp_status = "XOFF"
break
self.current_status=tmp_status
self.current_status = tmp_status

def register_status_msg(self,msg):
def register_status_msg(self, msg):
self.frontend_dict[msg.process()] = msg.status()
if msg.status()!=self.status():
if msg.status() != self.status():
self.update_status()

def run(self,subscriber,publisher):
def run(self, subscriber, publisher):
timelast = datetime.datetime.now()
while True:
init_status = self.status()
msg = subscriber.recv_status_msg_timeout(self.update_freq)
if(self.verbose):
print ("Msg received: %s" % msg)
if msg!="TIMEOUT":
if self.verbose:
print("Msg received: %s" % msg)
if msg != "TIMEOUT":
self.register_status_msg(StatusMsg(msg))
if self.status()!=init_status:
publisher.send_inhibit_msg(self.status(),"global status change")
if self.status() != init_status:
publisher.send_inhibit_msg(self.status(), "global status change")
timenow = datetime.datetime.now()
diff = timenow - timelast
if diff.total_seconds()>self.update_freq:
publisher.send_inhibit_msg(self.status(),"no change")
timelast = timenow
if diff.total_seconds() > self.update_freq:
publisher.send_inhibit_msg(self.status(), "no change")
timelast = timenow
2 changes: 1 addition & 1 deletion rc/InhibitManager/MonitorInhibitManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@

while True:
msg = subscriber.recv_status_msg_timeout()
print (msg)
print(msg)
6 changes: 3 additions & 3 deletions rc/InhibitManager/RunInhibitManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import InhibitManager

context = zmq.Context()
publisher = InhibitManager.InhibitPUBNode(context,"tcp://*:5566")
publisher = InhibitManager.InhibitPUBNode(context, "tcp://*:5566")

subscriber = InhibitManager.StatusSUBNode(context)
subscriber.connect("tcp://localhost:5556")
subscriber.connect("tcp://localhost:5557")

im = InhibitManager.InhibitManager(0.5,False)
im = InhibitManager.InhibitManager(0.5, False)

im.run(subscriber,publisher)
im.run(subscriber, publisher)
12 changes: 6 additions & 6 deletions rc/InhibitManager/SendInhibitMessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import getpass

context = zmq.Context()
publisher = InhibitManager.StatusPUBNode(context,"tcp://*:5556")
publisher = InhibitManager.StatusPUBNode(context, "tcp://*:5556")

time.sleep(0.5)
#while(True):
# while(True):
# msg = raw_input("Enter MSG to send:")
# publisher.send_status_msg("CommandLineMessage",getpass.getuser(),msg)
# print "\n"
#msg = raw_input("Enter MSG to send:")
publisher.send_status_msg("CommandLineMessage",getpass.getuser(),sys.argv[1])
#time.sleep(2)
#print "\n"
# msg = raw_input("Enter MSG to send:")
publisher.send_status_msg("CommandLineMessage", getpass.getuser(), sys.argv[1])
# time.sleep(2)
# print "\n"
4 changes: 2 additions & 2 deletions rc/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
import http.client as httplib
import queue as Queue

# nested not supported - WIP
else:
import xmlrpclib
from SimpleXMLRPCServer import (SimpleXMLRPCServer,
SimpleXMLRPCRequestHandler)
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
import httplib
import Queue
from contextlib import nested
38 changes: 37 additions & 1 deletion rc/control/all_functions_noop.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

# JCF, Jun-19-2017

# This file contains the names of stubbed-out (empty) functions for
Expand All @@ -18,41 +17,78 @@
# up; this is possible since I don't need to worry about function
# definition order like I might if these weren't no-op


def bookkeeping_for_fhicl_documents_artdaq_v3_base(self):
pass


def check_config_base(self):
pass


def check_proc_heartbeats_base(self):
pass


def do_disable_base(self):
pass


def do_enable_base(self):
pass


def find_process_manager_variable_base():
pass


def get_pid_for_process_base(self):
pass


def get_process_manager_log_filenames_base(self):
pass


def kill_procs_base(self):
pass


def launch_procs_base(self):
pass


def mopup_process_base(self):
pass


def perform_periodic_action_base(self):
pass


def process_launch_diagnostics_base(self):
pass


def process_manager_cleanup_base(self):
pass


def reset_process_manager_variables_base(self):
pass


def set_process_manager_default_variables_base(self):
pass


def softlink_process_manager_logfiles_base(self):
pass


def start_datataking_base(self):
pass


def stop_datataking_base(self):
pass
Loading