From 4f01ea34dab9b2e62f1c94d3ad73698405c72b0b Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 3 Feb 2023 10:21:06 -0600 Subject: [PATCH 01/37] added JobsConfig --- datajoint/autopopulate.py | 40 +++++++++++++++++ datajoint/jobs.py | 93 +++++++++++++++++++++++++++++++++------ datajoint/schemas.py | 15 ++++++- 3 files changed, 133 insertions(+), 15 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 2b40b1e61..ea3dea8ee 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -8,6 +8,8 @@ from .hash import key_hash from .expression import QueryExpression, AndList from .errors import DataJointError, LostConnectionError +from .settings import config +from .utils import user_choice import signal import multiprocessing as mp import contextlib @@ -348,3 +350,41 @@ def progress(self, *restrictions, display=True): flush=True, ) return remaining, total + + @property + def jobs(self): + return self.connection.schemas[self.target.database].jobs & { + "table_name": self.target.table_name + } + + def register_key_source(self, safemode=None): + key_source_sql = self.key_source.make_sql() + key_source_uuid = key_hash({"sql": key_source_sql}) + + jobs_config = self.connection.schemas[self.target.database].jobs_config + + entry = { + "table_name": self.target.table_name, + "key_source": key_source_sql, + "key_source_uuid": key_source_uuid, + } + + if jobs_config & {"table_name": self.target.table_name}: + registered_uuid = ( + jobs_config & {"table_name": self.target.table_name} + ).fetch1("key_source_uuid") + if key_source_uuid == registered_uuid: + return + + safemode = config["safemode"] if safemode is None else safemode + if ( + not safemode + or user_choice( + f"Modified key_source for table {self.target.table_name} - Re-register?", + default="no", + ) + == "yes" + ): + jobs_config.insert1(entry, replace=True) + else: + jobs_config.insert1(entry) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index 698a02141..4bce85381 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -29,15 +29,19 @@ def __init__(self, conn, database): table_name :varchar(255) # className of the table key_hash :char(32) # key hash --- - status :enum('reserved','error','ignore') # if tuple is missing, the job is available + status :enum('reserved','error','ignore','scheduled','success') key=null :blob # structure containing the key error_message="" :varchar({error_message_length}) # error message returned if failed error_stack=null :mediumblob # error stack if failed user="" :varchar(255) # database user host="" :varchar(255) # system hostname pid=0 :int unsigned # system process id - connection_id = 0 : bigint unsigned # connection_id() - timestamp=CURRENT_TIMESTAMP :timestamp # automatic timestamp + connection_id = 0 : bigint unsigned # connection_id() + timestamp=UTC_TIMESTAMP :timestamp # the scheduled time (UTC) for the job to run at or after + run_duration=null: float # run duration in seconds + run_version="": varchar(255) # some string representation of the code/env version of a run (e.g. git commit hash) + index(table_name, status) + index(status) """.format( database=database, error_message_length=ERROR_MESSAGE_LENGTH ) @@ -94,11 +98,16 @@ def ignore(self, table_name, key): :param table_name: `database`.`table_name` :param key: the dict of the job's primary key - :return: True if ignore job successfully. False = the jobs is already taken + :return: True if ignore job successfully. 
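The expanded status enum above ('scheduled' and 'success' join the original states, together with run_duration, run_version, and the new indexes) turns the jobs table from a transient reservation list into a persistent queue. A minimal inspection sketch, assuming a schema object `schema` and a computed table whose stored name is `__processing` (both names are illustrative, not part of this patch):

```python
import datajoint as dj

schema = dj.schema("my_pipeline")  # hypothetical pipeline schema

# Tally jobs per status for one table; the (table_name, status)
# index declared above makes this query cheap.
for status in ("scheduled", "reserved", "success", "error", "ignore"):
    n = len(schema.jobs & {"table_name": "__processing", "status": status})
    print(f"{status}: {n}")
```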
False = the jobs is already processed, too late to "ignore" """ + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + if self & job_key: + current_status = (self & job_key).fetch1("status") + if current_status not in ("scheduled", "ignore"): + return False + job = dict( - table_name=table_name, - key_hash=key_hash(key), + job_key, status="ignore", host=platform.node(), pid=os.getpid(), @@ -106,22 +115,38 @@ def ignore(self, table_name, key): key=key, user=self._user, ) - try: - with config(enable_python_native_blobs=True): - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: - return False + + with config(enable_python_native_blobs=True): + self.insert1(job, replace=True, ignore_extra_fields=True) + return True - def complete(self, table_name, key): + def complete(self, table_name, key, run_duration=None, run_version=""): """ Log a completed job. When a job is completed, its reservation entry is deleted. :param table_name: `database`.`table_name` :param key: the dict of the job's primary key + :param run_duration: duration in second of the job run + :param run_version: some string representation of the code/env version of a run (e.g. git commit hash) """ - job_key = dict(table_name=table_name, key_hash=key_hash(key)) - (self & job_key).delete_quick() + with config(enable_python_native_blobs=True): + self.insert1( + dict( + table_name=table_name, + key_hash=key_hash(key), + status="success", + host=platform.node(), + pid=os.getpid(), + connection_id=self.connection.connection_id, + user=self._user, + key=key, + run_duration=run_duration, + run_version=run_version, + ), + replace=True, + ignore_extra_fields=True, + ) def error(self, table_name, key, error_message, error_stack=None): """ @@ -155,3 +180,43 @@ def error(self, table_name, key, error_message, error_stack=None): replace=True, ignore_extra_fields=True, ) + + +class JobConfigTable(Table): + """ + A base table with no definition. 
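Because `complete()` now records a `success` row (with optional run metadata) instead of deleting the reservation, callers can attach provenance to each finished job. A sketch under the assumption that the caller times the run itself and uses git for versioning; the surrounding names (`schema`, `key`, `__processing`) are illustrative:

```python
import subprocess
import time

start = time.time()
# ... execute the computation for `key` here ...
version = subprocess.check_output(
    ["git", "rev-parse", "--short", "HEAD"], text=True
).strip()  # any short string identifying the code/env version works

schema.jobs.complete(
    "__processing",
    key,
    run_duration=time.time() - start,
    run_version=version,
)
```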
Allows reserving jobs + """ + + def __init__(self, conn, database): + self.database = database + self._connection = conn + self._heading = Heading( + table_info=dict( + conn=conn, database=database, table_name=self.table_name, context=None + ) + ) + self._support = [self.full_table_name] + + self._definition = """ # job configuration table for `{database}` + table_name :varchar(255) # className of the table + --- + key_source :mediumblob # sql statement for the key_source of the table - from make_sql() + key_source_uuid: UUID # hash of the key_source + unique index (key_source_hash) + refresh_rate=1: int unsigned # (second) how often should the jobs for the table be refreshed + refresh_reserved=0: bool # is the jobs for the table currently being refreshed + last_refresh_time=UTC_TIMESTAMP :timestamp # timestamp (UTC) of the last refresh time for the table + """.format( + database=database + ) + if not self.is_declared: + self.declare() + self._user = self.connection.get_user() + + @property + def definition(self): + return self._definition + + @property + def table_name(self): + return "~jobs_config" diff --git a/datajoint/schemas.py b/datajoint/schemas.py index 0c196fe8f..9877dce4e 100644 --- a/datajoint/schemas.py +++ b/datajoint/schemas.py @@ -8,7 +8,7 @@ from .diagram import Diagram, _get_tier from .settings import config from .errors import DataJointError, AccessError -from .jobs import JobTable +from .jobs import JobTable, JobConfigTable from .external import ExternalMapping from .heading import Heading from .utils import user_choice, to_camel_case @@ -71,6 +71,7 @@ def __init__( self.create_schema = create_schema self.create_tables = create_tables self._jobs = None + self._jobs_config = None self.external = ExternalMapping(self) self.add_objects = add_objects self.declare_list = [] @@ -401,6 +402,18 @@ def jobs(self): self._jobs = JobTable(self.connection, self.database) return self._jobs + @property + def jobs_config(self): + """ + schema.jobs provides a view of the job reservation table for the schema + + :return: jobs table + """ + self._assert_exists() + if self._jobs_config is None: + self._jobs_config = JobConfigTable(self.connection, self.database) + return self._jobs_config + @property def code(self): self._assert_exists() From de4437c4fc489b2ad002bcd253834a008e4070c6 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 9 Feb 2023 18:39:57 -0600 Subject: [PATCH 02/37] successful prototype for key_source --- datajoint/autopopulate.py | 32 ++++++++++++++++++------- datajoint/expression.py | 50 +++++++++++++++++++++++++++++++++++++++ datajoint/jobs.py | 12 ++++++---- 3 files changed, 80 insertions(+), 14 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index ea3dea8ee..c4edf3fa1 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -6,10 +6,10 @@ import inspect from tqdm import tqdm from .hash import key_hash -from .expression import QueryExpression, AndList +from .expression import QueryExpression, AndList, _SQLExpression from .errors import DataJointError, LostConnectionError from .settings import config -from .utils import user_choice +from .utils import user_choice, to_camel_case import signal import multiprocessing as mp import contextlib @@ -89,6 +89,19 @@ def _rename_attributes(table, props): self._key_source = _rename_attributes(*parents[0]) for q in parents[1:]: self._key_source *= _rename_attributes(*q) + + jobs_config = self.connection.schemas[self.target.database].jobs_config + key_source_query = jobs_config & 
{"table_name": self.target.table_name} + if key_source_query: + key_source_uuid = key_hash({"sql": self._key_source.make_sql()}) + if not (key_source_query & {"key_source_uuid": key_source_uuid}): + # different key_source stored in jobs_config + return _SQLExpression( + self._key_source.proj(), key_source_query.fetch1("key_source") + ) + else: + self.register_key_source(self._key_source.proj().make_sql()) + return self._key_source def make(self, key): @@ -357,8 +370,8 @@ def jobs(self): "table_name": self.target.table_name } - def register_key_source(self, safemode=None): - key_source_sql = self.key_source.make_sql() + def register_key_source(self, sql=None, safemode=None): + key_source_sql = sql or self.key_source.proj().make_sql() key_source_uuid = key_hash({"sql": key_source_sql}) jobs_config = self.connection.schemas[self.target.database].jobs_config @@ -367,20 +380,21 @@ def register_key_source(self, safemode=None): "table_name": self.target.table_name, "key_source": key_source_sql, "key_source_uuid": key_source_uuid, + "last_refresh_time": datetime.datetime.utcnow(), } if jobs_config & {"table_name": self.target.table_name}: - registered_uuid = ( - jobs_config & {"table_name": self.target.table_name} - ).fetch1("key_source_uuid") - if key_source_uuid == registered_uuid: + if jobs_config & { + "table_name": self.target.table_name, + "key_source_uuid": key_source_uuid, + }: return safemode = config["safemode"] if safemode is None else safemode if ( not safemode or user_choice( - f"Modified key_source for table {self.target.table_name} - Re-register?", + f"Modified key_source for table `{to_camel_case(self.target.table_name)}`, Re-register?", default="no", ) == "yes" diff --git a/datajoint/expression.py b/datajoint/expression.py index ab2d27eec..b178cfced 100644 --- a/datajoint/expression.py +++ b/datajoint/expression.py @@ -927,3 +927,53 @@ def aggr(self, group, **named_attributes): ) aggregate = aggr # alias for aggr + + +class _SQLExpression: + def __init__(self, expression, sql): + self._expression = expression + self.make_sql = lambda: sql + self._expression.cursor = self.cursor + + def cursor(self, offset=0, limit=None, order_by=None, as_dict=False): + """ + See expression.fetch() for input description. + :return: query cursor + """ + if offset and limit is None: + raise DataJointError("limit is required when offset is set") + sql = self.make_sql() + if order_by is not None: + sql += " ORDER BY " + ", ".join(order_by) + if limit is not None: + sql += " LIMIT %d" % limit + (" OFFSET %d" % offset if offset else "") + logger.debug(sql) + return self._expression.connection.query(sql, as_dict=as_dict) + + @property + def fetch1(self): + return Fetch1(self._expression) + + @property + def fetch(self): + return Fetch(self._expression) + + def __len__(self): + """:return: number of elements in the result set e.g. ``len(q1)``.""" + select, from_where = re.search( + r"SELECT (.*) FROM (.*)", self.make_sql() + ).groups() + + return self._expression.connection.query( + "SELECT {select_} FROM {from_where}".format( + select_=f"count(DISTINCT {select})", + from_where=from_where, + ) + ).fetchone()[0] + + def __bool__(self): + """ + :return: True if the result is not empty. Equivalent to len(self) > 0 but often + faster e.g. ``bool(q1)``. 
+ """ + return bool(self._expression.connection.query(self.make_sql()).fetchone()[0]) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index 4bce85381..b9d180da2 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -37,7 +37,7 @@ def __init__(self, conn, database): host="" :varchar(255) # system hostname pid=0 :int unsigned # system process id connection_id = 0 : bigint unsigned # connection_id() - timestamp=UTC_TIMESTAMP :timestamp # the scheduled time (UTC) for the job to run at or after + timestamp=CURRENT_TIMESTAMP :timestamp # the scheduled time (UTC) for the job to run at or after run_duration=null: float # run duration in seconds run_version="": varchar(255) # some string representation of the code/env version of a run (e.g. git commit hash) index(table_name, status) @@ -184,7 +184,7 @@ def error(self, table_name, key, error_message, error_stack=None): class JobConfigTable(Table): """ - A base table with no definition. Allows reserving jobs + A base table with no definition. Allows job configuration for imported/computed tables """ def __init__(self, conn, database): @@ -200,12 +200,14 @@ def __init__(self, conn, database): self._definition = """ # job configuration table for `{database}` table_name :varchar(255) # className of the table --- - key_source :mediumblob # sql statement for the key_source of the table - from make_sql() key_source_uuid: UUID # hash of the key_source - unique index (key_source_hash) + key_source :mediumblob # sql statement for the key_source of the table - from make_sql() + unique index (table_name, key_source_uuid) + unique index (key_source_uuid) refresh_rate=1: int unsigned # (second) how often should the jobs for the table be refreshed refresh_reserved=0: bool # is the jobs for the table currently being refreshed - last_refresh_time=UTC_TIMESTAMP :timestamp # timestamp (UTC) of the last refresh time for the table + last_refresh_time: datetime # timestamp (UTC) of the last refresh time for the table + additional_config=null: longblob # (TODO: change to json) any additional configuration (e.g. retries, docker image, etc.) 
""".format( database=database ) From 8edf17900647836840421bb4ffffdc28d5e48677 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 10 Feb 2023 11:59:41 -0600 Subject: [PATCH 03/37] added `refresh_jobs` and `purge_invalid_jobs` --- datajoint/autopopulate.py | 66 ++++++++++++++++++++++++++++++++++++++- datajoint/expression.py | 20 ++++++++---- datajoint/jobs.py | 43 +++++++++++++++++++++++-- 3 files changed, 120 insertions(+), 9 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index c4edf3fa1..bddbbc4f2 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -146,7 +146,7 @@ def _jobs_to_do(self, restrictions): if inspect.isclass(todo) and issubclass(todo, QueryExpression): todo = todo() - if not isinstance(todo, QueryExpression): + if not isinstance(todo, (QueryExpression, _SQLExpression)): raise DataJointError("Invalid key_source value") try: @@ -402,3 +402,67 @@ def register_key_source(self, sql=None, safemode=None): jobs_config.insert1(entry, replace=True) else: jobs_config.insert1(entry) + + def refresh_jobs(self): + jobs_config = self.connection.schemas[self.target.database].jobs_config + key_source_query = jobs_config & {"table_name": self.target.table_name} + if not key_source_query: + self.register_key_source() + + if key_source_query.fetch1("refresh_reserved"): + # jobs for this table is currently being freshed + return + + last_refresh_time, refresh_rate = key_source_query.fetch1( + "last_refresh_time", "refresh_rate" + ) + new_refresh_time = last_refresh_time + datetime.timedelta(seconds=refresh_rate) + if datetime.datetime.utcnow() < new_refresh_time: + # not yet time to refresh jobs + return + + # add new scheduled jobs to JobTable + jobs = self.connection.schemas[self.target.database].jobs + jobs_config.update1( + {"table_name": self.target.table_name, "refresh_reserved": 1} + ) + try: + with self.connection.transaction: + schedule_count = 0 + for key in (self._jobs_to_do({}) - self.target).fetch("KEY"): + schedule_count += jobs.schedule(self.target.table_name, key) + except Exception as e: + logger.exception(str(e)) + else: + logger.info( + f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`" + ) + finally: + # purge invalid jobs + self.purge_invalid_jobs() + jobs_config.update1( + {"table_name": self.target.table_name, "refresh_reserved": 0} + ) + + def purge_invalid_jobs(self): + """ + Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table + Job keys that are in the JobTable (regardless of status) but are no longer in the `key_source` + (e.g. 
jobs added but upstream table(s) got deleted) + This is potentially a time-consuming process - but should not expect to have to run very often + """ + + jobs = self.connection.schemas[self.target.database].jobs + jobs_query = jobs & {"table_name": self.target.table_name} + + invalid_count = len(jobs_query) - len(self._jobs_to_do({})) + invalid_removed = 0 + if invalid_count > 0: + for key, job_key in jobs_query.fetch("KEY", "key"): + if not (self._jobs_to_do({}) & job_key): + (jobs_query & key).delete() + invalid_removed += 1 + + logger.info( + f"{invalid_removed}/{invalid_count} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" + ) diff --git a/datajoint/expression.py b/datajoint/expression.py index b178cfced..869aadbe9 100644 --- a/datajoint/expression.py +++ b/datajoint/expression.py @@ -935,6 +935,14 @@ def __init__(self, expression, sql): self.make_sql = lambda: sql self._expression.cursor = self.cursor + self._select, self._from_where = re.match( + r"SELECT (.*) FROM (.*)", sql + ).groups() + + assert set(self._expression.heading.names) == set( + v.strip("`") for v in self._select.split(",") + ) + def cursor(self, offset=0, limit=None, order_by=None, as_dict=False): """ See expression.fetch() for input description. @@ -950,6 +958,10 @@ def cursor(self, offset=0, limit=None, order_by=None, as_dict=False): logger.debug(sql) return self._expression.connection.query(sql, as_dict=as_dict) + @property + def heading(self): + return self._expression.heading + @property def fetch1(self): return Fetch1(self._expression) @@ -960,14 +972,10 @@ def fetch(self): def __len__(self): """:return: number of elements in the result set e.g. ``len(q1)``.""" - select, from_where = re.search( - r"SELECT (.*) FROM (.*)", self.make_sql() - ).groups() - return self._expression.connection.query( "SELECT {select_} FROM {from_where}".format( - select_=f"count(DISTINCT {select})", - from_where=from_where, + select_=f"count(DISTINCT {self._select})", + from_where=self._from_where, ) ).fetchone()[0] diff --git a/datajoint/jobs.py b/datajoint/jobs.py index b9d180da2..f0db9fd65 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -1,4 +1,5 @@ import os +import datetime from .hash import key_hash import platform from .table import Table @@ -65,10 +66,44 @@ def drop(self): """bypass interactive prompts and dependencies""" self.drop_quick() + def schedule(self, table_name, key, seconds_delay=0, force=False): + """ + Schedule a job for computation. + + :param table_name: `database`.`table_name` + :param key: the dict of the job's primary key + :param seconds_delay: add time delay (in second) in scheduling this job + :param force: force scheduling this job (even if it is in error/ignore status) + :return: True if schedule job successfully. 
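A usage sketch for the new `schedule()` method (names are illustrative; the method returns False when an existing error/ignore entry blocks scheduling):

```python
# Enqueue a job to run no earlier than ten minutes from now.
schema.jobs.schedule("__processing", {"session_id": 1}, seconds_delay=600)

# Re-queue a job that previously errored; force=True overrides the
# error/ignore guard described above.
schema.jobs.schedule("__processing", {"session_id": 2}, force=True)
```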
False = the jobs already exists with a different status + """ + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + if self & job_key: + current_status = (self & job_key).fetch1("status") + if current_status in ("scheduled", "reserved", "success"): + return True + if current_status in ("error", "ignore") and not force: + return False + + job = dict( + job_key, + status="scheduled", + host=platform.node(), + pid=os.getpid(), + connection_id=self.connection.connection_id, + key=key, + user=self._user, + timestamp=datetime.datetime.utcnow() + + datetime.timedelta(seconds=seconds_delay), + ) + + with config(enable_python_native_blobs=True): + self.insert1(job, replace=True, ignore_extra_fields=True) + + return True + def reserve(self, table_name, key): """ - Reserve a job for computation. When a job is reserved, the job table contains an entry for the - job key, identified by its hash. When jobs are completed, the entry is removed. + Reserve a job for computation. :param table_name: `database`.`table_name` :param key: the dict of the job's primary key @@ -83,6 +118,7 @@ def reserve(self, table_name, key): connection_id=self.connection.connection_id, key=key, user=self._user, + timestamp=datetime.datetime.utcnow(), ) try: with config(enable_python_native_blobs=True): @@ -114,6 +150,7 @@ def ignore(self, table_name, key): connection_id=self.connection.connection_id, key=key, user=self._user, + timestamp=datetime.datetime.utcnow(), ) with config(enable_python_native_blobs=True): @@ -143,6 +180,7 @@ def complete(self, table_name, key, run_duration=None, run_version=""): key=key, run_duration=run_duration, run_version=run_version, + timestamp=datetime.datetime.utcnow(), ), replace=True, ignore_extra_fields=True, @@ -176,6 +214,7 @@ def error(self, table_name, key, error_message, error_stack=None): key=key, error_message=error_message, error_stack=error_stack, + timestamp=datetime.datetime.utcnow(), ), replace=True, ignore_extra_fields=True, From 8692eb4ec9ed1098fc0778245999fe6c1247a5b0 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 24 Mar 2023 14:42:06 -0500 Subject: [PATCH 04/37] rename function: `schedule_jobs` --- datajoint/autopopulate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index e6f26564c..117995751 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -420,7 +420,7 @@ def register_key_source(self, sql=None, safemode=None): else: jobs_config.insert1(entry) - def refresh_jobs(self): + def schedule_jobs(self): jobs_config = self.connection.schemas[self.target.database].jobs_config key_source_query = jobs_config & {"table_name": self.target.table_name} if not key_source_query: From f38ce9f0eaacc6b1433c2fe3d64de2121da6a1b1 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 24 Mar 2023 16:11:53 -0500 Subject: [PATCH 05/37] implement `schedule_jobs` as part of `populate()` --- datajoint/autopopulate.py | 92 ++++++++++++++++++++++++--------------- datajoint/jobs.py | 16 +++++-- 2 files changed, 68 insertions(+), 40 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 117995751..52e10d767 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -22,14 +22,14 @@ # --- helper functions for multiprocessing -- -def _initialize_populate(table, jobs, populate_kwargs): +def _initialize_populate(table, reserve_jobs, populate_kwargs): """ Initialize the process for mulitprocessing. 
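Note that the worker initializer now receives the boolean `reserve_jobs` flag rather than a jobs-table handle. The caller-side invocation that exercises this path might look like the following sketch (`Processing` is an illustrative computed table):

```python
# Each worker process reconnects, reserves scheduled keys, and calls
# make(); job state transitions land in the shared jobs table.
Processing().populate(
    reserve_jobs=True,
    processes=4,
    display_progress=True,
    suppress_errors=True,  # record failures instead of raising
)
```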
Saves the unpickled copy of the table to the current process and reconnects. """ process = mp.current_process() process.table = table - process.jobs = jobs + process.reserve_jobs = reserve_jobs process.populate_kwargs = populate_kwargs table.connection.connect() # reconnect @@ -177,6 +177,7 @@ def populate( processes=1, return_success_count=False, make_kwargs=None, + schedule_jobs=True, ): """ ``table.populate()`` calls ``table.make(key)`` for every primary key in @@ -198,6 +199,8 @@ def populate( to be passed down to each ``make()`` call. Computation arguments should be specified within the pipeline e.g. using a `dj.Lookup` table. :type make_kwargs: dict, optional + :param schedule_jobs: if True, run schedule_jobs before doing populate (default: True), + only applicable if reserved_jobs is True """ if self.connection.in_transaction: raise DataJointError("Populate cannot be called during a transaction.") @@ -207,10 +210,6 @@ def populate( raise DataJointError( "The order argument must be one of %s" % str(valid_order) ) - jobs = ( - self.connection.schemas[self.target.database].jobs if reserve_jobs else None - ) - # define and set up signal handler for SIGTERM: if reserve_jobs: @@ -220,16 +219,18 @@ def handler(signum, frame): old_handler = signal.signal(signal.SIGTERM, handler) - keys = (self._jobs_to_do(restrictions) - self.target).fetch("KEY", limit=limit) + if schedule_jobs: + self.schedule_jobs() - # exclude "error" or "ignore" jobs - if reserve_jobs: - exclude_key_hashes = ( - jobs + keys = ( + self._Jobs & {"table_name": self.target.table_name} - & 'status in ("error", "ignore")' - ).fetch("key_hash") - keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] + & 'status = "scheduled"' + ).fetch("KEY", limit=limit) + else: + keys = (self._jobs_to_do(restrictions) - self.target).fetch( + "KEY", limit=limit + ) if order == "reverse": keys.reverse() @@ -259,7 +260,7 @@ def handler(signum, frame): if display_progress else keys ): - status = self._populate1(key, jobs, **populate_kwargs) + status = self._populate1(key, reserve_jobs, **populate_kwargs) if status is not None: if isinstance(status, tuple): error_list.append(status) @@ -270,7 +271,9 @@ def handler(signum, frame): self.connection.close() # disconnect parent process from MySQL server del self.connection._conn.ctx # SSLContext is not pickleable with mp.Pool( - processes, _initialize_populate, (self, jobs, populate_kwargs) + processes, + _initialize_populate, + (self, reserve_jobs, populate_kwargs), ) as pool, ( tqdm(desc="Processes: ", total=nkeys) if display_progress @@ -298,11 +301,16 @@ def handler(signum, frame): return sum(success_list) def _populate1( - self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None + self, + key, + reserve_jobs, + suppress_errors, + return_exception_objects, + make_kwargs=None, ): """ populates table for one source key, calling self.make inside a transaction. 
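The reworked `_populate1` below implements a reserve, make, then complete-or-error state machine per key. A condensed sketch of that control flow (not the literal implementation; transaction handling and error stacks are in the diff):

```python
import datetime


def run_one(jobs, table, make, key):
    """Sketch of the per-key job lifecycle."""
    if not jobs.reserve(table.table_name, key):
        return  # another worker holds the key, or it is not schedulable
    start = datetime.datetime.utcnow()
    try:
        make(dict(key))
    except Exception as exc:
        jobs.error(table.table_name, key, error_message=str(exc))
    else:
        elapsed = (datetime.datetime.utcnow() - start).total_seconds()
        jobs.complete(table.table_name, key, run_duration=elapsed)
```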
- :param jobs: the jobs table or None if not reserve_jobs + :param reserve_jobs: if True, reserve jobs to populate in asynchronous fashion :param key: dict specifying job to populate :param suppress_errors: bool if errors should be suppressed and returned :param return_exception_objects: if True, errors must be returned as objects @@ -310,15 +318,17 @@ def _populate1( """ make = self._make_tuples if hasattr(self, "_make_tuples") else self.make - if jobs is None or jobs.reserve(self.target.table_name, self._job_key(key)): + if not reserve_jobs or self._Jobs.reserve( + self.target.table_name, self._job_key(key) + ): self.connection.start_transaction() if key in self.target: # already populated self.connection.cancel_transaction() - if jobs is not None: - jobs.complete(self.target.table_name, self._job_key(key)) + self._Jobs.complete(self.target.table_name, self._job_key(key)) else: logger.debug(f"Making {key} -> {self.target.full_table_name}") self.__class__._allow_insert = True + make_start = datetime.datetime.utcnow() try: make(dict(key), **(make_kwargs or {})) except (KeyboardInterrupt, SystemExit, Exception) as error: @@ -333,9 +343,9 @@ def _populate1( logger.debug( f"Error making {key} -> {self.target.full_table_name} - {error_message}" ) - if jobs is not None: + if reserve_jobs: # show error name and error message (if any) - jobs.error( + self._Jobs.error( self.target.table_name, self._job_key(key), error_message=error_message, @@ -348,11 +358,16 @@ def _populate1( return key, error if return_exception_objects else error_message else: self.connection.commit_transaction() + self._Jobs.complete( + self.target.table_name, + self._job_key(key), + run_duration=( + datetime.datetime.utcnow() - make_start + ).total_seconds(), + ) logger.debug( f"Success making {key} -> {self.target.full_table_name}" ) - if jobs is not None: - jobs.complete(self.target.table_name, self._job_key(key)) return True finally: self.__class__._allow_insert = False @@ -381,11 +396,13 @@ def progress(self, *restrictions, display=True): ) return remaining, total + @property + def _Jobs(self): + return self.connection.schemas[self.target.database].jobs + @property def jobs(self): - return self.connection.schemas[self.target.database].jobs & { - "table_name": self.target.table_name - } + return self._Jobs & {"table_name": self.target.table_name} def register_key_source(self, sql=None, safemode=None): key_source_sql = sql or self.key_source.proj().make_sql() @@ -420,14 +437,19 @@ def register_key_source(self, sql=None, safemode=None): else: jobs_config.insert1(entry) - def schedule_jobs(self): + def schedule_jobs(self, purge_invalid_jobs=True): + """ + Schedule new jobs for this autopopulate table + :param purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive operation) + :return: + """ jobs_config = self.connection.schemas[self.target.database].jobs_config key_source_query = jobs_config & {"table_name": self.target.table_name} if not key_source_query: self.register_key_source() if key_source_query.fetch1("refresh_reserved"): - # jobs for this table is currently being freshed + # scheduling is ongoing return last_refresh_time, refresh_rate = key_source_query.fetch1( @@ -439,7 +461,6 @@ def schedule_jobs(self): return # add new scheduled jobs to JobTable - jobs = self.connection.schemas[self.target.database].jobs jobs_config.update1( {"table_name": self.target.table_name, "refresh_reserved": 1} ) @@ -447,7 +468,7 @@ def schedule_jobs(self): with self.connection.transaction: 
schedule_count = 0 for key in (self._jobs_to_do({}) - self.target).fetch("KEY"): - schedule_count += jobs.schedule(self.target.table_name, key) + schedule_count += self._Jobs.schedule(self.target.table_name, key) except Exception as e: logger.exception(str(e)) else: @@ -455,8 +476,8 @@ def schedule_jobs(self): f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`" ) finally: - # purge invalid jobs - self.purge_invalid_jobs() + if purge_invalid_jobs: + self.purge_invalid_jobs() jobs_config.update1( {"table_name": self.target.table_name, "refresh_reserved": 0} ) @@ -469,8 +490,7 @@ def purge_invalid_jobs(self): This is potentially a time-consuming process - but should not expect to have to run very often """ - jobs = self.connection.schemas[self.target.database].jobs - jobs_query = jobs & {"table_name": self.target.table_name} + jobs_query = self._Jobs & {"table_name": self.target.table_name} invalid_count = len(jobs_query) - len(self._jobs_to_do({})) invalid_removed = 0 diff --git a/datajoint/jobs.py b/datajoint/jobs.py index f0db9fd65..59ac46c62 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -38,9 +38,9 @@ def __init__(self, conn, database): host="" :varchar(255) # system hostname pid=0 :int unsigned # system process id connection_id = 0 : bigint unsigned # connection_id() - timestamp=CURRENT_TIMESTAMP :timestamp # the scheduled time (UTC) for the job to run at or after - run_duration=null: float # run duration in seconds - run_version="": varchar(255) # some string representation of the code/env version of a run (e.g. git commit hash) + timestamp :timestamp # the scheduled time (UTC) for the job to run at or after + run_duration=null : float # run duration in seconds + run_version="" : varchar(255) # some string representation of the code/env version of a run (e.g. git commit hash) index(table_name, status) index(status) """.format( @@ -127,13 +127,14 @@ def reserve(self, table_name, key): return False return True - def ignore(self, table_name, key): + def ignore(self, table_name, key, message=""): """ Set a job to be ignored for computation. When a job is ignored, the job table contains an entry for the job key, identified by its hash, with status "ignore". :param table_name: `database`.`table_name` :param key: the dict of the job's primary key + :param message: optional message for why the key is to be ignored :return: True if ignore job successfully. False = the jobs is already processed, too late to "ignore" """ job_key = dict(table_name=table_name, key_hash=key_hash(key)) @@ -149,6 +150,7 @@ def ignore(self, table_name, key): pid=os.getpid(), connection_id=self.connection.connection_id, key=key, + error_message=message, user=self._user, timestamp=datetime.datetime.utcnow(), ) @@ -167,6 +169,12 @@ def complete(self, table_name, key, run_duration=None, run_version=""): :param run_duration: duration in second of the job run :param run_version: some string representation of the code/env version of a run (e.g. 
git commit hash) """ + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + if self & job_key: + current_status = (self & job_key).fetch1("status") + if current_status == "success": + return + with config(enable_python_native_blobs=True): self.insert1( dict( From 8b9ac0fd80bdb6e016c0ad9fdaf8d9b8de920658 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 24 Mar 2023 16:35:34 -0500 Subject: [PATCH 06/37] remove JobConfigTable and register_key_source --- datajoint/autopopulate.py | 73 ++------------------------------------- datajoint/expression.py | 58 ------------------------------- datajoint/jobs.py | 42 ---------------------- 3 files changed, 2 insertions(+), 171 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 5b0d8a985..951ee4100 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -6,7 +6,7 @@ import inspect from tqdm import tqdm from .hash import key_hash -from .expression import QueryExpression, AndList, _SQLExpression +from .expression import QueryExpression, AndList from .errors import DataJointError, LostConnectionError from .settings import config from .utils import user_choice, to_camel_case @@ -90,18 +90,6 @@ def _rename_attributes(table, props): for q in parents[1:]: self._key_source *= _rename_attributes(*q) - jobs_config = self.connection.schemas[self.target.database].jobs_config - key_source_query = jobs_config & {"table_name": self.target.table_name} - if key_source_query: - key_source_uuid = key_hash({"sql": self._key_source.make_sql()}) - if not (key_source_query & {"key_source_uuid": key_source_uuid}): - # different key_source stored in jobs_config - return _SQLExpression( - self._key_source.proj(), key_source_query.fetch1("key_source") - ) - else: - self.register_key_source(self._key_source.proj().make_sql()) - return self._key_source def make(self, key): @@ -146,7 +134,7 @@ def _jobs_to_do(self, restrictions): if inspect.isclass(todo) and issubclass(todo, QueryExpression): todo = todo() - if not isinstance(todo, (QueryExpression, _SQLExpression)): + if not isinstance(todo, QueryExpression): raise DataJointError("Invalid key_source value") try: @@ -403,66 +391,12 @@ def _Jobs(self): def jobs(self): return self._Jobs & {"table_name": self.target.table_name} - def register_key_source(self, sql=None, safemode=None): - key_source_sql = sql or self.key_source.proj().make_sql() - key_source_uuid = key_hash({"sql": key_source_sql}) - - jobs_config = self.connection.schemas[self.target.database].jobs_config - - entry = { - "table_name": self.target.table_name, - "key_source": key_source_sql, - "key_source_uuid": key_source_uuid, - "last_refresh_time": datetime.datetime.utcnow(), - } - - if jobs_config & {"table_name": self.target.table_name}: - if jobs_config & { - "table_name": self.target.table_name, - "key_source_uuid": key_source_uuid, - }: - return - - safemode = config["safemode"] if safemode is None else safemode - if ( - not safemode - or user_choice( - f"Modified key_source for table `{to_camel_case(self.target.table_name)}`, Re-register?", - default="no", - ) - == "yes" - ): - jobs_config.insert1(entry, replace=True) - else: - jobs_config.insert1(entry) - def schedule_jobs(self, purge_invalid_jobs=True): """ Schedule new jobs for this autopopulate table :param purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive operation) :return: """ - jobs_config = self.connection.schemas[self.target.database].jobs_config - key_source_query = jobs_config & {"table_name": 
self.target.table_name} - if not key_source_query: - self.register_key_source() - - if key_source_query.fetch1("refresh_reserved"): - # scheduling is ongoing - return - - last_refresh_time, refresh_rate = key_source_query.fetch1( - "last_refresh_time", "refresh_rate" - ) - new_refresh_time = last_refresh_time + datetime.timedelta(seconds=refresh_rate) - if datetime.datetime.utcnow() < new_refresh_time: - # not yet time to refresh jobs - return - - # add new scheduled jobs to JobTable - jobs_config.update1( - {"table_name": self.target.table_name, "refresh_reserved": 1} - ) try: with self.connection.transaction: schedule_count = 0 @@ -477,9 +411,6 @@ def schedule_jobs(self, purge_invalid_jobs=True): finally: if purge_invalid_jobs: self.purge_invalid_jobs() - jobs_config.update1( - {"table_name": self.target.table_name, "refresh_reserved": 0} - ) def purge_invalid_jobs(self): """ diff --git a/datajoint/expression.py b/datajoint/expression.py index df8f39c33..25dd2fe40 100644 --- a/datajoint/expression.py +++ b/datajoint/expression.py @@ -931,61 +931,3 @@ def aggr(self, group, **named_attributes): ) aggregate = aggr # alias for aggr - - -class _SQLExpression: - def __init__(self, expression, sql): - self._expression = expression - self.make_sql = lambda: sql - self._expression.cursor = self.cursor - - self._select, self._from_where = re.match( - r"SELECT (.*) FROM (.*)", sql - ).groups() - - assert set(self._expression.heading.names) == set( - v.strip("`") for v in self._select.split(",") - ) - - def cursor(self, offset=0, limit=None, order_by=None, as_dict=False): - """ - See expression.fetch() for input description. - :return: query cursor - """ - if offset and limit is None: - raise DataJointError("limit is required when offset is set") - sql = self.make_sql() - if order_by is not None: - sql += " ORDER BY " + ", ".join(order_by) - if limit is not None: - sql += " LIMIT %d" % limit + (" OFFSET %d" % offset if offset else "") - logger.debug(sql) - return self._expression.connection.query(sql, as_dict=as_dict) - - @property - def heading(self): - return self._expression.heading - - @property - def fetch1(self): - return Fetch1(self._expression) - - @property - def fetch(self): - return Fetch(self._expression) - - def __len__(self): - """:return: number of elements in the result set e.g. ``len(q1)``.""" - return self._expression.connection.query( - "SELECT {select_} FROM {from_where}".format( - select_=f"count(DISTINCT {self._select})", - from_where=self._from_where, - ) - ).fetchone()[0] - - def __bool__(self): - """ - :return: True if the result is not empty. Equivalent to len(self) > 0 but often - faster e.g. ``bool(q1)``. - """ - return bool(self._expression.connection.query(self.make_sql()).fetchone()[0]) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index e88f07702..195606b40 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -233,45 +233,3 @@ def error(self, table_name, key, error_message, error_stack=None): replace=True, ignore_extra_fields=True, ) - - -class JobConfigTable(Table): - """ - A base table with no definition. 
Allows job configuration for imported/computed tables - """ - - def __init__(self, conn, database): - self.database = database - self._connection = conn - self._heading = Heading( - table_info=dict( - conn=conn, database=database, table_name=self.table_name, context=None - ) - ) - self._support = [self.full_table_name] - - self._definition = """ # job configuration table for `{database}` - table_name :varchar(255) # className of the table - --- - key_source_uuid: UUID # hash of the key_source - key_source :mediumblob # sql statement for the key_source of the table - from make_sql() - unique index (table_name, key_source_uuid) - unique index (key_source_uuid) - refresh_rate=1: int unsigned # (second) how often should the jobs for the table be refreshed - refresh_reserved=0: bool # is the jobs for the table currently being refreshed - last_refresh_time: datetime # timestamp (UTC) of the last refresh time for the table - additional_config=null: longblob # (TODO: change to json) any additional configuration (e.g. retries, docker image, etc.) - """.format( - database=database - ) - if not self.is_declared: - self.declare() - self._user = self.connection.get_user() - - @property - def definition(self): - return self._definition - - @property - def table_name(self): - return "~jobs_config" From 71b0696a44fef21e3c2ace986ebd74d51715e8cd Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 30 Mar 2023 11:22:42 -0500 Subject: [PATCH 07/37] bugfix, add tests --- datajoint/autopopulate.py | 30 ++++++++++++----- datajoint/jobs.py | 24 ++++++++------ datajoint/schemas.py | 2 +- tests/test_autopopulate.py | 68 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 20 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 951ee4100..f56e6c8f1 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -41,7 +41,9 @@ def _call_populate1(key): :return: key, error if error, otherwise None """ process = mp.current_process() - return process.table._populate1(key, process.jobs, **process.populate_kwargs) + return process.table._populate1( + key, process.reserve_jobs, **process.populate_kwargs + ) class AutoPopulate: @@ -208,13 +210,18 @@ def handler(signum, frame): old_handler = signal.signal(signal.SIGTERM, handler) if schedule_jobs: - self.schedule_jobs() + self.schedule_jobs(*restrictions) keys = ( self._Jobs & {"table_name": self.target.table_name} & 'status = "scheduled"' - ).fetch("KEY", limit=limit) + ).fetch("key", limit=limit) + + if restrictions: + # hitting the `key_source` again to apply the restrictions + # this is expensive/suboptimal + keys = (self._jobs_to_do(restrictions) & keys).fetch("KEY", limit=limit) else: keys = (self._jobs_to_do(restrictions) - self.target).fetch( "KEY", limit=limit @@ -338,6 +345,9 @@ def _populate1( self._job_key(key), error_message=error_message, error_stack=traceback.format_exc(), + run_duration=( + datetime.datetime.utcnow() - make_start + ).total_seconds(), ) if not suppress_errors or isinstance(error, SystemExit): raise @@ -391,16 +401,18 @@ def _Jobs(self): def jobs(self): return self._Jobs & {"table_name": self.target.table_name} - def schedule_jobs(self, purge_invalid_jobs=True): + def schedule_jobs(self, *restrictions, purge_invalid_jobs=True): """ Schedule new jobs for this autopopulate table + :param restrictions: a list of restrictions each restrict + (table.key_source - target.proj()) :param purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive 
operation) :return: """ try: with self.connection.transaction: schedule_count = 0 - for key in (self._jobs_to_do({}) - self.target).fetch("KEY"): + for key in (self._jobs_to_do(restrictions) - self.target).fetch("KEY"): schedule_count += self._Jobs.schedule(self.target.table_name, key) except Exception as e: logger.exception(str(e)) @@ -425,11 +437,11 @@ def purge_invalid_jobs(self): invalid_count = len(jobs_query) - len(self._jobs_to_do({})) invalid_removed = 0 if invalid_count > 0: - for key, job_key in jobs_query.fetch("KEY", "key"): + for key, job_key in zip(*jobs_query.fetch("KEY", "key")): if not (self._jobs_to_do({}) & job_key): (jobs_query & key).delete() invalid_removed += 1 - logger.info( - f"{invalid_removed}/{invalid_count} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" - ) + logger.info( + f"{invalid_removed}/{invalid_count} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" + ) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index 195606b40..89da0f009 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -79,9 +79,9 @@ def schedule(self, table_name, key, seconds_delay=0, force=False): job_key = dict(table_name=table_name, key_hash=key_hash(key)) if self & job_key: current_status = (self & job_key).fetch1("status") - if current_status in ("scheduled", "reserved", "success"): - return True - if current_status in ("error", "ignore") and not force: + if current_status in ("scheduled", "reserved", "success") or ( + current_status in ("error", "ignore") and not force + ): return False job = dict( @@ -109,9 +109,14 @@ def reserve(self, table_name, key): :param key: the dict of the job's primary key :return: True if reserved job successfully. False = the jobs is already taken """ + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + if self & job_key: + current_status = (self & job_key).fetch1("status") + if current_status != "scheduled": + return False + job = dict( - table_name=table_name, - key_hash=key_hash(key), + job_key, status="reserved", host=platform.node(), pid=os.getpid(), @@ -120,11 +125,10 @@ def reserve(self, table_name, key): user=self._user, timestamp=datetime.datetime.utcnow(), ) - try: - with config(enable_python_native_blobs=True): - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: - return False + + with config(enable_python_native_blobs=True): + self.insert1(job, replace=True, ignore_extra_fields=True) + return True def ignore(self, table_name, key, message=""): diff --git a/datajoint/schemas.py b/datajoint/schemas.py index bddc119b7..42c288e1c 100644 --- a/datajoint/schemas.py +++ b/datajoint/schemas.py @@ -8,7 +8,7 @@ from .diagram import Diagram, _get_tier from .settings import config from .errors import DataJointError, AccessError -from .jobs import JobTable, JobConfigTable +from .jobs import JobTable from .external import ExternalMapping from .heading import Heading from .utils import user_choice, to_camel_case diff --git a/tests/test_autopopulate.py b/tests/test_autopopulate.py index 82503a596..d4646e643 100644 --- a/tests/test_autopopulate.py +++ b/tests/test_autopopulate.py @@ -24,6 +24,7 @@ def tearDown(self): self.trial.Condition.delete_quick() self.trial.delete_quick() self.experiment.delete_quick() + schema.schema.jobs.delete_quick() def test_populate(self): # test simple populate @@ -70,6 +71,73 @@ def test_populate_with_success_count(self): ) assert_equal(len(self.trial.key_source & self.trial), success_count) + def test_schedule_jobs(self): + assert_true(self.subject, 
"root tables are empty") + assert_false(self.experiment, "table already filled?") + # test schedule jobs + self.experiment.schedule_jobs() + assert_true( + len( + schema.schema.jobs + & {"table_name": self.experiment.table_name, "status": "scheduled"} + ) + == len(self.experiment.key_source), + "failed scheduling jobs", + ) + # test executing jobs + self.experiment.populate(reserve_jobs=True, schedule_jobs=False) + assert_true( + len( + schema.schema.jobs + & {"table_name": self.experiment.table_name, "status": "success"} + ) + == len(self.experiment.key_source), + "failed executing jobs", + ) + # test schedule and execute jobs with restriction + restriction = self.subject.proj(animal="subject_id").fetch("KEY")[0] + self.trial.schedule_jobs(restriction) + assert_true( + len( + schema.schema.jobs + & {"table_name": self.trial.table_name, "status": "scheduled"} + ) + == len(self.trial.key_source & restriction), + "failed scheduling jobs", + ) + self.trial.schedule_jobs() + assert_true( + len( + schema.schema.jobs + & {"table_name": self.trial.table_name, "status": "scheduled"} + ) + == len(self.trial.key_source), + "failed scheduling jobs", + ) + self.trial.populate(restriction, reserve_jobs=True, schedule_jobs=False) + assert_equal( + len(self.trial.key_source & self.trial), + len(self.trial.key_source & restriction), + ) + assert_equal( + len(self.trial.key_source - self.trial), + len(self.trial.key_source - restriction), + ) + self.trial.populate(reserve_jobs=True, schedule_jobs=False) + assert_equal( + len(self.trial.key_source & self.trial), len(self.trial.key_source) + ) + # test purge invalid jobs + restriction["subject_id"] = restriction.pop("animal") + with dj.config(safemode=False): + (self.experiment & restriction).delete() + self.trial.purge_invalid_jobs() + assert_true( + len(schema.schema.jobs & {"table_name": self.trial.table_name}) + == len(self.trial.key_source), + "failed purging invalid jobs", + ) + def test_populate_exclude_error_and_ignore_jobs(self): # test simple populate assert_true(self.subject, "root tables are empty") From c13b3e1c129120dbd5e11d33567202f5ee72946e Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 30 Mar 2023 11:58:02 -0500 Subject: [PATCH 08/37] bugfix - remove `jobconfig` --- datajoint/autopopulate.py | 2 +- datajoint/schemas.py | 13 ------------- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index f56e6c8f1..6986c7613 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -428,7 +428,7 @@ def purge_invalid_jobs(self): """ Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table Job keys that are in the JobTable (regardless of status) but are no longer in the `key_source` - (e.g. jobs added but upstream table(s) got deleted) + (e.g. 
jobs added but entries in upstream table(s) got deleted) This is potentially a time-consuming process - but should not expect to have to run very often """ diff --git a/datajoint/schemas.py b/datajoint/schemas.py index 42c288e1c..d217c7b2b 100644 --- a/datajoint/schemas.py +++ b/datajoint/schemas.py @@ -71,7 +71,6 @@ def __init__( self.create_schema = create_schema self.create_tables = create_tables self._jobs = None - self._jobs_config = None self.external = ExternalMapping(self) self.add_objects = add_objects self.declare_list = [] @@ -402,18 +401,6 @@ def jobs(self): self._jobs = JobTable(self.connection, self.database) return self._jobs - @property - def jobs_config(self): - """ - schema.jobs provides a view of the job reservation table for the schema - - :return: jobs table - """ - self._assert_exists() - if self._jobs_config is None: - self._jobs_config = JobConfigTable(self.connection, self.database) - return self._jobs_config - @property def code(self): self._assert_exists() From 0d9ec01fa36cde448575bb952183bdf812efbeb5 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 28 May 2025 15:02:16 -0500 Subject: [PATCH 09/37] chore: minor bugfix --- datajoint/autopopulate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index cc8c0c5eb..3c4bd6ef9 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -235,7 +235,7 @@ def handler(signum, frame): # exclude "error", "ignore" or "reserved" jobs if reserve_jobs: exclude_key_hashes = ( - jobs + self._Jobs & {"table_name": self.target.table_name} & 'status in ("error", "ignore", "reserved")' ).fetch("key_hash") From 4cc170d22f51c0a40a9cf491bffabfefb23c2ffb Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 28 May 2025 16:03:56 -0500 Subject: [PATCH 10/37] chore: code cleanup --- datajoint/autopopulate.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 3c4bd6ef9..ef5fbdf5c 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -206,37 +206,37 @@ def populate( raise DataJointError( "The order argument must be one of %s" % str(valid_order) ) + + if schedule_jobs: + self.schedule_jobs(*restrictions) + # define and set up signal handler for SIGTERM: if reserve_jobs: def handler(signum, frame): logger.info("Populate terminated by SIGTERM") raise SystemExit("SIGTERM received") - old_handler = signal.signal(signal.SIGTERM, handler) - if schedule_jobs: - self.schedule_jobs(*restrictions) - - keys = ( - self._Jobs - & {"table_name": self.target.table_name} - & 'status = "scheduled"' - ).fetch("key", limit=limit) - - if restrictions: - # hitting the `key_source` again to apply the restrictions - # this is expensive/suboptimal - keys = (self._jobs_to_do(restrictions) & keys).fetch("KEY", limit=limit) - else: - if keys is None: + # retrieve `keys` if not provided + if keys is None: + if reserve_jobs: + keys = ( + self.jobs + & {'status': 'scheduled'} + ).fetch("key", order_by="timestamp", limit=limit) + if restrictions: + # hitting the `key_source` again to apply the restrictions + # this is expensive/suboptimal + keys = (self._jobs_to_do(restrictions) & keys).fetch("KEY") + else: keys = (self._jobs_to_do(restrictions) - self.target).fetch( "KEY", limit=limit ) + else: # exclude "error", "ignore" or "reserved" jobs if reserve_jobs: exclude_key_hashes = ( - self._Jobs - & {"table_name": self.target.table_name} + self.jobs & 
'status in ("error", "ignore", "reserved")' ).fetch("key_hash") keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] From b7e4d9bd80fa5a26bfa6ae043f6649d6e057d5cd Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 28 May 2025 16:04:15 -0500 Subject: [PATCH 11/37] fix: `key` attribute of type `JSON` --- datajoint/jobs.py | 48 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index 73a7ac404..1c4b472c0 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -1,6 +1,8 @@ import os import datetime import platform +import json +from typing import Dict, Any, Union from .errors import DuplicateError from .hash import key_hash @@ -32,18 +34,19 @@ def __init__(self, conn, database): key_hash :char(32) # key hash --- status :enum('reserved','error','ignore','scheduled','success') - key=null :blob # structure containing the key + key=null :json # structure containing the key for querying error_message="" :varchar({error_message_length}) # error message returned if failed error_stack=null :mediumblob # error stack if failed user="" :varchar(255) # database user host="" :varchar(255) # system hostname pid=0 :int unsigned # system process id connection_id = 0 : bigint unsigned # connection_id() - timestamp :timestamp # the scheduled time (UTC) for the job to run at or after + timestamp :timestamp # timestamp of the job status change or scheduled time run_duration=null : float # run duration in seconds run_version="" : varchar(255) # some string representation of the code/env version of a run (e.g. git commit hash) index(table_name, status) index(status) + index(timestamp) # for ordering jobs """.format( database=database, error_message_length=ERROR_MESSAGE_LENGTH ) @@ -69,13 +72,24 @@ def drop(self): def schedule(self, table_name, key, seconds_delay=0, force=False): """ - Schedule a job for computation. + Schedule a job for computation in the DataJoint pipeline. - :param table_name: `database`.`table_name` - :param key: the dict of the job's primary key - :param seconds_delay: add time delay (in second) in scheduling this job - :param force: force scheduling this job (even if it is in error/ignore status) - :return: True if schedule job successfully. False = the jobs already exists with a different status + This method manages job scheduling with the following key behaviors: + 1. Creates a new job entry if one doesn't exist + 2. Updates existing jobs based on their current status: + - Allows rescheduling if job is in error/ignore status and force=True + - Prevents rescheduling if job is already scheduled/reserved/success + 3. Records job metadata including host, process ID, and user info + 4. 
Supports delayed execution through seconds_delay parameter + + Args: + table_name: Full table name in format `database`.`table_name` + key: Dictionary containing the job's primary key + seconds_delay: Optional delay in seconds before job execution (default: 0) + force: If True, allows rescheduling jobs in error/ignore status (default: False) + + Returns: + bool: True if job was successfully scheduled, False if job already exists with incompatible status """ job_key = dict(table_name=table_name, key_hash=key_hash(key)) if self & job_key: @@ -91,7 +105,7 @@ def schedule(self, table_name, key, seconds_delay=0, force=False): host=platform.node(), pid=os.getpid(), connection_id=self.connection.connection_id, - key=key, + key=_jsonify(key), user=self._user, timestamp=datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay), @@ -122,7 +136,7 @@ def reserve(self, table_name, key): host=platform.node(), pid=os.getpid(), connection_id=self.connection.connection_id, - key=key, + key=_jsonify(key), user=self._user, timestamp=datetime.datetime.utcnow(), ) @@ -160,7 +174,7 @@ def ignore(self, table_name, key, message=""): host=platform.node(), pid=os.getpid(), connection_id=self.connection.connection_id, - key=key, + key=_jsonify(key), error_message=message, user=self._user, timestamp=datetime.datetime.utcnow(), @@ -196,7 +210,7 @@ def complete(self, table_name, key, run_duration=None, run_version=""): pid=os.getpid(), connection_id=self.connection.connection_id, user=self._user, - key=key, + key=_jsonify(key), run_duration=run_duration, run_version=run_version, timestamp=datetime.datetime.utcnow(), @@ -230,7 +244,7 @@ def error(self, table_name, key, error_message, error_stack=None): pid=os.getpid(), connection_id=self.connection.connection_id, user=self._user, - key=key, + key=_jsonify(key), error_message=error_message, error_stack=error_stack, timestamp=datetime.datetime.utcnow(), @@ -238,3 +252,11 @@ def error(self, table_name, key, error_message, error_stack=None): replace=True, ignore_extra_fields=True, ) + + +def _jsonify(key: Dict[str, Any]) -> Dict[str, Any]: + """ + Ensure the key is JSON serializable by converting to JSON and back. + Uses str() as fallback for any non-serializable objects. + """ + return json.loads(json.dumps(key, default=str)) From 57c724713cdf4edeff5b17262d3167b0199949a8 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 28 May 2025 18:02:00 -0500 Subject: [PATCH 12/37] feat: prevent excessive scheduling with `min_scheduling_interval` --- datajoint/autopopulate.py | 43 +++++++++++++++++++++++++++++++++------ datajoint/settings.py | 2 ++ 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index ef5fbdf5c..43832b270 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -448,14 +448,45 @@ def _Jobs(self): def jobs(self): return self._Jobs & {"table_name": self.target.table_name} - def schedule_jobs(self, *restrictions, purge_invalid_jobs=True): + def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_interval=None): """ - Schedule new jobs for this autopopulate table - :param restrictions: a list of restrictions each restrict - (table.key_source - target.proj()) - :param purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive operation) - :return: + Schedule new jobs for this autopopulate table by finding keys that need computation. 
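The throttle interval is read from `dj.config["min_scheduling_interval"]` unless passed explicitly; this commit sets its default to 5 seconds in settings.py, below. A usage sketch with an illustrative table:

```python
import datajoint as dj

dj.config["min_scheduling_interval"] = 30  # schedule at most once per 30 s

Processing().schedule_jobs()  # throttled via the config value
Processing().schedule_jobs(min_scheduling_interval=0)  # bypass the throttle
```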
+ + This method implements an optimization strategy to avoid excessive scheduling: + 1. First checks if any jobs were scheduled recently (within min_scheduling_interval) + 2. If recent jobs exist, skips scheduling to prevent database load + 3. Otherwise, finds keys that need computation and schedules them + + The method also optionally purges invalid jobs (jobs that no longer exist in key_source) + to maintain database cleanliness. + + Args: + restrictions: a list of restrictions each restrict (table.key_source - target.proj()) + purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive operation) + min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling. + If None, uses the value from dj.config["min_scheduling_interval"] (default: None) + + Returns: + None """ + if min_scheduling_interval is None: + min_scheduling_interval = config["min_scheduling_interval"] + + # First check if we have any recent jobs + if min_scheduling_interval > 0: + recent_jobs = len( + self.jobs + & {"status": "scheduled"} + & f"timestamp <= UTC_TIMESTAMP()" # Only consider jobs up to current UTC time + & f"timestamp >= DATE_SUB(UTC_TIMESTAMP(), INTERVAL {min_scheduling_interval} SECOND)" + ) + if recent_jobs > 0: + logger.debug( + f"Skipping job scheduling for `{to_camel_case(self.target.table_name)}` - " + f"found {recent_jobs} jobs created within last {min_scheduling_interval} seconds" + ) + return + try: with self.connection.transaction: schedule_count = 0 diff --git a/datajoint/settings.py b/datajoint/settings.py index 30b206f99..162ba188a 100644 --- a/datajoint/settings.py +++ b/datajoint/settings.py @@ -51,6 +51,8 @@ "add_hidden_timestamp": False, # file size limit for when to disable checksums "filepath_checksum_size_limit": None, + # minimum time in seconds between job scheduling operations + "min_scheduling_interval": 5, } ) From 3f5247b326503d2d560ad8f177fcc7c319e3e9ba Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 May 2025 12:18:58 -0500 Subject: [PATCH 13/37] chore: minor cleanup --- datajoint/autopopulate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 43832b270..209ab52e6 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -208,7 +208,7 @@ def populate( ) if schedule_jobs: - self.schedule_jobs(*restrictions) + self.schedule_jobs(*restrictions, purge_invalid_jobs=False) # define and set up signal handler for SIGTERM: if reserve_jobs: From 45b565836563ca1f27df103af4c59eb40fb36909 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 May 2025 13:37:18 -0500 Subject: [PATCH 14/37] feat: improve logic to prevent excessive scheduling --- datajoint/autopopulate.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 209ab52e6..a13d5f49a 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -453,8 +453,8 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i Schedule new jobs for this autopopulate table by finding keys that need computation. This method implements an optimization strategy to avoid excessive scheduling: - 1. First checks if any jobs were scheduled recently (within min_scheduling_interval) - 2. If recent jobs exist, skips scheduling to prevent database load + 1. First checks if jobs were scheduled recently (within min_scheduling_interval) + 2. 
If recent scheduling event exists, skips scheduling to prevent database load 3. Otherwise, finds keys that need computation and schedules them The method also optionally purges invalid jobs (jobs that no longer exist in key_source) @@ -469,22 +469,23 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i Returns: None """ + __scheduled_event = { + "table_name": self.target.table_name, + "__type__": "jobs scheduling event" + } + if min_scheduling_interval is None: min_scheduling_interval = config["min_scheduling_interval"] - # First check if we have any recent jobs if min_scheduling_interval > 0: - recent_jobs = len( - self.jobs - & {"status": "scheduled"} - & f"timestamp <= UTC_TIMESTAMP()" # Only consider jobs up to current UTC time + recent_scheduling_event = ( + self._Jobs + & {"table_name": f"__{self.target.table_name}__"} + & {"key_hash": key_hash(__scheduled_event)} & f"timestamp >= DATE_SUB(UTC_TIMESTAMP(), INTERVAL {min_scheduling_interval} SECOND)" ) - if recent_jobs > 0: - logger.debug( - f"Skipping job scheduling for `{to_camel_case(self.target.table_name)}` - " - f"found {recent_jobs} jobs created within last {min_scheduling_interval} seconds" - ) + if recent_scheduling_event: + logger.debug(f"Skipping jobs scheduling for `{to_camel_case(self.target.table_name)}` (most recent scheduling event was within {min_scheduling_interval} seconds)") return try: @@ -495,6 +496,8 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i except Exception as e: logger.exception(str(e)) else: + self._Jobs.ignore(f"__{self.target.table_name}__", __scheduled_event, + message=f"Jobs scheduling event: {__scheduled_event['table_name']}") logger.info( f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`" ) @@ -510,14 +513,12 @@ def purge_invalid_jobs(self): This is potentially a time-consuming process - but should not expect to have to run very often """ - jobs_query = self._Jobs & {"table_name": self.target.table_name} - - invalid_count = len(jobs_query) - len(self._jobs_to_do({})) + invalid_count = len(self.jobs) - len(self._jobs_to_do({})) invalid_removed = 0 if invalid_count > 0: - for key, job_key in zip(*jobs_query.fetch("KEY", "key")): + for key, job_key in zip(*self.jobs.fetch("KEY", "key")): if not (self._jobs_to_do({}) & job_key): - (jobs_query & key).delete() + (self.jobs & key).delete() invalid_removed += 1 logger.info( From 9903e02b09aca651c60c48cb3d8b38542888a94d Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 May 2025 13:47:05 -0500 Subject: [PATCH 15/37] chore: minor bugfix --- datajoint/autopopulate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index a13d5f49a..086607cbb 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -479,13 +479,13 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i if min_scheduling_interval > 0: recent_scheduling_event = ( - self._Jobs + self._Jobs.proj(last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())") & {"table_name": f"__{self.target.table_name}__"} & {"key_hash": key_hash(__scheduled_event)} - & f"timestamp >= DATE_SUB(UTC_TIMESTAMP(), INTERVAL {min_scheduling_interval} SECOND)" + & f"last_scheduled >= {min_scheduling_interval}" ) if recent_scheduling_event: - logger.debug(f"Skipping jobs scheduling for `{to_camel_case(self.target.table_name)}` (most recent scheduling event was within 
{min_scheduling_interval} seconds)") + logger.debug(f"Skipping jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduling event was {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)") return try: From 872c5dc265b630f1433828a8a3f23ba5261fb2ab Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 May 2025 13:49:28 -0500 Subject: [PATCH 16/37] chore: tiny bugfix --- datajoint/autopopulate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 086607cbb..edff635d2 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -482,7 +482,7 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i self._Jobs.proj(last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())") & {"table_name": f"__{self.target.table_name}__"} & {"key_hash": key_hash(__scheduled_event)} - & f"last_scheduled >= {min_scheduling_interval}" + & f"last_scheduled <= {min_scheduling_interval}" ) if recent_scheduling_event: logger.debug(f"Skipping jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduling event was {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)") From 69d883110bb146f50702967b734b56b4a0ba8e8e Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 May 2025 13:55:07 -0500 Subject: [PATCH 17/37] fix: fix scheduling logic --- datajoint/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index 1c4b472c0..7f221710f 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -94,7 +94,7 @@ def schedule(self, table_name, key, seconds_delay=0, force=False): job_key = dict(table_name=table_name, key_hash=key_hash(key)) if self & job_key: current_status = (self & job_key).fetch1("status") - if current_status in ("scheduled", "reserved", "success") or ( + if current_status in ("scheduled", "reserved") or ( current_status in ("error", "ignore") and not force ): return False From cc0f3983506ec94e42e76fa940f59f2e52d74958 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 May 2025 13:58:33 -0500 Subject: [PATCH 18/37] chore: minor logging tweak --- datajoint/autopopulate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index edff635d2..cd46ab154 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -485,7 +485,7 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i & f"last_scheduled <= {min_scheduling_interval}" ) if recent_scheduling_event: - logger.debug(f"Skipping jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduling event was {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)") + logger.info(f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)") return try: From 2ac9fa2250210554a80af409d315eaac31b2794d Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 May 2025 14:51:28 -0500 Subject: [PATCH 19/37] fix: log run_duration in error jobs --- datajoint/jobs.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index 7f221710f..a3c58d49f 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -219,7 +219,7 @@ def complete(self, table_name, key, run_duration=None, run_version=""): ignore_extra_fields=True, ) - def error(self, table_name, key, 
error_message, error_stack=None):
+    def error(self, table_name, key, error_message, error_stack=None, run_duration=None, run_version=""):
         """
         Log an error message. The job reservation is replaced with an error entry.
         if an error occurs, leave an entry describing the problem
@@ -228,6 +228,8 @@ def error(self, table_name, key, error_message, error_stack=None):
         :param key: the dict of the job's primary key
         :param error_message: string error message
         :param error_stack: stack trace
+        :param run_duration: duration in seconds of the job run
+        :param run_version: some string representation of the code/env version of a run (e.g. git commit hash)
         """
         if len(error_message) > ERROR_MESSAGE_LENGTH:
             error_message = (
@@ -247,6 +249,8 @@ def error(self, table_name, key, error_message, error_stack=None):
                 key=_jsonify(key),
                 error_message=error_message,
                 error_stack=error_stack,
+                run_duration=run_duration,
+                run_version=run_version,
                 timestamp=datetime.datetime.utcnow(),
             ),
             replace=True,

From db93e0aaceffdadf7741b2ed07285357dd0da0e9 Mon Sep 17 00:00:00 2001
From: Thinh Nguyen
Date: Thu, 29 May 2025 16:52:17 -0500
Subject: [PATCH 20/37] fix: improve logic in `purge_invalid_jobs`

---
 datajoint/autopopulate.py | 40 +++++++++++++++++++++++++++------------
 1 file changed, 28 insertions(+), 12 deletions(-)

diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py
index cd46ab154..534dec104 100644
--- a/datajoint/autopopulate.py
+++ b/datajoint/autopopulate.py
@@ -507,20 +507,36 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i
     def purge_invalid_jobs(self):
         """
-        Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table
-        Job keys that are in the JobTable (regardless of status) but are no longer in the `key_source`
-        (e.g. jobs added but entries in upstream table(s) got deleted)
-        This is potentially a time-consuming process - but should not expect to have to run very often
+        Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table.
+
+        This method handles two types of invalid jobs:
+        1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted)
+        2. Jobs with "success" status that are no longer in the target table (e.g. entries in target table got deleted)
+
+        The method is potentially time-consuming as it needs to:
+        - Compare all jobs against the current key_source
+        - For success jobs, verify their existence in the target table
+        - Delete any jobs that fail these checks
+
+        This cleanup should not need to run very often, but helps maintain database consistency.
""" - - invalid_count = len(self.jobs) - len(self._jobs_to_do({})) invalid_removed = 0 - if invalid_count > 0: - for key, job_key in zip(*self.jobs.fetch("KEY", "key")): - if not (self._jobs_to_do({}) & job_key): + + invalid_success = len(self.jobs & "status = 'success'") - len(self.target) + if invalid_success > 0: + for key, job_key in zip(*(self.jobs & "status = 'success'").fetch("KEY", "key")): + if not (self.target & job_key): (self.jobs & key).delete() invalid_removed += 1 - logger.info( - f"{invalid_removed}/{invalid_count} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" - ) + keys2do = self._jobs_to_do({}).fetch("KEY") + invalid_incomplete = len(self.jobs & "status != 'success'") - len(keys2do) + if invalid_incomplete > 0: + for key, job_key in zip(*(self.jobs & "status != 'success'").fetch("KEY", "key")): + if job_key not in keys2do: + (self.jobs & key).delete() + invalid_removed += 1 + + logger.info( + f"{invalid_removed} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" + ) From c9a575084c32d6430f07b3b2d2ae146cf2ddc92b Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 30 May 2025 13:24:20 -0500 Subject: [PATCH 21/37] docs: new `jobs_orchestration.md` docs --- docs/jobs_orchestration.md | 132 +++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 docs/jobs_orchestration.md diff --git a/docs/jobs_orchestration.md b/docs/jobs_orchestration.md new file mode 100644 index 000000000..3f5cac3e2 --- /dev/null +++ b/docs/jobs_orchestration.md @@ -0,0 +1,132 @@ +# DataJoint Jobs System + +This document describes the behavior and mechanism of DataJoint's jobs reservation and execution system. + +## Jobs Table Structure + +The jobs table (`~jobs`) is a system table that tracks the state and execution of jobs in the DataJoint pipeline. It has the following key fields: + +- `table_name`: The full table name being populated +- `key_hash`: A hash of the job's primary key +- `status`: Current job status, one of: + - `scheduled`: Job is queued for execution + - `reserved`: Job is currently being processed + - `error`: Job failed with an error + - `ignore`: Job is marked to be ignored + - `success`: Job completed successfully +- `key`: JSON structure containing the job's primary key +- `error_message`: Error message if job failed +- `error_stack`: Stack trace if job failed +- `user`: Database user who created the job +- `host`: System hostname where job was created +- `pid`: Process ID of the job +- `connection_id`: Database connection ID +- `timestamp`: When the job status was last changed +- `run_duration`: How long the job took to execute (in seconds) +- `run_version`: Version information of the code/environment + +## Job Scheduling Process + +The `schedule_jobs` method implements an optimization strategy to prevent excessive scheduling: + +1. **Rate Limiting**: + - Uses `min_scheduling_interval` (configurable via `dj.config["min_scheduling_interval"]`) + - Default interval is 5 seconds + - Can be overridden per call + +2. **Scheduling Logic**: + - Checks for recent scheduling events within the interval + - Skips scheduling if recent events exist + - Otherwise, finds keys that need computation by: + 1. Querying the `key_source` to get all possible keys + 2. Excluding keys that already exist in the target table + 3. 
Excluding keys that are already in the jobs table with incompatible status + (i.e., `scheduled`, `reserved`, or `success`) + - Schedules each valid key as a new job + - Records scheduling events for rate limiting + +3. **Job States**: + - New jobs start as `scheduled` + - Jobs can be rescheduled if in `error` or `ignore` state (with `force=True`) + - Prevents rescheduling if job is `scheduled`, `reserved`, or `success` + +## Populate Process Flow + +The `populate()` method orchestrates the job execution process: + +1. **Initialization**: + - Optionally schedules new jobs (controlled by `schedule_jobs` parameter) + +2. **Job Selection**: + - If `reserve_jobs=True`: + - Fetches `scheduled` jobs from the jobs table + - Applies any restrictions to the job set + - Attempts to reserve each job before processing + - Skips jobs that cannot be reserved (already taken by another process) + - If `reserve_jobs=False`: + - Uses traditional direct computation approach + +3. **Execution**: + - Processes jobs in specified order (`original`, `reverse`, or `random`) + - Supports single or multi-process execution + - For reserved jobs: + - Updates job status to `reserved` during processing + - Records execution metrics (duration, version) + - Updates status to `success` or `error` on completion + - Records errors and execution metrics + +4. **Cleanup**: + - Optionally purges invalid jobs + +## Job Cleanup Process + +The `purge_invalid_jobs` method maintains database consistency by removing invalid jobs: + +1. **Invalid Success Jobs**: + - Identifies jobs marked as `success` but not present in the target table + - These typically occur when target table entries are deleted + +2. **Invalid Incomplete Jobs**: + - Identifies jobs in `scheduled`/`error`/`ignore` state that are no longer in the `key_source` + - These typically occur when upstream table entries are deleted + +3. **Cleanup Characteristics**: + - Potentially time-consuming operation + - Should not need to run frequently + - Helps maintain database consistency + +## Jobs Table Maintenance + +The "freshness" and consistency of the jobs table depends on regular maintenance through two key operations: + +1. **Scheduling Updates** (`schedule_jobs`): + - Adds new jobs to the table + - Should be run frequently enough to keep up with new data + - Rate-limited by `min_scheduling_interval` to prevent overload + - Example: Run every few minutes in a cron job for active pipelines + - Event-driven approach: `inserts` in upstream tables auto trigger this step + +2. 
**Cleanup** (`purge_invalid_jobs`): + - Removes invalid or outdated jobs + - Should be run periodically to maintain consistency + - More resource-intensive than scheduling + - Example: Run daily during low-activity periods + - Event-driven approach: `deletes` in upstream or target tables auto trigger this step + +The balance between these operations affects: +- How quickly new jobs are discovered and scheduled +- How long invalid jobs remain in the table +- Database size and query performance +- Overall system responsiveness + +Recommended maintenance schedule: +```python +# Example: Run scheduling frequently +dj.config["min_scheduling_interval"] = 300 # 5 minutes + +# Example: Run cleanup daily +# (implement as a cron job or scheduled task) +def daily_cleanup(): + for table in your_pipeline_tables: + table.purge_invalid_jobs() +``` \ No newline at end of file From 28df6c26d1ff3084c81bb7548ef2ef79744aa75c Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 30 May 2025 13:24:42 -0500 Subject: [PATCH 22/37] Update jobs_orchestration.md --- docs/jobs_orchestration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/jobs_orchestration.md b/docs/jobs_orchestration.md index 3f5cac3e2..2d3ad26e0 100644 --- a/docs/jobs_orchestration.md +++ b/docs/jobs_orchestration.md @@ -14,7 +14,7 @@ The jobs table (`~jobs`) is a system table that tracks the state and execution o - `error`: Job failed with an error - `ignore`: Job is marked to be ignored - `success`: Job completed successfully -- `key`: JSON structure containing the job's primary key +- `key`: JSON structure containing the job's primary key (query-able) - `error_message`: Error message if job failed - `error_stack`: Stack trace if job failed - `user`: Database user who created the job From b0308e2ade9a7e858adadd0da51a67272a74b226 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 30 May 2025 13:25:43 -0500 Subject: [PATCH 23/37] Update jobs_orchestration.md --- docs/jobs_orchestration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/jobs_orchestration.md b/docs/jobs_orchestration.md index 2d3ad26e0..098106186 100644 --- a/docs/jobs_orchestration.md +++ b/docs/jobs_orchestration.md @@ -1,4 +1,4 @@ -# DataJoint Jobs System +# DataJoint Jobs Orchestration mechanism This document describes the behavior and mechanism of DataJoint's jobs reservation and execution system. From e9f5377fdd24f5ddb00ebf527df61f83d754fa96 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 30 May 2025 14:18:09 -0500 Subject: [PATCH 24/37] feat: add `run_metadata` column to `jobs` table --- datajoint/jobs.py | 32 +++++++++++++++++--------------- docs/jobs_orchestration.md | 2 +- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index a3c58d49f..a90e29cbf 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -43,7 +43,7 @@ def __init__(self, conn, database): connection_id = 0 : bigint unsigned # connection_id() timestamp :timestamp # timestamp of the job status change or scheduled time run_duration=null : float # run duration in seconds - run_version="" : varchar(255) # some string representation of the code/env version of a run (e.g. git commit hash) + run_metadata=null :json # metadata about the run (e.g. 
code version, environment info)
         index(table_name, status)
         index(status)
         index(timestamp) # for ordering jobs
@@ -185,40 +185,41 @@ def ignore(self, table_name, key, message=""):
 
         return True
 
-    def complete(self, table_name, key, run_duration=None, run_version=""):
+    def complete(self, table_name, key, run_duration=None, run_metadata=None):
         """
         Log a completed job. When a job is completed, its reservation entry is deleted.
 
-        :param table_name: `database`.`table_name`
-        :param key: the dict of the job's primary key
-        :param run_duration: duration in second of the job run
-        :param run_version: some string representation of the code/env version of a run (e.g. git commit hash)
+        Args:
+            table_name: `database`.`table_name`
+            key: the dict of the job's primary key
+            run_duration: duration in second of the job run
+            run_metadata: dict containing metadata about the run (e.g. code version, environment info)
         """
         job_key = dict(table_name=table_name, key_hash=key_hash(key))
         if self & job_key:
             current_status = (self & job_key).fetch1("status")
             if current_status == "success":
                 return
 
         with config(enable_python_native_blobs=True):
             self.insert1(
                 dict(
                     table_name=table_name,
                     key_hash=key_hash(key),
                     status="success",
                     host=platform.node(),
                     pid=os.getpid(),
                     connection_id=self.connection.connection_id,
                     user=self._user,
                     key=_jsonify(key),
                     run_duration=run_duration,
-                    run_version=run_version,
+                    run_metadata=_jsonify(run_metadata) if run_metadata else None,
                     timestamp=datetime.datetime.utcnow(),
                 ),
                 replace=True,
                 ignore_extra_fields=True,
             )
 
-    def error(self, table_name, key, error_message, error_stack=None, run_duration=None, run_version=""):
+    def error(self, table_name, key, error_message, error_stack=None, run_duration=None, run_metadata=None):
         """
         Log an error message. The job reservation is replaced with an error entry.
         if an error occurs, leave an entry describing the problem
 
-        :param table_name: `database`.`table_name`
-        :param key: the dict of the job's primary key
-        :param error_message: string error message
-        :param error_stack: stack trace
-        :param run_duration: duration in seconds of the job run
-        :param run_version: some string representation of the code/env version of a run (e.g. git commit hash)
+        Args:
+            table_name: `database`.`table_name`
+            key: the dict of the job's primary key
+            error_message: string error message
+            error_stack: stack trace
+            run_duration: duration in seconds of the job run
+            run_metadata: dict containing metadata about the run (e.g. code version, environment info)
         """
         if len(error_message) > ERROR_MESSAGE_LENGTH:
             error_message = (
@@ -250,7 +252,7 @@ def error(self, table_name, key, error_message, error_stack=None):
                 key=_jsonify(key),
                 error_message=error_message,
                 error_stack=error_stack,
                 run_duration=run_duration,
-                run_version=run_version,
+                run_metadata=_jsonify(run_metadata) if run_metadata else None,
                 timestamp=datetime.datetime.utcnow(),
             ),
             replace=True,
diff --git a/docs/jobs_orchestration.md b/docs/jobs_orchestration.md
index 098106186..df28477a8 100644
--- a/docs/jobs_orchestration.md
+++ b/docs/jobs_orchestration.md
@@ -23,7 +23,7 @@ The jobs table (`~jobs`) is a system table that tracks the state and execution o
 - `connection_id`: Database connection ID
 - `timestamp`: When the job status was last changed
 - `run_duration`: How long the job took to execute (in seconds)
-- `run_version`: Version information of the code/environment
+- `run_metadata`: JSON structure containing metadata about the run (e.g.
code version, environment info, system state) ## Job Scheduling Process From e7c8943705b512bc04289c8e26398849be4c3fd9 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 13 Jun 2025 13:36:05 -0500 Subject: [PATCH 25/37] fix: improve error handling when `make_fetch` referential integrity fails --- datajoint/autopopulate.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index d5cabe062..c88891049 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -412,11 +412,10 @@ def _populate1( != deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[ fetched_data ] - ): # rollback due to referential integrity fail - self.connection.cancel_transaction() - logger.warning( - f"Referential integrity failed for {key} -> {self.target.full_table_name}") - return False + ): # raise error if fetched data has changed + raise DataJointError( + "Referential integrity failed - the `make_fetch` data has changed." + ) gen.send(computed_result) # insert except (KeyboardInterrupt, SystemExit, Exception) as error: From e55bbcb6e935ec72e3cfe8c7f6e3cbd5e023c2f4 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 13 Jun 2025 13:38:33 -0500 Subject: [PATCH 26/37] style: black format --- datajoint/autopopulate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index c88891049..1b0e6c12c 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -105,8 +105,8 @@ def make(self, key): The method can be implemented either as: (a) Regular method: All three steps are performed in a single database transaction. The method must return None. - (b) Generator method: - The make method is split into three functions: + (b) Generator method: + The make method is split into three functions: - `make_fetch`: Fetches data from the parent tables. - `make_compute`: Computes secondary attributes based on the fetched data. - `make_insert`: Inserts the computed data into the current table. @@ -124,7 +124,7 @@ def make(self, key): self.make_insert(key, *computed_result) commit_transaction - + Importantly, the output of make_fetch is a tuple that serves as the input into `make_compute`. The output of `make_compute` is a tuple that serves as the input into `make_insert`. From 964743efdb45f43bce9564cb625b7c890454daa6 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 13 Jun 2025 13:40:11 -0500 Subject: [PATCH 27/37] style: format --- datajoint/autopopulate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 1b0e6c12c..461972cfa 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -414,7 +414,7 @@ def _populate1( ] ): # raise error if fetched data has changed raise DataJointError( - "Referential integrity failed - the `make_fetch` data has changed." + "Referential integrity failed! 
The `make_fetch` data has changed"
                 )
             gen.send(computed_result)  # insert

From 15f791c44616e64d5078b022581c430487d4cd4d Mon Sep 17 00:00:00 2001
From: Thinh Nguyen
Date: Tue, 1 Jul 2025 11:19:08 -0500
Subject: [PATCH 28/37] feat: add `_job` hidden column for `Imported` and
 `Computed` tables

---
 datajoint/declare.py  |  7 +++++++
 tests/test_declare.py | 29 +++++++++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/datajoint/declare.py b/datajoint/declare.py
index 304476798..240174bee 100644
--- a/datajoint/declare.py
+++ b/datajoint/declare.py
@@ -327,6 +327,13 @@ def declare(full_table_name, definition, context):
         for attr in metadata_attr_sql
     )
 
+    # Add hidden _job column for imported and computed tables
+    if table_name.startswith(("_", "__")):
+        # This is an imported or computed table (single or double underscore prefix)
+        attribute_sql.append(
+            "`_job` json NULL COMMENT 'Hidden column for job tracking metadata'"
+        )
+
     if not primary_key:
         raise DataJointError("Table must have a primary key")
 
diff --git a/tests/test_declare.py b/tests/test_declare.py
index 828021939..af4d46ad4 100644
--- a/tests/test_declare.py
+++ b/tests/test_declare.py
@@ -427,3 +427,32 @@ def test_add_hidden_timestamp_disabled(disable_add_hidden_timestamp, schema_any)
         ), msg
     assert not any(a.is_hidden for a in Experiment().heading._attributes.values()), msg
     assert not any(a.is_hidden for a in Experiment().heading.attributes.values()), msg
+
+
+def test_hidden_job_column_for_imported_computed_tables(schema_any):
+    """Test that hidden _job column is added to imported and computed tables but not manual/lookup tables"""
+
+    # Manual and Lookup tables should NOT have _job column
+    manual_attrs = Image().heading._attributes
+    lookup_attrs = Subject().heading._attributes
+
+    assert not any(a.name == "_job" for a in manual_attrs.values()), "Manual table should not have _job column"
+    assert not any(a.name == "_job" for a in lookup_attrs.values()), "Lookup table should not have _job column"
+
+    # Imported and Computed tables SHOULD have _job column
+    imported_attrs = Experiment().heading._attributes
+    computed_attrs = SigIntTable().heading._attributes
+
+    assert any(a.name == "_job" for a in imported_attrs.values()), "Imported table should have _job column"
+    assert any(a.name == "_job" for a in computed_attrs.values()), "Computed table should have _job column"
+
+    # Verify the _job column is hidden and has correct type
+    imported_job_attr = next(a for a in imported_attrs.values() if a.name == "_job")
+    computed_job_attr = next(a for a in computed_attrs.values() if a.name == "_job")
+
+    assert imported_job_attr.is_hidden, "_job column should be hidden"
+    assert computed_job_attr.is_hidden, "_job column should be hidden"
+    assert "json" in imported_job_attr.sql.lower(), "_job column should be JSON type"
+    assert "json" in computed_job_attr.sql.lower(), "_job column should be JSON type"
+    assert "null" in imported_job_attr.sql.lower(), "_job column should be nullable"
+    assert "null" in computed_job_attr.sql.lower(), "_job column should be nullable"

From 18727e9aaf3977e55aa52382e46c06ec5ed5298e Mon Sep 17 00:00:00 2001
From: Thinh Nguyen
Date: Tue, 1 Jul 2025 11:20:19 -0500
Subject: [PATCH 29/37] chore: rename `purge_invalid_jobs` -> `purge_jobs`

---
 datajoint/autopopulate.py | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py
index 7aec57249..1889c8fb3 100644
--- a/datajoint/autopopulate.py
+++ b/datajoint/autopopulate.py
@@ -7,6 +7,8 @@ import random import signal import traceback +import os +import platform import deepdiff from tqdm import tqdm @@ -270,7 +272,7 @@ def populate( ) if schedule_jobs: - self.schedule_jobs(*restrictions, purge_invalid_jobs=False) + self.schedule_jobs(*restrictions) # define and set up signal handler for SIGTERM: if reserve_jobs: @@ -344,7 +346,7 @@ def handler(signum, frame): del self.connection._conn.ctx # SSLContext is not pickleable with ( mp.Pool( - processes, _initialize_populate, (self, jobs, populate_kwargs) + processes, _initialize_populate, (self, True, populate_kwargs) ) as pool, ( tqdm(desc="Processes: ", total=nkeys) @@ -375,10 +377,10 @@ def handler(signum, frame): def _populate1( self, key, - reserve_jobs, - suppress_errors, - return_exception_objects, - make_kwargs=None, + reserve_jobs: bool, + suppress_errors: bool, + return_exception_objects: bool, + make_kwargs: dict = None, ): """ populates table for one source key, calling self.make inside a transaction. @@ -475,6 +477,7 @@ def _populate1( datetime.datetime.utcnow() - make_start ).total_seconds(), ) + logger.debug(f"Success making {key} -> {self.target.full_table_name}") return True finally: @@ -511,7 +514,7 @@ def _Jobs(self): def jobs(self): return self._Jobs & {"table_name": self.target.table_name} - def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_interval=None): + def schedule_jobs(self, *restrictions, purge_jobs=False, min_scheduling_interval=None): """ Schedule new jobs for this autopopulate table by finding keys that need computation. @@ -525,7 +528,7 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i Args: restrictions: a list of restrictions each restrict (table.key_source - target.proj()) - purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive operation) + purge_jobs: if True, remove orphaned jobs from the jobs table (potentially expensive operation) min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling. If None, uses the value from dj.config["min_scheduling_interval"] (default: None) @@ -565,14 +568,14 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`" ) finally: - if purge_invalid_jobs: - self.purge_invalid_jobs() + if purge_jobs: + self.purge_jobs() - def purge_invalid_jobs(self): + def purge_jobs(self): """ - Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table. + Check and remove any orphaned/outdated jobs in the JobTable for this autopopulate table. - This method handles two types of invalid jobs: + This method handles two types of orphaned jobs: 1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted) 2. Jobs with "success" status that are no longer in the target table (e.g. 
entries in target table got deleted) From efbb920f39eb55bbda0f1251211390a34bf51d68 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 1 Jul 2025 11:21:10 -0500 Subject: [PATCH 30/37] feat: insert `_job` metadata upon `make` completion --- datajoint/autopopulate.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 1889c8fb3..9dc910525 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -477,6 +477,18 @@ def _populate1( datetime.datetime.utcnow() - make_start ).total_seconds(), ) + # Update the _job column with the job metadata for newly populated entries + if "_job" in self.target.heading._attributes: + job_metadata = { + "execution_time": make_start, + "execution_duration": (datetime.datetime.utcnow() - make_start).total_seconds(), + "host": platform.node(), + "pid": os.getpid(), + "connection_id": self.connection.connection_id, + "user": self._Jobs._user, + } + for k in (self.target & key).fetch("KEY"): + self.target.update1({**k, "_job": job_metadata}) logger.debug(f"Success making {key} -> {self.target.full_table_name}") return True From 918cc9d3b227775a63749944a2d4e73cd4496100 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 1 Jul 2025 11:26:58 -0500 Subject: [PATCH 31/37] chore: minor code optimization in `purge_jobs` --- datajoint/autopopulate.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 9dc910525..624df14d0 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -600,20 +600,24 @@ def purge_jobs(self): """ invalid_removed = 0 - invalid_success = len(self.jobs & "status = 'success'") - len(self.target) - if invalid_success > 0: - for key, job_key in zip(*(self.jobs & "status = 'success'").fetch("KEY", "key")): - if not (self.target & job_key): - (self.jobs & key).delete() - invalid_removed += 1 - - keys2do = self._jobs_to_do({}).fetch("KEY") - invalid_incomplete = len(self.jobs & "status != 'success'") - len(keys2do) - if invalid_incomplete > 0: - for key, job_key in zip(*(self.jobs & "status != 'success'").fetch("KEY", "key")): - if job_key not in keys2do: - (self.jobs & key).delete() - invalid_removed += 1 + success_query = self.jobs & {"table_name": self.target.table_name} & "status = 'success'" + if success_query: + invalid_success = len(success_query) - len(self.target) + if invalid_success > 0: + for key, job_key in zip(*success_query.fetch("KEY", "key")): + if not (self.target & job_key): + (self.jobs & key).delete() + invalid_removed += 1 + + incomplete_query = self.jobs & {"table_name": self.target.table_name} & "status != 'success'" + if incomplete_query: + keys2do = self._jobs_to_do({}).fetch("KEY") + invalid_incomplete = len(incomplete_query) - len(keys2do) + if invalid_incomplete > 0: + for key, job_key in zip(*incomplete_query.fetch("KEY", "key")): + if job_key not in keys2do: + (self.jobs & key).delete() + invalid_removed += 1 logger.info( f"{invalid_removed} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" From dcfeaf5c7a60ef612e6cc3a0a829ef4726f58b71 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 1 Jul 2025 11:43:36 -0500 Subject: [PATCH 32/37] feat: remove logging of `success` jobs in Jobs table --- datajoint/autopopulate.py | 9 --------- datajoint/jobs.py | 28 ++-------------------------- 2 files changed, 2 insertions(+), 35 deletions(-) diff --git a/datajoint/autopopulate.py 
b/datajoint/autopopulate.py index 624df14d0..18b32748c 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -600,15 +600,6 @@ def purge_jobs(self): """ invalid_removed = 0 - success_query = self.jobs & {"table_name": self.target.table_name} & "status = 'success'" - if success_query: - invalid_success = len(success_query) - len(self.target) - if invalid_success > 0: - for key, job_key in zip(*success_query.fetch("KEY", "key")): - if not (self.target & job_key): - (self.jobs & key).delete() - invalid_removed += 1 - incomplete_query = self.jobs & {"table_name": self.target.table_name} & "status != 'success'" if incomplete_query: keys2do = self._jobs_to_do({}).fetch("KEY") diff --git a/datajoint/jobs.py b/datajoint/jobs.py index a90e29cbf..eff36f092 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -185,40 +185,16 @@ def ignore(self, table_name, key, message=""): return True - def complete(self, table_name, key, run_duration=None, run_metadata=None): + def complete(self, table_name, key): """ Log a completed job. When a job is completed, its reservation entry is deleted. Args: table_name: `database`.`table_name` key: the dict of the job's primary key - run_duration: duration in second of the job run - run_metadata: dict containing metadata about the run (e.g. code version, environment info) """ job_key = dict(table_name=table_name, key_hash=key_hash(key)) - if self & job_key: - current_status = (self & job_key).fetch1("status") - if current_status == "success": - return - - with config(enable_python_native_blobs=True): - self.insert1( - dict( - table_name=table_name, - key_hash=key_hash(key), - status="success", - host=platform.node(), - pid=os.getpid(), - connection_id=self.connection.connection_id, - user=self._user, - key=_jsonify(key), - run_duration=run_duration, - run_metadata=_jsonify(run_metadata) if run_metadata else None, - timestamp=datetime.datetime.utcnow(), - ), - replace=True, - ignore_extra_fields=True, - ) + (self & job_key).delete() def error(self, table_name, key, error_message, error_stack=None, run_duration=None, run_metadata=None): """ From 1f773fa1d5f88ace26e4e12519a0dd9b8708b420 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 1 Jul 2025 11:51:55 -0500 Subject: [PATCH 33/37] docs: minor updates --- docs/jobs_orchestration.md | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/docs/jobs_orchestration.md b/docs/jobs_orchestration.md index df28477a8..382a051ba 100644 --- a/docs/jobs_orchestration.md +++ b/docs/jobs_orchestration.md @@ -72,21 +72,22 @@ The `populate()` method orchestrates the job execution process: - For reserved jobs: - Updates job status to `reserved` during processing - Records execution metrics (duration, version) - - Updates status to `success` or `error` on completion + - On successful completion: remove job from the jobs table + - On error: update job status to `error` - Records errors and execution metrics 4. **Cleanup**: - - Optionally purges invalid jobs + - Optionally purges orphaned/outdated jobs ## Job Cleanup Process -The `purge_invalid_jobs` method maintains database consistency by removing invalid jobs: +The `purge_jobs` method maintains database consistency by removing orphaned jobs: -1. **Invalid Success Jobs**: +1. **Orphaned Success Jobs**: - Identifies jobs marked as `success` but not present in the target table - These typically occur when target table entries are deleted -2. **Invalid Incomplete Jobs**: +2. 
**Orphaned Incomplete Jobs**: - Identifies jobs in `scheduled`/`error`/`ignore` state that are no longer in the `key_source` - These typically occur when upstream table entries are deleted @@ -106,8 +107,8 @@ The "freshness" and consistency of the jobs table depends on regular maintenance - Example: Run every few minutes in a cron job for active pipelines - Event-driven approach: `inserts` in upstream tables auto trigger this step -2. **Cleanup** (`purge_invalid_jobs`): - - Removes invalid or outdated jobs +2. **Cleanup** (`purge_jobs`): + - Removes orphaned or outdated jobs - Should be run periodically to maintain consistency - More resource-intensive than scheduling - Example: Run daily during low-activity periods @@ -115,7 +116,7 @@ The "freshness" and consistency of the jobs table depends on regular maintenance The balance between these operations affects: - How quickly new jobs are discovered and scheduled -- How long invalid jobs remain in the table +- How long orphaned jobs remain in the table - Database size and query performance - Overall system responsiveness @@ -128,5 +129,5 @@ dj.config["min_scheduling_interval"] = 300 # 5 minutes # (implement as a cron job or scheduled task) def daily_cleanup(): for table in your_pipeline_tables: - table.purge_invalid_jobs() + table.purge_jobs() ``` \ No newline at end of file From 7184ce5314581967a3a8041a4fe3e8e8677375e2 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 1 Jul 2025 15:20:10 -0500 Subject: [PATCH 34/37] format: black --- datajoint/autopopulate.py | 66 +++++++++++++++++++++++---------------- datajoint/jobs.py | 10 +++++- tests/test_declare.py | 28 +++++++++++------ 3 files changed, 66 insertions(+), 38 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 18b32748c..c0a832d39 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -1,4 +1,5 @@ """This module defines class dj.AutoPopulate""" + import contextlib import datetime import inspect @@ -270,24 +271,25 @@ def populate( raise DataJointError( "The order argument must be one of %s" % str(valid_order) ) - + if schedule_jobs: self.schedule_jobs(*restrictions) # define and set up signal handler for SIGTERM: if reserve_jobs: + def handler(signum, frame): logger.info("Populate terminated by SIGTERM") raise SystemExit("SIGTERM received") + old_handler = signal.signal(signal.SIGTERM, handler) # retrieve `keys` if not provided if keys is None: if reserve_jobs: - keys = ( - self.jobs - & {'status': 'scheduled'} - ).fetch("key", order_by="timestamp", limit=limit) + keys = (self.jobs & {"status": "scheduled"}).fetch( + "key", order_by="timestamp", limit=limit + ) if restrictions: # hitting the `key_source` again to apply the restrictions # this is expensive/suboptimal @@ -300,8 +302,7 @@ def handler(signum, frame): # exclude "error", "ignore" or "reserved" jobs if reserve_jobs: exclude_key_hashes = ( - self.jobs - & 'status in ("error", "ignore", "reserved")' + self.jobs & 'status in ("error", "ignore", "reserved")' ).fetch("key_hash") keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] @@ -473,15 +474,15 @@ def _populate1( self._Jobs.complete( self.target.table_name, self._job_key(key), - run_duration=( - datetime.datetime.utcnow() - make_start - ).total_seconds(), + run_duration=(datetime.datetime.utcnow() - make_start).total_seconds(), ) # Update the _job column with the job metadata for newly populated entries if "_job" in self.target.heading._attributes: job_metadata = { "execution_time": make_start, - 
"execution_duration": (datetime.datetime.utcnow() - make_start).total_seconds(), + "execution_duration": ( + datetime.datetime.utcnow() - make_start + ).total_seconds(), "host": platform.node(), "pid": os.getpid(), "connection_id": self.connection.connection_id, @@ -526,44 +527,50 @@ def _Jobs(self): def jobs(self): return self._Jobs & {"table_name": self.target.table_name} - def schedule_jobs(self, *restrictions, purge_jobs=False, min_scheduling_interval=None): + def schedule_jobs( + self, *restrictions, purge_jobs=False, min_scheduling_interval=None + ): """ Schedule new jobs for this autopopulate table by finding keys that need computation. - + This method implements an optimization strategy to avoid excessive scheduling: 1. First checks if jobs were scheduled recently (within min_scheduling_interval) 2. If recent scheduling event exists, skips scheduling to prevent database load 3. Otherwise, finds keys that need computation and schedules them - + The method also optionally purges invalid jobs (jobs that no longer exist in key_source) to maintain database cleanliness. - + Args: restrictions: a list of restrictions each restrict (table.key_source - target.proj()) purge_jobs: if True, remove orphaned jobs from the jobs table (potentially expensive operation) min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling. If None, uses the value from dj.config["min_scheduling_interval"] (default: None) - + Returns: None """ __scheduled_event = { "table_name": self.target.table_name, - "__type__": "jobs scheduling event" - } - + "__type__": "jobs scheduling event", + } + if min_scheduling_interval is None: min_scheduling_interval = config["min_scheduling_interval"] if min_scheduling_interval > 0: recent_scheduling_event = ( - self._Jobs.proj(last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())") + self._Jobs.proj( + last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())" + ) & {"table_name": f"__{self.target.table_name}__"} & {"key_hash": key_hash(__scheduled_event)} & f"last_scheduled <= {min_scheduling_interval}" ) if recent_scheduling_event: - logger.info(f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)") + logger.info( + f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)" + ) return try: @@ -574,8 +581,11 @@ def schedule_jobs(self, *restrictions, purge_jobs=False, min_scheduling_interval except Exception as e: logger.exception(str(e)) else: - self._Jobs.ignore(f"__{self.target.table_name}__", __scheduled_event, - message=f"Jobs scheduling event: {__scheduled_event['table_name']}") + self._Jobs.ignore( + f"__{self.target.table_name}__", + __scheduled_event, + message=f"Jobs scheduling event: {__scheduled_event['table_name']}", + ) logger.info( f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`" ) @@ -586,21 +596,23 @@ def schedule_jobs(self, *restrictions, purge_jobs=False, min_scheduling_interval def purge_jobs(self): """ Check and remove any orphaned/outdated jobs in the JobTable for this autopopulate table. - + This method handles two types of orphaned jobs: 1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted) 2. Jobs with "success" status that are no longer in the target table (e.g. 
entries in target table got deleted) - + The method is potentially time-consuming as it needs to: - Compare all jobs against the current key_source - For success jobs, verify their existence in the target table - Delete any jobs that fail these checks - + This cleanup should not need to run very often, but helps maintain database consistency. """ invalid_removed = 0 - incomplete_query = self.jobs & {"table_name": self.target.table_name} & "status != 'success'" + incomplete_query = ( + self.jobs & {"table_name": self.target.table_name} & "status != 'success'" + ) if incomplete_query: keys2do = self._jobs_to_do({}).fetch("KEY") invalid_incomplete = len(incomplete_query) - len(keys2do) diff --git a/datajoint/jobs.py b/datajoint/jobs.py index eff36f092..e12716f96 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -196,7 +196,15 @@ def complete(self, table_name, key): job_key = dict(table_name=table_name, key_hash=key_hash(key)) (self & job_key).delete() - def error(self, table_name, key, error_message, error_stack=None, run_duration=None, run_metadata=None): + def error( + self, + table_name, + key, + error_message, + error_stack=None, + run_duration=None, + run_metadata=None, + ): """ Log an error message. The job reservation is replaced with an error entry. if an error occurs, leave an entry describing the problem diff --git a/tests/test_declare.py b/tests/test_declare.py index af4d46ad4..6dc726e3f 100644 --- a/tests/test_declare.py +++ b/tests/test_declare.py @@ -431,25 +431,33 @@ def test_add_hidden_timestamp_disabled(disable_add_hidden_timestamp, schema_any) def test_hidden_job_column_for_imported_computed_tables(schema_any): """Test that hidden _job column is added to imported and computed tables but not manual/lookup tables""" - + # Manual and Lookup tables should NOT have _job column manual_attrs = Image().heading._attributes lookup_attrs = Subject().heading._attributes - - assert not any(a.name == "_job" for a in manual_attrs.values()), "Manual table should not have _job column" - assert not any(a.name == "_job" for a in lookup_attrs.values()), "Lookup table should not have _job column" - + + assert not any( + a.name == "_job" for a in manual_attrs.values() + ), "Manual table should not have _job column" + assert not any( + a.name == "_job" for a in lookup_attrs.values() + ), "Lookup table should not have _job column" + # Imported and Computed tables SHOULD have _job column imported_attrs = Experiment().heading._attributes computed_attrs = SigIntTable().heading._attributes - - assert any(a.name == "_job" for a in imported_attrs.values()), "Imported table should have _job column" - assert any(a.name == "_job" for a in computed_attrs.values()), "Computed table should have _job column" - + + assert any( + a.name == "_job" for a in imported_attrs.values() + ), "Imported table should have _job column" + assert any( + a.name == "_job" for a in computed_attrs.values() + ), "Computed table should have _job column" + # Verify the _job column is hidden and has correct type imported_job_attr = next(a for a in imported_attrs.values() if a.name == "_job") computed_job_attr = next(a for a in computed_attrs.values() if a.name == "_job") - + assert imported_job_attr.is_hidden, "_job column should be hidden" assert computed_job_attr.is_hidden, "_job column should be hidden" assert "json" in imported_job_attr.sql.lower(), "_job column should be JSON type" From 9d3a9e45b19ebf25b88173af73a5f60dcc98bf57 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 1 Jul 2025 15:37:15 -0500 Subject: [PATCH 
35/37] chore: remove the optional `purge_jobs` in `schedule_jobs` --- datajoint/autopopulate.py | 11 ++--------- docs/jobs_orchestration.md | 8 ++++---- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index c0a832d39..98f291c32 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -528,7 +528,7 @@ def jobs(self): return self._Jobs & {"table_name": self.target.table_name} def schedule_jobs( - self, *restrictions, purge_jobs=False, min_scheduling_interval=None + self, *restrictions, min_scheduling_interval=None ): """ Schedule new jobs for this autopopulate table by finding keys that need computation. @@ -538,12 +538,8 @@ def schedule_jobs( 2. If recent scheduling event exists, skips scheduling to prevent database load 3. Otherwise, finds keys that need computation and schedules them - The method also optionally purges invalid jobs (jobs that no longer exist in key_source) - to maintain database cleanliness. - Args: restrictions: a list of restrictions each restrict (table.key_source - target.proj()) - purge_jobs: if True, remove orphaned jobs from the jobs table (potentially expensive operation) min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling. If None, uses the value from dj.config["min_scheduling_interval"] (default: None) @@ -589,11 +585,8 @@ def schedule_jobs( logger.info( f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`" ) - finally: - if purge_jobs: - self.purge_jobs() - def purge_jobs(self): + def cleanup_jobs(self): """ Check and remove any orphaned/outdated jobs in the JobTable for this autopopulate table. diff --git a/docs/jobs_orchestration.md b/docs/jobs_orchestration.md index 382a051ba..126280840 100644 --- a/docs/jobs_orchestration.md +++ b/docs/jobs_orchestration.md @@ -77,11 +77,11 @@ The `populate()` method orchestrates the job execution process: - Records errors and execution metrics 4. **Cleanup**: - - Optionally purges orphaned/outdated jobs + - Optionally clean up orphaned/outdated jobs ## Job Cleanup Process -The `purge_jobs` method maintains database consistency by removing orphaned jobs: +The `cleanup_jobs` method maintains database consistency by removing orphaned jobs: 1. **Orphaned Success Jobs**: - Identifies jobs marked as `success` but not present in the target table @@ -107,7 +107,7 @@ The "freshness" and consistency of the jobs table depends on regular maintenance - Example: Run every few minutes in a cron job for active pipelines - Event-driven approach: `inserts` in upstream tables auto trigger this step -2. **Cleanup** (`purge_jobs`): +2. 
**Cleanup** (`cleanup_jobs`): - Removes orphaned or outdated jobs - Should be run periodically to maintain consistency - More resource-intensive than scheduling @@ -129,5 +129,5 @@ dj.config["min_scheduling_interval"] = 300 # 5 minutes # (implement as a cron job or scheduled task) def daily_cleanup(): for table in your_pipeline_tables: - table.purge_jobs() + table.cleanup_jobs() ``` \ No newline at end of file From 269c4af7152203540a8000b48be3817eb6995786 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 2 Jul 2025 15:50:55 -0500 Subject: [PATCH 36/37] chore: code cleanup --- datajoint/autopopulate.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 98f291c32..ff147c37a 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -601,20 +601,15 @@ def cleanup_jobs(self): This cleanup should not need to run very often, but helps maintain database consistency. """ - invalid_removed = 0 - - incomplete_query = ( - self.jobs & {"table_name": self.target.table_name} & "status != 'success'" - ) - if incomplete_query: + removed = 0 + if len(self.jobs): keys2do = self._jobs_to_do({}).fetch("KEY") - invalid_incomplete = len(incomplete_query) - len(keys2do) - if invalid_incomplete > 0: - for key, job_key in zip(*incomplete_query.fetch("KEY", "key")): + if len(self.jobs) - len(keys2do) > 0: + for key, job_key in zip(*self.jobs.fetch("KEY", "key")): if job_key not in keys2do: (self.jobs & key).delete() - invalid_removed += 1 + removed += 1 logger.info( - f"{invalid_removed} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" + f"{removed} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" ) From 3d7c4ea711721904ea51299e17926f6032dae6e6 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 3 Jul 2025 16:42:36 -0500 Subject: [PATCH 37/37] fix: update job metadata in the make's transaction --- datajoint/autopopulate.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index ff147c37a..d2dd8b564 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -470,16 +470,9 @@ def _populate1( logger.error(error) return key, error if return_exception_objects else error_message else: - self.connection.commit_transaction() - self._Jobs.complete( - self.target.table_name, - self._job_key(key), - run_duration=(datetime.datetime.utcnow() - make_start).total_seconds(), - ) # Update the _job column with the job metadata for newly populated entries if "_job" in self.target.heading._attributes: job_metadata = { - "execution_time": make_start, "execution_duration": ( datetime.datetime.utcnow() - make_start ).total_seconds(), @@ -490,7 +483,12 @@ def _populate1( } for k in (self.target & key).fetch("KEY"): self.target.update1({**k, "_job": job_metadata}) - + self.connection.commit_transaction() + self._Jobs.complete( + self.target.table_name, + self._job_key(key), + run_duration=(datetime.datetime.utcnow() - make_start).total_seconds(), + ) logger.debug(f"Success making {key} -> {self.target.full_table_name}") return True finally:
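
---

Editor's note: to make the orchestration flow introduced by this series concrete, below is a minimal worker sketch built only on the public API these patches add (`schedule_jobs`, `populate(reserve_jobs=True)`, the `jobs` property, `cleanup_jobs`, and the `min_scheduling_interval` config key). The `my_pipeline` module and the `Detection` table are hypothetical placeholders, not part of this changeset.

```python
import datajoint as dj

# Hypothetical pipeline table; any dj.Imported/dj.Computed table inherits
# the AutoPopulate API extended by this patch series.
from my_pipeline import Detection  # placeholder import

# Rate-limit scheduling events; this config key is added in PATCH 12
# (default: 5 seconds).
dj.config["min_scheduling_interval"] = 60

# 1. Discover keys in key_source that still need computation and record them
#    in the jobs table with status "scheduled" (skipped automatically if a
#    scheduling event happened within the last 60 seconds).
Detection().schedule_jobs()

# 2. Work through scheduled jobs with reservation so several workers can run
#    concurrently; errors are logged to the jobs table instead of raising,
#    and each job entry is removed again on success.
Detection().populate(reserve_jobs=True, suppress_errors=True)

# 3. Inspect failures recorded for this table.
failed = (Detection().jobs & 'status = "error"').fetch("key", "error_message")

# 4. Periodically drop orphaned job entries (e.g. after upstream deletes).
Detection().cleanup_jobs()
```

This mirrors the maintenance split described in `docs/jobs_orchestration.md`: frequent, rate-limited scheduling driven by workers, plus occasional, more expensive cleanup.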