Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f873097
Creation of fail_fast policy
DavideCorigliano-Unimib Dec 20, 2025
f8c338b
Setting fail_fast: false as default mode
DavideCorigliano-Unimib Dec 20, 2025
d38485a
Refined fail_fast: true as behaviour
DavideCorigliano-Unimib Dec 20, 2025
07c13d8
Refined fail_fast: true when missing
DavideCorigliano-Unimib Dec 22, 2025
fb98d9d
Perfected fail_fast: true - DAG outcome
DavideCorigliano-Unimib Dec 22, 2025
536de57
Fine tuned rules for exit-task
DavideCorigliano-Unimib Dec 23, 2025
5cfa4ab
Fine tuned rules for execution status
DavideCorigliano-Unimib Dec 23, 2025
f0e8e13
Restyling made with sqlite3
DavideCorigliano-Unimib Dec 24, 2025
f8cc026
Updated files for running dags
DavideCorigliano-Unimib Dec 24, 2025
7e6657f
Rewiring DB to previous running - DAG running
DavideCorigliano-Unimib Dec 24, 2025
9ef94e0
( dags | executions | tasks ) tables rewired and improved
DavideCorigliano-Unimib Dec 26, 2025
b6ce455
Added and implemented task_attempts table in DB
DavideCorigliano-Unimib Dec 26, 2025
2612705
Added and implemented task_dependencies table in DB
DavideCorigliano-Unimib Dec 26, 2025
e0e8414
Partially restored logs table in DB
DavideCorigliano-Unimib Dec 27, 2025
60f90ef
Completely restored logs table in DB
DavideCorigliano-Unimib Dec 27, 2025
c6aef1a
Rewired logs table to client console
DavideCorigliano-Unimib Dec 27, 2025
5b3b789
Specify number of attempt on client console
DavideCorigliano-Unimib Dec 27, 2025
8f14986
Clean up version after DB restyling
DavideCorigliano-Unimib Dec 28, 2025
2f8af06
Added execution column in task_dependencies table in DB
DavideCorigliano-Unimib Dec 28, 2025
2e32b12
Solved bug in task_dependencies table of DB
DavideCorigliano-Unimib Dec 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ abstract: |

dag:
name: "wait_and_retry_with_logging"
fail_fast: false
tasks:

# Initial message
- task_id: "start"
type: "PrintTask"
Expand Down Expand Up @@ -37,7 +37,7 @@ dag:
import random, sys, time
generate_random = random.random()
print(f"Task 2 - Generated: {round(generate_random, 2)}")
if generate_random < 0.7:
if generate_random < 0.9999:
print("Failure occurred")
sys.exit(1)
else:
Expand All @@ -52,4 +52,5 @@ dag:
params:
message: "All retry tasks completed!"
dependencies: ["unstable_task_2"]
# dependency_policy: "none"
dependency_policy: "none"
is_final: true
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract: |

dag:
name: "conditional_branching_triple_tree"

fail_fast: true
tasks:

# ---------------------------------------------------------
Expand Down Expand Up @@ -105,7 +105,7 @@ dag:
- "branch1_step2"
- "branch2_step2"
- "branch3_step2"
dependency_policy: none
dependency_policy: "none"

# ---------------------------------------------------------
# FINAL TASK
Expand All @@ -115,3 +115,5 @@ dag:
params:
message: "Triple-branch conditional pipeline completed successfully!"
dependencies: ["merge_results"]
is_final: true

14 changes: 10 additions & 4 deletions server.log
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
2025-12-16 20:19:12,878 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-16 21:57:58,975 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-16 22:08:28,317 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-16 22:55:04,930 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 00:30:05,591 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 09:04:28,695 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 09:07:59,680 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 09:15:37,855 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 11:57:12,471 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 12:11:22,465 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 12:12:43,604 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 12:16:40,057 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 12:18:53,781 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
2025-12-28 12:20:46,102 [INFO] __main__: [DEBUG] Initializing Maestro server with database: /home/dave_crd/Sync/VSCode/maestro/maestro.db
137 changes: 86 additions & 51 deletions src/maestro/client/api_client.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
#!/usr/bin/env python3

