Skip to content

Readme and retry jobs #684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: deployments-webhook
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ Make sure docker is running.
```bash
docker exec -it middleware-dev tail --lines 500 -f /var/log/postgres/postgres.log
```
**Queue logs**
```bash
docker exec -it middleware-dev tail --lines 500 -f /var/log/queue/queue.log
```


## 🛠️ Manual Setup
Expand Down Expand Up @@ -306,6 +310,13 @@ To set up middleware locally, follow these steps:
```
NOTE: Open this sync sever in a new terminal window after activating the virtual environment only after starting analytics server.

- For queue worker:
```bash
PYTHONPATH=. PROCRASTINATE_APP=procrastinate_worker.app procrastinate worker
```
NOTE: This step is required only if you intend to receive webhooks.
Make sure you are in `analytics_server` directory and virtual environment is activated.

6. **Web Server Setup**

- Install NodeJs 22.x either [manually](https://nodejs.org/en/download) or using a tool like [nvm](https://github.com/nvm-sh/nvm) or [volta](https://volta.sh/).
Expand Down
19 changes: 19 additions & 0 deletions backend/analytics_server/mhq/api/request_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from functools import wraps
import traceback
from typing import Dict, List
from uuid import UUID
from mhq.exceptions.webhook import WebhookException

from mhq.store.models.code.enums import TeamReposDeploymentType
from flask import request
Expand Down Expand Up @@ -74,6 +76,23 @@ def new_func(*args, **kwargs):
return decorator


def wrap_webhook_exceptions(f):
@wraps(f)
def decorator_function(*args, **kwargs):
try:
return f(*args, **kwargs)
except WebhookException:
raise
except Exception as e:
tb = traceback.format_exc()
raise WebhookException(
message=f"Unexpected error: {str(e)}",
resolution=f"{tb}",
) from e

return decorator_function


def coerce_workflow_filter(filter_data: str) -> WorkflowFilter:
workflow_filter_processor = get_workflow_filter_processor()
return workflow_filter_processor.create_workflow_filter_from_json_string(
Expand Down
23 changes: 15 additions & 8 deletions backend/analytics_server/mhq/api/webhooks.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
from flask import Blueprint, request
from typing import Any, Dict, List
from mhq.api.request_utils import wrap_webhook_exceptions
from mhq.exceptions.webhook import InvalidEventTypeError
from mhq.service.query_validator import get_query_validator
from mhq.store.models.webhooks.enums import WebhookEventRequestType
from mhq.service.webhooks.factory import WebhookEventFactory
from mhq.store.models.events.enums import EventType, WEBHOOK_EVENTS
from mhq.service.events.factory import WebhookEventFactory
from mhq.service.queue.tasks import WebhookQueue

app = Blueprint("webhooks", __name__)


@app.route("/public/webhook/<event_type>", methods={"POST"})
@wrap_webhook_exceptions
def receive_webhook_workflows(event_type: str):
webhook_event_type = WebhookEventRequestType(event_type)
secret_key = request.headers.get("X-API-KEY")

if event_type not in WEBHOOK_EVENTS:
raise InvalidEventTypeError(event_type)

event_type = EventType(event_type)
api_key = request.headers.get("X-API-KEY")

query_validator = get_query_validator()
default_org = query_validator.get_default_org()
org_id = str(default_org.id)
query_validator.api_key_validator(secret_key, org_id)
query_validator.api_key_validator(api_key, org_id)

webhook_event_factory = WebhookEventFactory()
webhook_service = webhook_event_factory(webhook_event_type)
webhook_service = webhook_event_factory(event_type)

payload: Dict[str, List[Any]] = request.get_json()
webhook_service.validate_payload(payload)
webhook_event_id = webhook_service.save_webhook_event(org_id, payload)
event_id = webhook_service.save_webhook_event(org_id, payload)

job_id = WebhookQueue.enqueue_webhook.defer(webhook_event_id=str(webhook_event_id))
job_id = WebhookQueue.enqueue_webhook.defer(event_id=str(event_id))

return {"message": "Job enqueued successfully", "job_id": job_id}, 200
21 changes: 21 additions & 0 deletions backend/analytics_server/mhq/service/events/event_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from mhq.store.models.events.events import Event
from mhq.store.repos.events import EventsRepoService
from typing import Optional


class EventService:
def __init__(self, event_repo_service: EventsRepoService):
self._event_repo_service: EventsRepoService = event_repo_service

def get_event(self, event_id: str) -> Optional[Event]:
return self._event_repo_service.get_event(event_id)

def create_event(self, event: Event) -> str:
return self._event_repo_service.create_event(event)

def update_event(self, event: Event):
return self._event_repo_service.update_event(event)


def get_event_service():
return EventService(event_repo_service=EventsRepoService())
11 changes: 11 additions & 0 deletions backend/analytics_server/mhq/service/events/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from mhq.service.events.factory_abstract import WebhookEventHandler
from mhq.store.models.events.enums import EventType
from mhq.service.events.webhook_workflow_handler import get_webhook_workflow_handler


class WebhookEventFactory:
def __call__(self, event_type: EventType) -> WebhookEventHandler:
if event_type == EventType.WORKFLOW:
return get_webhook_workflow_handler()

raise NotImplementedError(f"Unknown event type - {event_type}")
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from mhq.store.models.webhooks.webhooks import WebhookEvent
from mhq.store.models.events.events import Event


class WebhookEventHandler(ABC):
Expand All @@ -17,13 +17,13 @@ def save_webhook_event(self, org_id: str, payload: Dict[str, Any]) -> str:
"""

@abstractmethod
def process_webhook_event(self, webhook_event: WebhookEvent):
def process_webhook_event(self, webhook_event: Event):
"""
Executes the main business logic for processing the webhook event.
"""

@abstractmethod
def prune_synced_data(self, webhook_event: WebhookEvent):
def prune_synced_data(self, webhook_event: Event):
"""
Prunes the synced data based on Interval.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __post_init__(self):

for field in required_fields:
if getattr(self, field) is None:
error_message = f"Workflow run missing required field: '{field}'"
error_message = f"Workflow run is missing required field: '{field}'"
raise InvalidPayloadError(error_message)

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
from mhq.utils.time import Interval
from mhq.exceptions.webhook import PayloadLimitExceededError, InvalidPayloadError
from mhq.store.models.code.workflows.enums import RepoWorkflowRunsStatus
from mhq.service.webhooks.factory_abstract import WebhookEventHandler
from mhq.store.models.webhooks.webhooks import WebhookEvent, WebhookEventRequestType
from mhq.service.webhooks.models.webhook import WebhookWorkflowRun
from mhq.service.events.factory_abstract import WebhookEventHandler
from mhq.store.models.events.events import Event, EventType, EventSource
from mhq.service.events.models.webhook import WebhookWorkflowRun
from typing import Any, List, Dict, Optional, Set, Tuple
from mhq.store.models.code.workflows.workflows import (
RepoWorkflow,
RepoWorkflowRuns,
RepoWorkflowType,
RepoWorkflowProviders,
)
from mhq.service.webhooks.webhook_event_service import (
get_webhook_service,
WebhookEventService,
from mhq.service.events.event_service import (
get_event_service,
EventService,
)
from mhq.service.deployments.deployment_service import (
get_deployments_service,
Expand All @@ -31,11 +31,11 @@
class WebhookWorkflowHandler(WebhookEventHandler):
def __init__(
self,
webhook_event_service: WebhookEventService,
event_service: EventService,
repository_service: RepositoryService,
deployments_service: DeploymentsService,
):
self._webhook_event_service: WebhookEventService = webhook_event_service
self._event_service: EventService = event_service
self._repository_service: RepositoryService = repository_service
self._deployments_service: DeploymentsService = deployments_service

Expand All @@ -50,16 +50,16 @@ def validate_payload(self, payload: Dict[str, List[Any]]):
self._adapt_payload(payload)

def save_webhook_event(self, org_id: str, payload: Dict[str, List[Any]]) -> str:
webhook_event = WebhookEvent(
webhook_event = Event(
org_id=org_id,
request_type=WebhookEventRequestType.WORKFLOW,
request_data=payload,
type=EventType.WORKFLOW,
source=EventSource.WEBHOOK,
data=payload,
)
return self._webhook_event_service.create_webhook_event(webhook_event)

def process_webhook_event(self, webhook_event: WebhookEvent):
workflow_runs = self._adapt_payload(webhook_event.request_data)
return self._event_service.create_event(webhook_event)

def process_webhook_event(self, webhook_event: Event):
workflow_runs = self._adapt_payload(webhook_event.data)
repo_urls_set: Set[str] = set()

for workflow in workflow_runs:
Expand Down Expand Up @@ -258,7 +258,7 @@ def _check_and_update_duration(

def get_webhook_workflow_handler() -> WebhookWorkflowHandler:
return WebhookWorkflowHandler(
get_webhook_service(),
get_event_service(),
get_repository_service(),
get_deployments_service(),
)
10 changes: 5 additions & 5 deletions backend/analytics_server/mhq/service/query_validator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import timedelta
from typing import List
from typing import List, Optional

from werkzeug.exceptions import NotFound, BadRequest
from mhq.exceptions.webhook import InvalidApiKeyError
Expand Down Expand Up @@ -74,15 +74,15 @@ def users_validator(self, user_ids: List[str]) -> List[Users]:

return users

def api_key_validator(self, secret_key: str | None, org_id: str) -> str:
api_key = self.repo_service.get_access_token(
def api_key_validator(self, api_key: Optional[str], org_id: str) -> str:
saved_api_key = self.repo_service.get_access_token(
org_id, UserIdentityProvider.WEBHOOK
)

if not api_key or api_key != secret_key:
if not saved_api_key or saved_api_key != api_key:
raise InvalidApiKeyError()

return api_key
return saved_api_key


def get_query_validator():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
from mhq.service.webhooks.factory import WebhookEventFactory
from mhq.service.webhooks.webhook_event_service import (
get_webhook_service,
WebhookEventService,
from mhq.service.events.factory import WebhookEventFactory
from mhq.service.events.event_service import (
get_event_service,
EventService,
)
from procrastinate import jobs, RetryStrategy
from mhq.utils.log import LOG
import traceback


# total_wait = wait + lineal_wait * attempts + exponential_wait ** (attempts + 1)
MAX_RETRIES = 2
WAIT = 60
LINEAR_WAIT = 540
NEXT_ATTEMPT_TIME = [60, 600] # 1min, 10min

retry = RetryStrategy(max_attempts=MAX_RETRIES, wait=WAIT, linear_wait=LINEAR_WAIT)


class WebhookQueueHandler:
def __init__(self, webhooks_service: WebhookEventService):
self.webhooks_service = webhooks_service
def __init__(self, event_service: EventService):
self.event_service = event_service

def webhook_receiver_handler(self, webhook_event_id: str):
def handle(self, event_id: str, job: jobs.Job):
webhook_event = None
try:
webhook_event = self.webhooks_service.get_webhook_event(webhook_event_id)
webhook_event = self.event_service.get_event(event_id)
if not webhook_event:
raise Exception("Webhook payload not found in database.")
webhook_event_factory = WebhookEventFactory()
webhook_event_handler = webhook_event_factory(webhook_event.request_type)
webhook_event_handler = webhook_event_factory(webhook_event.type)
webhook_event_handler.process_webhook_event(webhook_event)

webhook_event.error = None
self.webhooks_service.update_webhook_event(webhook_event)
self.event_service.update_event(webhook_event)
except Exception as e:
if not webhook_event:
raise e
Expand All @@ -31,9 +42,21 @@ def webhook_receiver_handler(self, webhook_event_id: str):
"args": e.args,
"traceback": traceback.format_exc(),
}
self.webhooks_service.update_webhook_event(webhook_event)
self.event_service.update_event(webhook_event)

if job.attempts < len(NEXT_ATTEMPT_TIME):
next_attempt_minutes = int(NEXT_ATTEMPT_TIME[job.attempts] / 60)
time_unit = "minute" if next_attempt_minutes == 1 else "minutes"
LOG.info(
f"Job got failed. It will be retried in {next_attempt_minutes} {time_unit}."
)
elif job.attempts == len(NEXT_ATTEMPT_TIME):
LOG.info(
f"Job has reached the maximum of {MAX_RETRIES + 1} attempts and is now marked as permanently failed. No further retries will be made."
)

raise e


def get_webhook_queue_handler():
return WebhookQueueHandler(webhooks_service=get_webhook_service())
return WebhookQueueHandler(event_service=get_event_service())
24 changes: 21 additions & 3 deletions backend/analytics_server/mhq/service/queue/tasks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
from procrastinate_worker import app, flask_app
from mhq.service.queue.task_handlers.webhook_queue_handler import (
get_webhook_queue_handler,
retry,
)
from procrastinate import JobContext


class WebhookQueue:
@staticmethod
@app.task(queue="webhookQueue", name="WebhookQueue.enqueue_webhook")
def enqueue_webhook(webhook_event_id: str):
@app.task(
queue="webhookQueue",
name="WebhookQueue.enqueue_webhook",
pass_context=True,
retry=retry,
)
def enqueue_webhook(job_context: JobContext, event_id: str):
with flask_app.app_context():
webhook_queue_handler = get_webhook_queue_handler()
webhook_queue_handler.webhook_receiver_handler(webhook_event_id)
webhook_queue_handler.handle(event_id, job_context.job)


class CronJobQueue:
@staticmethod
@app.periodic(cron="0 */6 * * *") # Every 6 hours
@app.task(queue="cronJobQueue", name="CronJobQueue.retry_stalled_jobs")
async def retry_stalled_jobs():
stalled_jobs = await app.job_manager.get_stalled_jobs()
for job in stalled_jobs:
await app.job_manager.retry_job(job)

11 changes: 0 additions & 11 deletions backend/analytics_server/mhq/service/webhooks/factory.py

This file was deleted.

Loading