diff --git a/CHANGELOG.md b/CHANGELOG.md index 862c8b3f..7670113b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to Merlin will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed +- Removed old monitor code from v1.0 + ## [2.0.0b4] ### Added diff --git a/merlin/cli/commands/monitor.py b/merlin/cli/commands/monitor.py index c03e3a03..8cfc7475 100644 --- a/merlin/cli/commands/monitor.py +++ b/merlin/cli/commands/monitor.py @@ -23,7 +23,6 @@ from merlin.cli.commands.command_entry_point import CommandEntryPoint from merlin.cli.utils import get_merlin_spec_with_override from merlin.monitor.monitor import Monitor -from merlin.router import check_merlin_status LOG = logging.getLogger("merlin") @@ -52,14 +51,6 @@ def add_parser(self, subparsers: ArgumentParser): ) monitor.set_defaults(func=self.process_command) monitor.add_argument("specification", type=str, help="Path to a Merlin YAML spec file") - monitor.add_argument( - "--steps", - nargs="+", - type=str, - dest="steps", - default=["all"], - help="The specific steps (tasks on the server) in the YAML file defining the queues you want to monitor", - ) monitor.add_argument( "--vars", action="store", @@ -116,17 +107,8 @@ def process_command(self, args: Namespace): # Give the user time to queue up jobs in case they haven't already time.sleep(args.sleep) - if args.steps != ["all"]: - LOG.warning( - "The `--steps` argument of the `merlin monitor` command is set to be deprecated in Merlin v1.14 " - "For now, using this argument will tell merlin to use the version of the monitor command from Merlin v1.12." - ) - # Check if we still need our allocation - while check_merlin_status(args, spec): - LOG.info("Monitor: found tasks in queues and/or tasks being processed") - time.sleep(args.sleep) - else: - monitor = Monitor(spec, args.sleep, args.task_server, no_restart=args.no_restart, auto_cleanup=not args.disable_gc) - monitor.monitor_all_runs() + # Monitor the allocation + monitor = Monitor(spec, args.sleep, args.task_server, no_restart=args.no_restart, auto_cleanup=not args.disable_gc) + monitor.monitor_all_runs() LOG.info("Monitor: ... stop condition met") diff --git a/merlin/router.py b/merlin/router.py index 2b8e376f..2af92435 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -12,18 +12,12 @@ decoupled from the logic the tasks are running. """ import logging -import time -from argparse import Namespace from typing import Dict, List, Tuple -from merlin.exceptions import NoWorkersException from merlin.spec.specification import MerlinSpec from merlin.study.celeryadapter import ( build_set_of_queues, - check_celery_workers_processing, dump_celery_queue_info, - get_active_celery_queues, - get_workers_from_app, purge_celery_tasks, query_celery_queues, run_celery, @@ -158,25 +152,6 @@ def query_queues( return {} -def get_workers(task_server: str) -> List[str]: - """ - This function queries the designated task server to obtain a list of all - workers that are currently connected. - - Args: - task_server: The task server to query. - - Returns: - A list of all connected workers. If the task server is not supported, - an empty list is returned. - """ - if task_server == "celery": # pylint: disable=R1705 - return get_workers_from_app() - else: - LOG.error("Celery is not specified as the task server!") - return [] - - def stop_workers(task_server: str, spec_worker_names: List[str], queues: List[str], workers_regex: str): """ This function sends a command to stop workers that match the specified @@ -196,167 +171,3 @@ def stop_workers(task_server: str, spec_worker_names: List[str], queues: List[st stop_celery_workers(queues, spec_worker_names, workers_regex) else: LOG.error("Celery is not specified as the task server!") - - -# TODO in Merlin 2.0 delete all of the below functions since we're deprecating the old version of the monitor -# and a lot of this stuff is in the new monitor classes -def get_active_queues(task_server: str) -> Dict[str, List[str]]: - """ - Retrieve a dictionary of active queues and their associated workers for the specified task server. - - This function queries the given task server for its active queues and gathers - information about which workers are currently monitoring these queues. It supports - the 'celery' task server and returns a structured dictionary containing the queue - names as keys and lists of worker names as values. - - Args: - task_server: The task server to query for active queues. - - Returns: - A dictionary where:\n - - The keys are the names of the active queues. - - The values are lists of worker names that are currently attached to those queues. - """ - active_queues = {} - - if task_server == "celery": - from merlin.celery import app # pylint: disable=C0415 - - active_queues, _ = get_active_celery_queues(app) - else: - LOG.error("Only celery can be configured currently.") - - return active_queues - - -def wait_for_workers(sleep: int, task_server: str, spec: MerlinSpec): # noqa - """ - Wait for workers to start up by checking their status at regular intervals. - - This function monitors the specified task server for the startup of worker processes. - It checks for the existence of the expected workers up to 10 times, sleeping for a - specified number of seconds between each check. If no workers are detected after - the maximum number of attempts, it raises an error to terminate the monitoring - process, indicating a potential issue with the task server. - - Args: - sleep: The number of seconds to pause between each check for worker status. - task_server: The task server from which to query for worker status. - spec (spec.specification.MerlinSpec): An instance of the - [`MerlinSpec`][spec.specification.MerlinSpec] class that contains the - specification for the workers being monitored. - - Raises: - NoWorkersException: If no workers are detected after the maximum number of checks. - """ - # Get the names of the workers that we're looking for - worker_names = spec.get_worker_names() - LOG.info(f"Checking for the following workers: {worker_names}") - - # Loop until workers are detected - count = 0 - max_count = 10 - while count < max_count: - # This list will include strings comprised of the worker name with the hostname e.g. worker_name@host. - worker_status = get_workers(task_server) - LOG.info(f"Monitor: checking for workers, running workers = {worker_status} ...") - - # Check to see if any of the workers we're looking for in 'worker_names' have started - check = any(any(iwn in iws for iws in worker_status) for iwn in worker_names) - if check: - break - - # Increment count and sleep until the next check - count += 1 - time.sleep(sleep) - - # If no workers were started in time, raise an exception to stop the monitor - if count == max_count: - raise NoWorkersException("Monitor: no workers available to process the non-empty queue") - - -def check_workers_processing(queues_in_spec: List[str], task_server: str) -> bool: - """ - Check if any workers are still processing tasks by querying the task server. - - Args: - queues_in_spec: A list of queue names to check for active tasks. - task_server: The task server from which to query the processing status. - - Returns: - True if workers are still processing tasks, False otherwise. - """ - result = False - - if task_server == "celery": - from merlin.celery import app # pylint: disable=import-outside-toplevel - - result = check_celery_workers_processing(queues_in_spec, app) - else: - LOG.error("Celery is not specified as the task server!") - - return result - - -def check_merlin_status(args: Namespace, spec: MerlinSpec) -> bool: - """ - Function to check Merlin workers and queues to keep the allocation alive. - - This function monitors the status of workers and jobs within the specified task server - and the provided Merlin specification. It checks for active tasks and workers, ensuring - that the allocation remains valid. - - Args: - args: Parsed command-line interface arguments, including task server - specifications and sleep duration. - spec (spec.specification.MerlinSpec): The parsed spec.yaml as a - [`MerlinSpec`][spec.specification.MerlinSpec] object, containing queue - and worker definitions. - - Returns: - True if there are still tasks being processed, False otherwise. - """ - # Initialize the variable to track if there are still active tasks - active_tasks = False - - # Get info about jobs and workers in our spec from celery - queue_status = query_queues(args.task_server, spec, args.steps, None, verbose=False) - LOG.debug(f"Monitor: queue_status: {queue_status}") - - # Count the number of jobs that are active - # (Adding up the number of consumers in the same way is inaccurate so we won't do that) - total_jobs = 0 - for queue_info in queue_status.values(): - total_jobs += queue_info["jobs"] - - # Get the queues defined in the spec - queues_in_spec = spec.get_queue_list(["all"] if args.steps is None else args.steps) - LOG.debug(f"Monitor: queues_in_spec: {queues_in_spec}") - - # Get the active queues and the workers that are watching them - active_queues = get_active_queues(args.task_server) - LOG.debug(f"Monitor: active_queues: {active_queues}") - - # Count the number of workers that are active - consumers = set() - for active_queue, workers_on_queue in active_queues.items(): - if active_queue in queues_in_spec: - consumers |= set(workers_on_queue) - LOG.debug(f"Monitor: consumers found: {consumers}") - total_consumers = len(consumers) - - LOG.info(f"Monitor: found {total_jobs} jobs in queues and {total_consumers} workers alive") - - # If there are no workers, wait for the workers to start - if total_consumers == 0: - wait_for_workers(args.sleep, args.task_server, spec=spec) - - # If we're here, workers have started and jobs should be queued - if total_jobs > 0: - active_tasks = True - # If there are no jobs left, see if any workers are still processing them - elif total_jobs == 0: - active_tasks = check_workers_processing(queues_in_spec, args.task_server) - - LOG.debug(f"Monitor: active_tasks: {active_tasks}") - return active_tasks diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index c1c1ea1e..789891e0 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -430,56 +430,6 @@ def query_celery_queues(queues: List[str], app: Celery = None, config: Config = return queue_info -def get_workers_from_app() -> List[str]: - """ - Retrieve a list of all workers connected to the Celery application. - - This function uses the Celery control interface to inspect the current state - of the application and returns a list of workers that are currently connected. - If no workers are found, an empty list is returned. - - Returns: - A list of worker names that are currently connected to the Celery application. - If no workers are connected, an empty list is returned. - """ - from merlin.celery import app # pylint: disable=C0415 - - i = app.control.inspect() - workers = i.ping() - if workers is None: - return [] - return [*workers] - - -def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> bool: - """ - Check if any Celery workers are currently processing tasks from specified queues. - - This function queries the Celery application to determine if there are any active - tasks being processed by workers for the given list of queues. It returns a boolean - indicating whether any tasks are currently active. - - Args: - queues_in_spec: A list of queue names to check for active tasks. - app: The Celery application instance used for querying. - - Returns: - True if any workers are processing tasks in the specified queues; False - otherwise. - """ - # Query celery for active tasks - active_tasks = app.control.inspect().active() - - # Search for the queues we provided if necessary - if active_tasks is not None: - for tasks in active_tasks.values(): - for task in tasks: - if task["delivery_info"]["routing_key"] in queues_in_spec: - return True - - return False - - def purge_celery_tasks(queues: str, force: bool) -> int: """ Purge Celery tasks from the specified queues. diff --git a/tests/unit/cli/commands/test_monitor_entry_point.py b/tests/unit/cli/commands/test_monitor_entry_point.py index af8ea388..1e4e16c6 100644 --- a/tests/unit/cli/commands/test_monitor_entry_point.py +++ b/tests/unit/cli/commands/test_monitor_entry_point.py @@ -8,10 +8,8 @@ Tests for the `monitor.py` file of the `cli/` folder. """ -import logging from argparse import Namespace -from _pytest.capture import CaptureFixture from pytest_mock import MockerFixture from merlin.cli.commands.monitor import MonitorCommand @@ -31,7 +29,6 @@ def test_add_parser_sets_up_monitor_command(create_parser: FixtureCallable): assert hasattr(args, "func") assert args.func.__name__ == command.process_command.__name__ assert args.specification == "spec.yaml" - assert args.steps == ["all"] assert args.variables is None assert args.task_server == "celery" assert args.sleep == 60 @@ -55,7 +52,6 @@ def test_process_command_all_steps(mocker: MockerFixture): command = MonitorCommand() args = Namespace( specification="spec.yaml", - steps=["all"], variables=None, task_server="celery", sleep=5, @@ -66,38 +62,3 @@ def test_process_command_all_steps(mocker: MockerFixture): monitor_class.assert_called_once_with(mock_spec, 5, "celery", no_restart=False, auto_cleanup=False) mock_monitor.monitor_all_runs.assert_called_once() - - -def test_monitor_process_command_with_specific_steps(mocker: MockerFixture, caplog: CaptureFixture): - """ - Test the case when `args.steps != ['all']` -> uses `check_merlin_status()` in a loop. - - Args: - mocker: PyTest mocker fixture. - caplog: PyTest caplog fixture. - """ - caplog.set_level(logging.INFO) - - mock_spec = mocker.Mock() - mock_get_spec = mocker.patch("merlin.cli.commands.monitor.get_merlin_spec_with_override", return_value=(mock_spec, None)) - mock_sleep = mocker.patch("time.sleep") - - # simulate 2 iterations - mock_check_status = mocker.patch("merlin.cli.commands.monitor.check_merlin_status", side_effect=[True, True, False]) - - command = MonitorCommand() - args = Namespace( - specification="workflow.yaml", - steps=["step1"], - variables=None, - task_server="celery", - sleep=5, - no_restart=False, - ) - command.process_command(args) - - mock_get_spec.assert_called_once_with(args) - assert mock_sleep.call_count == 3 # 1 before loop, 2 in loop - assert mock_check_status.call_count == 3 - assert "Monitor: found tasks in queues and/or tasks being processed" in caplog.text - assert "Monitor: ... stop condition met" in caplog.text