Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/agent_workflow_server/services/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async def _call_webhook(run: Run) -> None:
async with httpx.AsyncClient() as client:
response = await client.post(
run["webhook"],
data=run_data,
content=run_data,
headers={"Content-Type": "application/json"},
)

Expand Down
43 changes: 43 additions & 0 deletions tests/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import json
from typing import AsyncGenerator, List, Optional

from aiohttp import web
Copy link
Member Author

@mtrinell mtrinell May 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't make fastapi work here (looks like it collides with the other fastapi server, being in the same event loop) - so using aiohttp


from agent_workflow_server.agents.base import BaseAdapter, BaseAgent
from agent_workflow_server.generated.manifest.models.agent_manifest import AgentManifest
from agent_workflow_server.services.message import Message
Expand Down Expand Up @@ -105,3 +107,44 @@ def load_agent(


mock_agent = MockAgentImpl()


MOCK_WEBSERVER_DEFAULT_PORT = 9753
MOCK_WEBSERVER_DEFAULT_HOST = "127.0.0.1"
MOCK_WEBSERVER_WEBHOOK_PATH = "webhook"


class TestWebServer:
def __init__(self, host: str, port: int, path: str):
self.host = host
self.port = port
self.path = "/" + path.strip("/")
self.webhook_payload = None
self._runner = None
self._site = None
self._app = web.Application()
self._app.router.add_post(self.path, self._webhook_handler)
self._server_task = None

async def _webhook_handler(self, request: web.Request) -> web.Response:
self.webhook_payload = await request.read()
return web.Response(status=200)

async def start(self):
self._runner = web.AppRunner(self._app)
await self._runner.setup()
self._site = web.TCPSite(self._runner, self.host, self.port)
await self._site.start()

async def stop(self):
if self._site:
await self._site.stop()
if self._runner:
await self._runner.cleanup()

async def __aenter__(self):
await self.start()
return self

async def __aexit__(self, exc_type, exc, tb):
await self.stop()
23 changes: 22 additions & 1 deletion tests/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
MOCK_RUN_OUTPUT_ERROR,
MOCK_RUN_OUTPUT_INTERRUPT,
MOCK_RUN_OUTPUT_STREAM,
MOCK_WEBSERVER_DEFAULT_HOST,
MOCK_WEBSERVER_DEFAULT_PORT,
MOCK_WEBSERVER_WEBHOOK_PATH,
MockAdapter,
TestWebServer,
)


Expand All @@ -45,7 +49,7 @@
recursion_limit=3,
configurable={"mock-key": "mock-value"},
),
webhook="http://some-host:8000/webhook",
webhook=f"http://{MOCK_WEBSERVER_DEFAULT_HOST}:{MOCK_WEBSERVER_DEFAULT_PORT}/{MOCK_WEBSERVER_WEBHOOK_PATH}",
),
"success",
MOCK_RUN_OUTPUT,
Expand Down Expand Up @@ -76,6 +80,14 @@ async def test_invoke(
expected_status: RunStatus,
expected_output: dict,
):
if run_create_mock.webhook:
mock_server = TestWebServer(
host=MOCK_WEBSERVER_DEFAULT_HOST,
port=MOCK_WEBSERVER_DEFAULT_PORT,
path=MOCK_WEBSERVER_WEBHOOK_PATH,
)
await mock_server.start()

mocker.patch("agent_workflow_server.agents.load.ADAPTERS", [MockAdapter()])

try:
Expand Down Expand Up @@ -103,8 +115,17 @@ async def test_invoke(
assert run.creation.config == run_create_mock.config
assert run.creation.webhook == run_create_mock.webhook
assert output == expected_output

# Check if the webhook was called with the expected payload
if run_create_mock.webhook:
assert mock_server.webhook_payload.decode("utf-8") == run.model_dump_json(
by_alias=True, exclude_unset=True
)

finally:
worker_task.cancel()
if run_create_mock.webhook and mock_server:
await mock_server.stop()
try:
await worker_task
except asyncio.CancelledError:
Expand Down