Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to Merlin will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed
- Removed old monitor code from v1.0

## [2.0.0b4]

### Added
Expand Down
24 changes: 3 additions & 21 deletions merlin/cli/commands/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from merlin.cli.commands.command_entry_point import CommandEntryPoint
from merlin.cli.utils import get_merlin_spec_with_override
from merlin.monitor.monitor import Monitor
from merlin.router import check_merlin_status


LOG = logging.getLogger("merlin")
Expand Down Expand Up @@ -52,14 +51,6 @@ def add_parser(self, subparsers: ArgumentParser):
)
monitor.set_defaults(func=self.process_command)
monitor.add_argument("specification", type=str, help="Path to a Merlin YAML spec file")
monitor.add_argument(
"--steps",
nargs="+",
type=str,
dest="steps",
default=["all"],
help="The specific steps (tasks on the server) in the YAML file defining the queues you want to monitor",
)
monitor.add_argument(
"--vars",
action="store",
Expand Down Expand Up @@ -116,17 +107,8 @@ def process_command(self, args: Namespace):
# Give the user time to queue up jobs in case they haven't already
time.sleep(args.sleep)

if args.steps != ["all"]:
LOG.warning(
"The `--steps` argument of the `merlin monitor` command is set to be deprecated in Merlin v1.14 "
"For now, using this argument will tell merlin to use the version of the monitor command from Merlin v1.12."
)
# Check if we still need our allocation
while check_merlin_status(args, spec):
LOG.info("Monitor: found tasks in queues and/or tasks being processed")
time.sleep(args.sleep)
else:
monitor = Monitor(spec, args.sleep, args.task_server, no_restart=args.no_restart, auto_cleanup=not args.disable_gc)
monitor.monitor_all_runs()
# Monitor the allocation
monitor = Monitor(spec, args.sleep, args.task_server, no_restart=args.no_restart, auto_cleanup=not args.disable_gc)
monitor.monitor_all_runs()

LOG.info("Monitor: ... stop condition met")
189 changes: 0 additions & 189 deletions merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,12 @@
decoupled from the logic the tasks are running.
"""
import logging
import time
from argparse import Namespace
from typing import Dict, List, Tuple

from merlin.exceptions import NoWorkersException
from merlin.spec.specification import MerlinSpec
from merlin.study.celeryadapter import (
build_set_of_queues,
check_celery_workers_processing,
dump_celery_queue_info,
get_active_celery_queues,
get_workers_from_app,
purge_celery_tasks,
query_celery_queues,
run_celery,
Expand Down Expand Up @@ -158,25 +152,6 @@ def query_queues(
return {}


def get_workers(task_server: str) -> List[str]:
"""
This function queries the designated task server to obtain a list of all
workers that are currently connected.

Args:
task_server: The task server to query.

Returns:
A list of all connected workers. If the task server is not supported,
an empty list is returned.
"""
if task_server == "celery": # pylint: disable=R1705
return get_workers_from_app()
else:
LOG.error("Celery is not specified as the task server!")
return []


def stop_workers(task_server: str, spec_worker_names: List[str], queues: List[str], workers_regex: str):
"""
This function sends a command to stop workers that match the specified
Expand All @@ -196,167 +171,3 @@ def stop_workers(task_server: str, spec_worker_names: List[str], queues: List[st
stop_celery_workers(queues, spec_worker_names, workers_regex)
else:
LOG.error("Celery is not specified as the task server!")


# TODO in Merlin 2.0 delete all of the below functions since we're deprecating the old version of the monitor
# and a lot of this stuff is in the new monitor classes
def get_active_queues(task_server: str) -> Dict[str, List[str]]:
"""
Retrieve a dictionary of active queues and their associated workers for the specified task server.

This function queries the given task server for its active queues and gathers
information about which workers are currently monitoring these queues. It supports
the 'celery' task server and returns a structured dictionary containing the queue
names as keys and lists of worker names as values.

Args:
task_server: The task server to query for active queues.

Returns:
A dictionary where:\n
- The keys are the names of the active queues.
- The values are lists of worker names that are currently attached to those queues.
"""
active_queues = {}

if task_server == "celery":
from merlin.celery import app # pylint: disable=C0415

active_queues, _ = get_active_celery_queues(app)
else:
LOG.error("Only celery can be configured currently.")

return active_queues


def wait_for_workers(sleep: int, task_server: str, spec: MerlinSpec): # noqa
"""
Wait for workers to start up by checking their status at regular intervals.

This function monitors the specified task server for the startup of worker processes.
It checks for the existence of the expected workers up to 10 times, sleeping for a
specified number of seconds between each check. If no workers are detected after
the maximum number of attempts, it raises an error to terminate the monitoring
process, indicating a potential issue with the task server.

Args:
sleep: The number of seconds to pause between each check for worker status.
task_server: The task server from which to query for worker status.
spec (spec.specification.MerlinSpec): An instance of the
[`MerlinSpec`][spec.specification.MerlinSpec] class that contains the
specification for the workers being monitored.

Raises:
NoWorkersException: If no workers are detected after the maximum number of checks.
"""
# Get the names of the workers that we're looking for
worker_names = spec.get_worker_names()
LOG.info(f"Checking for the following workers: {worker_names}")

# Loop until workers are detected
count = 0
max_count = 10
while count < max_count:
# This list will include strings comprised of the worker name with the hostname e.g. worker_name@host.
worker_status = get_workers(task_server)
LOG.info(f"Monitor: checking for workers, running workers = {worker_status} ...")

# Check to see if any of the workers we're looking for in 'worker_names' have started
check = any(any(iwn in iws for iws in worker_status) for iwn in worker_names)
if check:
break

# Increment count and sleep until the next check
count += 1
time.sleep(sleep)

# If no workers were started in time, raise an exception to stop the monitor
if count == max_count:
raise NoWorkersException("Monitor: no workers available to process the non-empty queue")


def check_workers_processing(queues_in_spec: List[str], task_server: str) -> bool:
"""
Check if any workers are still processing tasks by querying the task server.

Args:
queues_in_spec: A list of queue names to check for active tasks.
task_server: The task server from which to query the processing status.

Returns:
True if workers are still processing tasks, False otherwise.
"""
result = False

if task_server == "celery":
from merlin.celery import app # pylint: disable=import-outside-toplevel

result = check_celery_workers_processing(queues_in_spec, app)
else:
LOG.error("Celery is not specified as the task server!")

return result


def check_merlin_status(args: Namespace, spec: MerlinSpec) -> bool:
"""
Function to check Merlin workers and queues to keep the allocation alive.

This function monitors the status of workers and jobs within the specified task server
and the provided Merlin specification. It checks for active tasks and workers, ensuring
that the allocation remains valid.

Args:
args: Parsed command-line interface arguments, including task server
specifications and sleep duration.
spec (spec.specification.MerlinSpec): The parsed spec.yaml as a
[`MerlinSpec`][spec.specification.MerlinSpec] object, containing queue
and worker definitions.

Returns:
True if there are still tasks being processed, False otherwise.
"""
# Initialize the variable to track if there are still active tasks
active_tasks = False

# Get info about jobs and workers in our spec from celery
queue_status = query_queues(args.task_server, spec, args.steps, None, verbose=False)
LOG.debug(f"Monitor: queue_status: {queue_status}")

# Count the number of jobs that are active
# (Adding up the number of consumers in the same way is inaccurate so we won't do that)
total_jobs = 0
for queue_info in queue_status.values():
total_jobs += queue_info["jobs"]

# Get the queues defined in the spec
queues_in_spec = spec.get_queue_list(["all"] if args.steps is None else args.steps)
LOG.debug(f"Monitor: queues_in_spec: {queues_in_spec}")

# Get the active queues and the workers that are watching them
active_queues = get_active_queues(args.task_server)
LOG.debug(f"Monitor: active_queues: {active_queues}")

# Count the number of workers that are active
consumers = set()
for active_queue, workers_on_queue in active_queues.items():
if active_queue in queues_in_spec:
consumers |= set(workers_on_queue)
LOG.debug(f"Monitor: consumers found: {consumers}")
total_consumers = len(consumers)

LOG.info(f"Monitor: found {total_jobs} jobs in queues and {total_consumers} workers alive")

# If there are no workers, wait for the workers to start
if total_consumers == 0:
wait_for_workers(args.sleep, args.task_server, spec=spec)

# If we're here, workers have started and jobs should be queued
if total_jobs > 0:
active_tasks = True
# If there are no jobs left, see if any workers are still processing them
elif total_jobs == 0:
active_tasks = check_workers_processing(queues_in_spec, args.task_server)

LOG.debug(f"Monitor: active_tasks: {active_tasks}")
return active_tasks
50 changes: 0 additions & 50 deletions merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,56 +430,6 @@ def query_celery_queues(queues: List[str], app: Celery = None, config: Config =
return queue_info


def get_workers_from_app() -> List[str]:
"""
Retrieve a list of all workers connected to the Celery application.

This function uses the Celery control interface to inspect the current state
of the application and returns a list of workers that are currently connected.
If no workers are found, an empty list is returned.

Returns:
A list of worker names that are currently connected to the Celery application.
If no workers are connected, an empty list is returned.
"""
from merlin.celery import app # pylint: disable=C0415

i = app.control.inspect()
workers = i.ping()
if workers is None:
return []
return [*workers]


def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> bool:
"""
Check if any Celery workers are currently processing tasks from specified queues.

This function queries the Celery application to determine if there are any active
tasks being processed by workers for the given list of queues. It returns a boolean
indicating whether any tasks are currently active.

Args:
queues_in_spec: A list of queue names to check for active tasks.
app: The Celery application instance used for querying.

Returns:
True if any workers are processing tasks in the specified queues; False
otherwise.
"""
# Query celery for active tasks
active_tasks = app.control.inspect().active()

# Search for the queues we provided if necessary
if active_tasks is not None:
for tasks in active_tasks.values():
for task in tasks:
if task["delivery_info"]["routing_key"] in queues_in_spec:
return True

return False


def purge_celery_tasks(queues: str, force: bool) -> int:
"""
Purge Celery tasks from the specified queues.
Expand Down
39 changes: 0 additions & 39 deletions tests/unit/cli/commands/test_monitor_entry_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
Tests for the `monitor.py` file of the `cli/` folder.
"""

import logging
from argparse import Namespace

from _pytest.capture import CaptureFixture
from pytest_mock import MockerFixture

from merlin.cli.commands.monitor import MonitorCommand
Expand All @@ -31,7 +29,6 @@ def test_add_parser_sets_up_monitor_command(create_parser: FixtureCallable):
assert hasattr(args, "func")
assert args.func.__name__ == command.process_command.__name__
assert args.specification == "spec.yaml"
assert args.steps == ["all"]
assert args.variables is None
assert args.task_server == "celery"
assert args.sleep == 60
Expand All @@ -55,7 +52,6 @@ def test_process_command_all_steps(mocker: MockerFixture):
command = MonitorCommand()
args = Namespace(
specification="spec.yaml",
steps=["all"],
variables=None,
task_server="celery",
sleep=5,
Expand All @@ -66,38 +62,3 @@ def test_process_command_all_steps(mocker: MockerFixture):

monitor_class.assert_called_once_with(mock_spec, 5, "celery", no_restart=False, auto_cleanup=False)
mock_monitor.monitor_all_runs.assert_called_once()


def test_monitor_process_command_with_specific_steps(mocker: MockerFixture, caplog: CaptureFixture):
"""
Test the case when `args.steps != ['all']` -> uses `check_merlin_status()` in a loop.

Args:
mocker: PyTest mocker fixture.
caplog: PyTest caplog fixture.
"""
caplog.set_level(logging.INFO)

mock_spec = mocker.Mock()
mock_get_spec = mocker.patch("merlin.cli.commands.monitor.get_merlin_spec_with_override", return_value=(mock_spec, None))
mock_sleep = mocker.patch("time.sleep")

# simulate 2 iterations
mock_check_status = mocker.patch("merlin.cli.commands.monitor.check_merlin_status", side_effect=[True, True, False])

command = MonitorCommand()
args = Namespace(
specification="workflow.yaml",
steps=["step1"],
variables=None,
task_server="celery",
sleep=5,
no_restart=False,
)
command.process_command(args)

mock_get_spec.assert_called_once_with(args)
assert mock_sleep.call_count == 3 # 1 before loop, 2 in loop
assert mock_check_status.call_count == 3
assert "Monitor: found tasks in queues and/or tasks being processed" in caplog.text
assert "Monitor: ... stop condition met" in caplog.text