Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions rc/control/daqinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,26 @@ def read_settings(self):

self.spackdir = None

# Database settings for run record saving (optional)
# Initialize from environment variables first (as defaults), then settings file will overwrite
# Support both OTSDAQ_RUN_RECORD_DATABASE_* and RUN_RECORD_DATABASE_* patterns
env_enabled = os.environ.get("RUN_RECORD_DATABASE_ENABLED")
if env_enabled:
token = env_enabled.strip()
res = re.search(r"[Tt]rue", token)
self.enable_run_record_database = bool(res)
else:
self.enable_run_record_database = False

self.run_record_database_name = os.environ.get("OTSDAQ_RUNINFO_DATABASE_NAME") or os.environ.get("RUN_RECORD_DATABASE_NAME") or "run_info"
self.run_record_database_host = os.environ.get("OTSDAQ_RUNINFO_DATABASE_HOST") or os.environ.get("RUN_RECORD_DATABASE_HOST") or ""
self.run_record_database_port = os.environ.get("OTSDAQ_RUNINFO_DATABASE_PORT") or os.environ.get("RUN_RECORD_DATABASE_PORT") or ""
self.run_record_database_user = os.environ.get("OTSDAQ_RUNINFO_DATABASE_USER") or os.environ.get("RUN_RECORD_DATABASE_USER") or ""
self.run_record_database_pwd = os.environ.get("OTSDAQ_RUNINFO_DATABASE_PWD") or os.environ.get("RUN_RECORD_DATABASE_PWD") or ""
self.run_record_database_schema = os.environ.get("OTSDAQ_RUNINFO_DATABASE_SCHEMA") or os.environ.get("RUN_RECORD_DATABASE_SCHEMA") or "test"
# Prefix only checks RUN_RECORD_DATABASE_PREFIX (no OTSDAQ_ version)
self.run_record_database_prefix = os.environ.get("RUN_RECORD_DATABASE_PREFIX") or "artdaq"

for line in inf.readlines():

line = expand_environment_variable_in_string(line)
Expand Down Expand Up @@ -1126,6 +1146,52 @@ def read_settings(self):

if res:
self.attempt_existing_pid_kill = True
elif (
"run_record_database_enabled" in line
or "run record database enabled" in line
):
token = line.split()[-1].strip()
res = re.search(r"[Tt]rue", token)
if res:
self.enable_run_record_database = True
elif (
"run_record_database_name" in line
or "run record database name" in line
):
self.run_record_database_name = line.split()[-1].strip()
elif (
"run_record_database_host" in line
or "run record database host" in line
):
self.run_record_database_host = line.split()[-1].strip()
elif (
"run_record_database_port" in line
or "run record database port" in line
):
self.run_record_database_port = line.split()[-1].strip()
elif (
"run_record_database_user" in line
or "run record database user" in line
):
self.run_record_database_user = line.split()[-1].strip()
elif (
"run_record_database_pwd" in line
or "run record database pwd" in line
or "run_record_database_password" in line
or "run record database password" in line
):
self.run_record_database_pwd = line.split()[-1].strip()
elif (
"run_record_database_schema" in line
or "run record database schema" in line
):
self.run_record_database_schema = line.split()[-1].strip()
elif (
"run_record_database_prefix" in line
or "run record database prefix" in line
):
self.run_record_database_prefix = line.split()[-1].strip()


missing_vars = []

Expand Down Expand Up @@ -3754,6 +3820,18 @@ def do_start_running(self, run_number=None):

try:
shutil.copytree(self.tmp_run_record, run_record_directory)

