Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions api/apps/canvas_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from api.db.services.file_service import FileService
from api.db.services.user_service import TenantService
from api.db.services.user_canvas_version import UserCanvasVersionService
from api.db.services.schedule_agent_service import ScheduleAgentService

from api.settings import RetCode
from api.utils import get_uuid
from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result
Expand Down Expand Up @@ -471,6 +473,277 @@ def sessions(canvas_id):
return server_error_response(e)


@manager.route('/schedule/create', methods=['POST']) # type: ignore # noqa: F821
@validate_request("canvas_id", "name", "frequency_type")
@login_required
def create_schedule():
req = request.json
req["tenant_id"] = current_user.id # Using user_id as tenant_id for simplicity
req["created_by"] = current_user.id
req["id"] = get_uuid()

try:
# Validate schedule data
ScheduleAgentService.validate_schedule_data(**req)

# Check if canvas exists and user has permission
e, canvas = UserCanvasService.get_by_id(req["canvas_id"])
if not e:
return get_data_error_result(message="Canvas not found")

if not UserCanvasService.query(user_id=current_user.id, id=req["canvas_id"]):
return get_json_result(
data=False, message='Only owner of canvas authorized for this operation.',
code=RetCode.OPERATING_ERROR)

# Handle execute_date conversion - only date part
if req.get('execute_date') and isinstance(req['execute_date'], str):
from datetime import datetime
# Parse date string (YYYY-MM-DD format) and convert to datetime with midnight time
date_str = req['execute_date'].split('T')[0] # Remove time part if present
req['execute_date'] = datetime.strptime(date_str, '%Y-%m-%d').date()

schedule = ScheduleAgentService.create_schedule(**req)
return get_json_result(data=schedule.to_dict() if hasattr(schedule, 'to_dict') else schedule)

except ValueError as e:
return get_data_error_result(message=str(e))
except Exception as e:
return server_error_response(e)

@manager.route('/schedule/update', methods=['POST']) # type: ignore # noqa: F821
@validate_request("frequency_type", "id")
@login_required
def update_schedule():
req = request.json
schedule_id = req.get("id")
if not schedule_id:
return get_data_error_result(message="Schedule ID is required for update")
try:
ScheduleAgentService.validate_schedule_data(**req)

e, schedule = ScheduleAgentService.get_by_id(schedule_id)
if not e:
return get_data_error_result(message="Schedule not found")

if schedule.created_by != current_user.id:
return get_json_result(
data=False, message='Only owner of schedule authorized for this operation.',
code=RetCode.OPERATING_ERROR)
ScheduleAgentService.validate_schedule_data(**req)
result = ScheduleAgentService.update_by_id(schedule_id, req)

if result:
logging.info(f"[CANVAS_APP] Successfully updated schedule {schedule_id}")
else:
logging.error(f"[CANVAS_APP] Failed to update schedule {schedule_id}")

return get_json_result(data=True)

except ValueError as e:
return get_data_error_result(message=str(e))
except Exception as e:
return server_error_response(e)

@manager.route('/schedule/frequency-options', methods=['GET']) # type: ignore # noqa: F821
@login_required
def get_frequency_options():
"""Get available frequency options and their configurations"""
options = {
"frequency_types": [
{
"value": "once",
"label": "One Time",
"description": "Execute once at a specific date and time",
"required_fields": ["execute_date", "execute_time"]
},
{
"value": "daily",
"label": "Daily",
"description": "Execute every day at a specific time",
"required_fields": ["execute_time"]
},
{
"value": "weekly",
"label": "Weekly",
"description": "Execute on specific days of the week",
"required_fields": ["days_of_week", "execute_time"]
},
{
"value": "monthly",
"label": "Monthly",
"description": "Execute on a specific day of each month",
"required_fields": ["day_of_month", "execute_time"]
}
],
"days_of_week": [
{"value": 1, "label": "Monday"},
{"value": 2, "label": "Tuesday"},
{"value": 3, "label": "Wednesday"},
{"value": 4, "label": "Thursday"},
{"value": 5, "label": "Friday"},
{"value": 6, "label": "Saturday"},
{"value": 7, "label": "Sunday"}
],
"time_format": "HH:MM:SS",
"date_format": "ISO 8601 (YYYY-MM-DDTHH:MM:SS.sssZ)"
}