import requests
import json
import time
from typing import Optional, Dict, Any, List, Iterator
from datetime import datetime
import urllib.parse
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

import requests


class MaestroAPIClient:
"""REST client for communicating with Maestro API server"""

def __init__(self, base_url: str = "http://localhost:8000", timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.session = requests.Session()

def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
"""Make HTTP request with error handling"""
url = f"{self.base_url}{endpoint}"

try:
response = self.session.request(method, url, timeout=self.timeout, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.ConnectionError:
raise ConnectionError(f"Could not connect to Maestro server at {self.base_url}")
raise ConnectionError(
f"Could not connect to Maestro server at {self.base_url}"
)
except requests.exceptions.Timeout:
raise TimeoutError(f"Request to {url} timed out")
except requests.exceptions.HTTPError as e:
Expand All @@ -35,73 +39,96 @@ def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Respon
raise ValueError(f"Bad request: {error_detail}")
else:
raise RuntimeError(f"HTTP {response.status_code}: {response.text}")

def health_check(self) -> Dict[str, Any]:
"""Check if the server is running"""
response = self._make_request("GET", "/")
return response.json()


def create_dag(self, dag_file_path: str, dag_id: Optional[str] = None) -> Dict[str, Any]:
def create_dag(
self, dag_file_path: str, dag_id: Optional[str] = None
) -> Dict[str, Any]:
"""Create a new DAG from a YAML file"""
data = {"dag_file_path": dag_file_path}
if dag_id:
data["dag_id"] = dag_id
response = self._make_request(method="POST",
endpoint="/v1/dags/create",
json=data)
response = self._make_request(
method="POST", endpoint="/v1/dags/create", json=data
)
return response.json()

def run_dag(self, dag_id: str, resume: bool = False, fail_fast: bool = True) -> Dict[str, Any]:
def run_dag(
self,
dag_id: str,
resume: bool = False,
fail_fast: Optional[bool] = None,
trigger_type: Optional[str] = None,
triggered_by: Optional[str] = None,
) -> Dict[str, Any]:
"""Run a previously created DAG"""
data = {
"dag_id": dag_id,

data: Dict[str, Any] = {
"resume": resume,
"fail_fast": fail_fast
}
response = self._make_request(method="POST",
endpoint=f"/v1/dags/{dag_id}/run",
json=data)

if fail_fast is not None:
data["fail_fast"] = fail_fast

if trigger_type is not None:
data["trigger_type"] = trigger_type

if triggered_by is not None:
data["triggered_by"] = triggered_by

response = self._make_request(
method="POST",
endpoint=f"/v1/dags/{dag_id}/run",
json=data,
)

return response.json()

def get_dag_status(self, dag_id: str, execution_id: Optional[str] = None) -> Dict[str, Any]:

def get_dag_status(
self, dag_id: str, execution_id: Optional[str] = None
) -> Dict[str, Any]:
"""Get status of a specific DAG execution"""
endpoint = f"/v1/dags/{dag_id}/status"
params = {}
if execution_id:
params["execution_id"] = execution_id

response = self._make_request("GET", endpoint, params=params)
return response.json()

def get_dag_logs_v1(self,
dag_id: str,
execution_id: Optional[str] = None,
limit: int = 100,
task_filter: Optional[str] = None,
level_filter: Optional[str] = None) -> List[Dict[str, Any]]:

def get_dag_logs_v1(
self,
dag_id: str,
execution_id: Optional[str] = None,
limit: int = 100,
task_filter: Optional[str] = None,
level_filter: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get logs for a specific DAG execution using the v1 API"""
endpoint = f"/v1/logs/{dag_id}"
params = {"limit": limit}

if execution_id:
params["execution_id"] = execution_id
if task_filter:
params["task_filter"] = task_filter
if level_filter:
params["level_filter"] = level_filter

response = self._make_request("GET", endpoint, params=params)
return response.json()


# TODO: refactor, this is used only for testing
def stream_dag_logs_v1(
self,
dag_id: str,
execution_id: Optional[str] = None,
task_filter: Optional[str] = None,
level_filter: Optional[str] = None
level_filter: Optional[str] = None,
) -> Iterator[Dict[str, Any]]:
"""
Stream logs for a specific DAG execution in real-time using the v1 API.
Expand Down Expand Up @@ -133,46 +160,49 @@ def stream_dag_logs_v1(

line = line.decode("utf-8").strip()
if line.startswith("data: "):
data = line[len("data: "):]
data = line[len("data: ") :]
try:
yield json.loads(data)
except json.JSONDecodeError:
continue

except requests.exceptions.ConnectionError:
raise ConnectionError(f"Could not connect to Maestro server at {self.base_url}")
raise ConnectionError(
f"Could not connect to Maestro server at {self.base_url}"
)
except requests.exceptions.HTTPError as e:
raise RuntimeError(f"HTTP {response.status_code}: {response.text}")


def get_running_dags(self) -> Dict[str, Any]:
"""Get all currently running DAGs"""
response = self._make_request("GET", "/dags/running")
return response.json()

def cancel_dag(self, dag_id: str, execution_id: Optional[str] = None) -> Dict[str, Any]:

def cancel_dag(
self, dag_id: str, execution_id: Optional[str] = None
) -> Dict[str, Any]:
"""Cancel a running DAG execution"""
endpoint = f"/dags/{dag_id}/cancel"
params = {}
if execution_id:
params["execution_id"] = execution_id

response = self._make_request("POST", endpoint, params=params)
return response.json()

def validate_dag(self, dag_file_path: str) -> Dict[str, Any]:
"""Validate a DAG file without executing it"""
endpoint = f"/dags/validate"
data = {"dag_file_path": dag_file_path}

response = self._make_request("POST", endpoint, json=data)
return response.json()

def cleanup_old_executions(self, days: int = 30) -> Dict[str, Any]:
"""Clean up old execution records"""
endpoint = "/dags/cleanup"
params = {"days": days}

response = self._make_request("DELETE", endpoint, params=params)
return response.json()

Expand All @@ -191,7 +221,11 @@ def list_dags(self, status_filter: Optional[str] = None) -> Dict[str, Any]:

if status_filter == "running":
active_dag_lst = self.list_dags_v1("active")
return {"dags": active_dag_lst, "title": "Running DAGs", "count": len(active_dag_lst)}
return {
"dags": active_dag_lst,
"title": "Running DAGs",
"count": len(active_dag_lst),
}
else:
all_dag_lst = self.list_dags_v1("all")
return {"dags": all_dag_lst, "title": "All DAGs", "count": len(all_dag_lst)}
Expand All @@ -202,12 +236,13 @@ def list_dags_v1(self, filter: Optional[str] = None) -> List[Dict[str, Any]]:
params = {}
if filter:
params["status"] = filter

response = self._make_request("GET", endpoint, params=params)
return response.json()

def stop_dag(self, dag_id: str,
execution_id: Optional[str] = None) -> Dict[str, Any]:
def stop_dag(
self, dag_id: str, execution_id: Optional[str] = None
) -> Dict[str, Any]:
"""Stop a running DAG execution"""
endpoint = f"/v1/dags/{dag_id}/stop"
data = {}
Expand All @@ -222,15 +257,15 @@ def resume_dag(self, dag_id: str, execution_id: str) -> Dict[str, Any]:
data = {"dag_id": dag_id, "execution_id": execution_id}
response = self._make_request("POST", endpoint, json=data)
return response.json()

def is_server_running(self) -> bool:
"""Check if the Maestro server is running"""
try:
self.health_check()
return True
except (ConnectionError, TimeoutError):
return False

def wait_for_server(self, max_wait_time: int = 30) -> bool:
"""Wait for the server to become available"""
start_time = time.time()
Expand Down
Loading
Loading