# Save run record to database now that run_number is available
try:
from rc.control.save_run_record import _save_run_record_to_database
_save_run_record_to_database(self)
except Exception:
# Don't fail the start transition if database save fails
self.print_log("w", traceback.format_exc())
self.print_log(
"w",
"Failed to save run record to database, but continuing with start transition",
)
except:
self.print_log("e", traceback.format_exc())
self.alert_and_recover(
Expand Down
164 changes: 164 additions & 0 deletions rc/control/run_record_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@

"""
Database helper functions for saving run records to PostgreSQL.

"""

import os


def get_db_connection(self):
"""
Get or create database connection. Returns None if database saving is disabled.

Connection is stored in self._run_record_db_conn for reuse.
Database settings are read from instance variables by read_settings().

Returns:
psycopg2.connection or None: Database connection if enabled and successful, None otherwise
"""
# Check if database saving is enabled (from settings file)
if not hasattr(self, 'enable_run_record_database') or not self.enable_run_record_database:
return None

# Return existing connection if available (stored in instance)
if hasattr(self, '_run_record_db_conn') and self._run_record_db_conn is not None:
return self._run_record_db_conn

# Try to import psycopg2
try:
import psycopg2
except ImportError:
self.print_log(
"w",
"psycopg2 module not available. Database saving disabled. "
"Install psycopg2 to enable database saving."
)
return None

# Get database configuration from instance variables (set by read_settings())
conn_params = {
'host': getattr(self, 'run_record_database_host', ''),
'port': getattr(self, 'run_record_database_port', ''),
'dbname': getattr(self, 'run_record_database_name', 'run_info'),
'user': getattr(self, 'run_record_database_user', ''),
'password': getattr(self, 'run_record_database_pwd', ''),
}

# Build connection string - only add parameters if they are set (non-empty)
conn_info_parts = ["%s=%s" % (k, v) for k, v in conn_params.items() if v]
conn_info_parts.append("connect_timeout=10")
conn_info = " ".join(conn_info_parts)

try:
self._run_record_db_conn = psycopg2.connect(conn_info)
self.print_log(
"d",
"Database connection opened successfully for run record saving",
2,
)
return self._run_record_db_conn
except Exception as e:
self.print_log(
"w",
"Failed to connect to database for run record saving: %s. "
"Database saving will be skipped." % str(e)
)
self._run_record_db_conn = None
return None


def get_db_schema(self):
"""Get database schema name from instance variable (set by read_settings())."""
return getattr(self, 'run_record_database_schema', 'test')


def is_database_enabled(self):
"""Check if database saving is enabled via instance variable (set by read_settings())."""
return hasattr(self, 'enable_run_record_database') and self.enable_run_record_database


def get_db_prefix(self):
"""Get database table prefix from instance variable (set by read_settings())."""
return getattr(self, 'run_record_database_prefix', 'artdaq')


def create_tables_if_not_exist(cursor, dbschema, prefix="artdaq"):
"""Create database tables if they don't exist.

Creates two tables:
- {prefix}_components: stores process information (procinfo) per run
- {prefix}_fcl: stores FHiCL content per run

Args:
cursor: psycopg2 cursor object
dbschema: Database schema name
prefix: Table name prefix (defaults to 'artdaq')
"""
from psycopg2 import sql

# Create {prefix}_components table
# Uses run_number as primary key (saved during do_start_running when run_number is available)
components_table = sql.Identifier(dbschema, "%s_components" % prefix)
create_components_table = sql.SQL(
"CREATE TABLE IF NOT EXISTS {table} ("
"run_number INTEGER NOT NULL, "
"name VARCHAR(255) NOT NULL, "
"rank INTEGER NOT NULL, "
"host VARCHAR(255) NOT NULL, "
"port VARCHAR(50) NOT NULL, "
"label VARCHAR(255) NOT NULL, "
"subsystem VARCHAR(50), "
"allowed_processors VARCHAR(255), "
"target VARCHAR(255), "
"insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
"PRIMARY KEY (run_number, label)"
")"
).format(table=components_table)

cursor.execute(create_components_table)

# Create index on run_number
components_index = sql.Identifier(dbschema, "%s_components_run_number_idx" % prefix)
create_components_index = sql.SQL(
"CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number)"
).format(index=components_index, table=components_table)
cursor.execute(create_components_index)

# Create composite index on (run_number, name) for efficient queries by run and process type
components_run_name_index = sql.Identifier(dbschema, "%s_components_run_name_idx" % prefix)
create_components_run_name_index = sql.SQL(
"CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number, name)"
).format(index=components_run_name_index, table=components_table)
cursor.execute(create_components_run_name_index)

# Create {prefix}_fcl table
# Uses run_number as primary key (saved during do_start_running when run_number is available)
fcl_table = sql.Identifier(dbschema, "%s_fcl" % prefix)
create_fcl_table = sql.SQL(
"CREATE TABLE IF NOT EXISTS {table} ("
"run_number INTEGER NOT NULL, "
"name VARCHAR(255) NOT NULL, "
"label VARCHAR(255) NOT NULL, "
"content TEXT NOT NULL, "
"insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
"PRIMARY KEY (run_number, label)"
")"
).format(table=fcl_table)

cursor.execute(create_fcl_table)

# Create index on run_number
fcl_index = sql.Identifier(dbschema, "%s_fcl_run_number_idx" % prefix)
create_fcl_index = sql.SQL(
"CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number)"
).format(index=fcl_index, table=fcl_table)
cursor.execute(create_fcl_index)

# Create composite index on (run_number, name) for efficient queries by run and process type
fcl_run_name_index = sql.Identifier(dbschema, "%s_fcl_run_name_idx" % prefix)
create_fcl_run_name_index = sql.SQL(
"CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number, name)"
).format(index=fcl_run_name_index, table=fcl_table)
cursor.execute(create_fcl_run_name_index)

Loading
Loading