return get_json_result(data=options)

@manager.route('/schedule/list', methods=['GET']) # type: ignore # noqa: F821
@login_required
def list_schedules():
"""Get schedules list with pagination and search"""
try:
page = int(request.args.get('page', 1))
page_size = int(request.args.get('page_size', 20))
keywords = request.args.get('keywords', '')
canvas_id = request.args.get('canvas_id', '')

# Get schedules created by current user
schedules, total = ScheduleAgentService.get_schedules_paginated(
created_by=current_user.id,
canvas_id=canvas_id,
keywords=keywords,
page=page,
page_size=page_size
)

# Convert to dict and add canvas info
schedule_list = []
for schedule in schedules:
schedule_dict = schedule.to_dict() if hasattr(schedule, 'to_dict') else schedule.__dict__['__data__']

# Get canvas title for display
try:
e, canvas = UserCanvasService.get_by_id(schedule_dict['canvas_id'])
if e:
schedule_dict['canvas_title'] = canvas.title
else:
schedule_dict['canvas_title'] = 'Unknown Canvas'
except Exception:
schedule_dict['canvas_title'] = 'Unknown Canvas'

schedule_list.append(schedule_dict)

return get_json_result(data={
"schedules": schedule_list,
"total": total,
"page": page,
"page_size": page_size
})

except Exception as e:
return server_error_response(e)

@manager.route('/schedule/toggle/<schedule_id>', methods=['POST']) # type: ignore # noqa: F821
@login_required
def toggle_schedule(schedule_id):
"""Toggle schedule enabled status"""
try:
e, schedule = ScheduleAgentService.get_by_id(schedule_id)
if not e:
return get_data_error_result(message="Schedule not found")

if schedule.created_by != current_user.id:
return get_json_result(
data=False, message='Only owner of schedule authorized for this operation.',
code=RetCode.OPERATING_ERROR)

# Toggle enabled status
new_enabled = not schedule.enabled
ScheduleAgentService.update_by_id(schedule_id, {'enabled': new_enabled})

return get_json_result(data={'enabled': new_enabled})

except Exception as e:
return server_error_response(e)

@manager.route('/schedule/delete/<schedule_id>', methods=['DELETE']) # type: ignore # noqa: F821
@login_required
def delete_schedule(schedule_id):
"""Delete a schedule"""
try:
e, schedule = ScheduleAgentService.get_by_id(schedule_id)
if not e:
return get_data_error_result(message="Schedule not found")

if schedule.created_by != current_user.id:
return get_json_result(
data=False, message='Only owner of schedule authorized for this operation.',
code=RetCode.OPERATING_ERROR)

# Delete the schedule
ScheduleAgentService.delete_by_id(schedule_id)

return get_json_result(data=True)

except Exception as e:
return server_error_response(e)

@manager.route('/schedule/history/<schedule_id>', methods=['GET']) # type: ignore # noqa: F821
@login_required
def get_schedule_history(schedule_id):
"""Get execution history for a schedule"""
try:
e, schedule = ScheduleAgentService.get_by_id(schedule_id)
if not e:
return get_data_error_result(message="Schedule not found")

if schedule.created_by != current_user.id:
return get_json_result(
data=False, message='Only owner of schedule authorized for this operation.',
code=RetCode.OPERATING_ERROR)

limit = int(request.args.get('limit', 20))
history = ScheduleAgentService.get_schedule_execution_history(schedule_id, limit)

# Convert to dict
history_list = []
for run in history:
run_dict = run.to_dict() if hasattr(run, 'to_dict') else run.__dict__['__data__']



history_list.append(run_dict)

return get_json_result(data=history_list)

except Exception as e:
return server_error_response(e)

@manager.route('/schedule/stats/<schedule_id>', methods=['GET']) # type: ignore # noqa: F821
@login_required
def get_schedule_stats(schedule_id):
"""Get execution statistics for a schedule"""
try:
e, schedule = ScheduleAgentService.get_by_id(schedule_id)
if not e:
return get_data_error_result(message="Schedule not found")

if schedule.created_by != current_user.id:
return get_json_result(
data=False, message='Only owner of schedule authorized for this operation.',
code=RetCode.OPERATING_ERROR)

stats = {
'total_runs': ScheduleAgentService._get_run_count(schedule_id, success_only=False),
'successful_runs': ScheduleAgentService._get_run_count(schedule_id, success_only=True),
'failed_runs': ScheduleAgentService._get_run_count(schedule_id, success_only=False) - ScheduleAgentService._get_run_count(schedule_id, success_only=True),
'last_successful_run': None,
'is_currently_running': ScheduleAgentService._is_currently_running(schedule_id)
}

last_run = ScheduleAgentService._get_last_successful_run(schedule_id)
if last_run:
stats['last_successful_run'] = last_run.to_dict() if hasattr(last_run, 'to_dict') else last_run.__dict__['__data__']

return get_json_result(data=stats)

except Exception as e:
return server_error_response(e)

@manager.route('/prompts', methods=['GET']) # noqa: F821
@login_required
def prompts():
Expand Down
43 changes: 43 additions & 0 deletions api/db/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,46 @@ class Meta:
db_table = "user_canvas_version"


class ScheduleAgent(DataBaseModel):
id = CharField(max_length=32, primary_key=True)
tenant_id = CharField(max_length=32, null=False, index=True)
canvas_id = CharField(max_length=32, null=False, help_text="canvas id to execute", index=True)
name = CharField(max_length=255, null=False, help_text="schedule name", index=True)
description = TextField(null=True, help_text="schedule description")

# Frequency options
frequency_type = CharField(max_length=20, null=False, help_text="once|daily|weekly|monthly", default="once", index=True)

# Time settings
execute_time = CharField(max_length=8, null=True, help_text="HH:MM:SS format", index=True)
execute_date = DateTimeField(null=True, help_text="specific date for one-time execution", index=True)

# Weekly settings
days_of_week = JSONField(null=False, default=[], help_text="[1,2,3,4,5,6,7] where 1=Monday")

# Monthly settings
day_of_month = IntegerField(null=True, help_text="day of month (1-31)", index=True)

enabled = BooleanField(default=True, help_text="whether the schedule is enabled", index=True)
input_params = JSONField(null=False, default={}, help_text="input parameters for agent")
created_by = CharField(max_length=32, null=False, help_text="who created it", index=True)
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)

class Meta:
db_table = "schedule_agent"


class ScheduleAgentRun(DataBaseModel):
id = CharField(max_length=32, primary_key=True)
schedule_id = CharField(max_length=32, null=False, help_text="schedule agent id", index=True)
started_at = DateTimeField(null=False, help_text="execution start datetime", index=True)
finished_at = DateTimeField(null=True, help_text="execution finish datetime", index=True)
success = BooleanField(null=True, help_text="execution result", index=True)
error_message = TextField(null=True, help_text="error message if failed")
conversation_id = CharField(max_length=32, null=True, help_text="conversation id from execution", index=True)

class Meta:
db_table = "schedule_agent_run"
class MCPServer(DataBaseModel):
id = CharField(max_length=32, primary_key=True)
name = CharField(max_length=255, null=False, help_text="MCP Server name")
Expand Down Expand Up @@ -942,6 +982,9 @@ class Meta:
db_table = "search"





def migrate_db():
logging.disable(logging.ERROR)
migrator = DatabaseMigrator[settings.DATABASE_TYPE.upper()].value(DB)
Expand Down
Loading