From 9bde1b9917bd6407bbb6e6838222e87318de0cfe Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Tue, 23 Sep 2025 15:47:27 -0400 Subject: [PATCH 1/3] Update basic workflow to use human in the loop --- .python-version | 2 +- src/app/workflow.py | 37 ++++++++++++++---- ui/src/pages/Home.tsx | 89 ++++++++++++++++++++++++++++++------------- 3 files changed, 94 insertions(+), 34 deletions(-) diff --git a/.python-version b/.python-version index e4fba21..24ee5b1 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.12 +3.13 diff --git a/src/app/workflow.py b/src/app/workflow.py index b631827..ae55dd2 100644 --- a/src/app/workflow.py +++ b/src/app/workflow.py @@ -2,7 +2,7 @@ import time from workflows import Context, Workflow, step -from workflows.events import StartEvent, StopEvent, Event +from workflows.events import HumanResponseEvent, InputRequiredEvent, StartEvent, StopEvent, Event import logging from datetime import datetime @@ -21,25 +21,48 @@ class WorkflowCompletedEvent(StopEvent): timestamp: str +class PauseEvent(InputRequiredEvent): + timestamp: str + + +class ResumeEvent(HumanResponseEvent): + should_continue: bool + +class OkGoEvent(Event): + message: str + class DefaultWorkflow(Workflow): @step - async def start(self, event: PingEvent, context: Context) -> WorkflowCompletedEvent: + async def start(self, event: PingEvent, context: Context) -> OkGoEvent: + return OkGoEvent(message="OK GO") + + @step + async def loop( + self, event: ResumeEvent | OkGoEvent, context: Context + ) -> PauseEvent | WorkflowCompletedEvent: + if isinstance(event, ResumeEvent) and not event.should_continue: + return WorkflowCompletedEvent( + timestamp="workflow completed at " + + datetime.now().isoformat(timespec="seconds") + ) start = time.monotonic() - logger.info(f"Received message: {event.message}") + logger.info(f"Received message!!!!!: {event}") for i in range(5): - logger.info(f"Processing message: {event.message} {i}") + logger.info(f"Processing message: {event} {i}") elapsed = (time.monotonic() - start) * 1000 context.write_event_to_stream( PongEvent(message=f"+{elapsed:.0f}ms PONG {i + 1}/5 ") ) await asyncio.sleep(0.2) - return WorkflowCompletedEvent( - timestamp="workflow completed at " + + + return PauseEvent( + timestamp="workflow paused at " + datetime.now().isoformat(timespec="seconds") ) -workflow = DefaultWorkflow() +workflow = DefaultWorkflow(timeout=None) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) diff --git a/ui/src/pages/Home.tsx b/ui/src/pages/Home.tsx index bec5fc0..196b5b4 100644 --- a/ui/src/pages/Home.tsx +++ b/ui/src/pages/Home.tsx @@ -1,28 +1,23 @@ import { useState } from "react"; -import { useWorkflowHandler, useWorkflowRun } from "@llamaindex/ui"; +import { + type WorkflowEvent, + useWorkflowHandler, + useWorkflowRun, +} from "@llamaindex/ui"; export default function Home() { const [taskId, setTaskId] = useState(null); const createHandler = useWorkflowRun(); return (
-
+
- This is a basic starter app for LlamaDeploy. It's running a simple - workflow on the backend, and vite with react on the frontend with - llama-ui to call the workflow. Customize this app with your own - workflow and UI. + This is a basic starter app for LlamaDeploy. It's running a simple + Human-in-the-loop workflow on the backend, and vite with react on the frontend with + llama-ui to call the workflow. Customize this app with your own workflow and UI.
- {taskId ? ( - - ) : ( - - - Workflow result will show here. - - - )} + @@ -71,33 +66,75 @@ function RunButton({ ); } -function HandlerOutput({ handlerId }: { handlerId: string }) { +type PongEvent = { type: `${string}.PongEvent`; data: { message: string } }; +type PauseEvent = { type: `${string}.PauseEvent`; data: { timestamp: string } }; + +function isPongEvent(event: WorkflowEvent): event is PongEvent { + return event.type.endsWith(".PongEvent"); +} +function isPauseEvent(event: WorkflowEvent): event is PauseEvent { + return event.type.endsWith(".PauseEvent"); +} + +function HandlerOutput({ handlerId }: { handlerId: string | null }) { // stream events and result from the workflow - const handler = useWorkflowHandler(handlerId); + const handler = useWorkflowHandler(handlerId ?? ""); // read workflow events here - const pongs = handler.events.filter((event) => - event.type.match(/PongEvent$/), - ) as { type: string; data: { message: string } }[]; + const pongsOrResume = handler.events.filter( + (event) => isPongEvent(event) || isPauseEvent(event) + ) as (PongEvent | PauseEvent)[]; const completed = handler.events.find((event) => - event.type.match(/WorkflowCompletedEvent$/), + event.type.endsWith(".WorkflowCompletedEvent") ) as { type: string; data: { timestamp: string } } | undefined; return ( -
+
{completed ? completed.data.timestamp : "Running... "} - {pongs.map((pong, index) => ( + {pongsOrResume.map((pong, index) => ( - {pong.data.message} + {isPongEvent(pong) ? pong.data.message : pong.data.timestamp} + {isPauseEvent(pong) && + index === pongsOrResume.length - 1 && + !completed && ( + + )} ))} + {!completed && ( + + )}
); } From 7bca316d0e269b9d12d4e038a2eb22dbd79ec010 Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Tue, 23 Sep 2025 15:49:39 -0400 Subject: [PATCH 2/3] Fix formats --- pyproject.toml | 1 + src/app/workflow.py | 11 +++++++++-- ui/src/pages/Home.tsx | 11 ++++++----- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8077334..ca12e7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dev = [ "ty>=0.0.1a16", "pytest>=8.4.1", "hatch>=1.14.1", + "click>=8.0.0,!=8.3.0", ] diff --git a/src/app/workflow.py b/src/app/workflow.py index ae55dd2..8882ca2 100644 --- a/src/app/workflow.py +++ b/src/app/workflow.py @@ -2,7 +2,13 @@ import time from workflows import Context, Workflow, step -from workflows.events import HumanResponseEvent, InputRequiredEvent, StartEvent, StopEvent, Event +from workflows.events import ( + HumanResponseEvent, + InputRequiredEvent, + StartEvent, + StopEvent, + Event, +) import logging from datetime import datetime @@ -28,9 +34,11 @@ class PauseEvent(InputRequiredEvent): class ResumeEvent(HumanResponseEvent): should_continue: bool + class OkGoEvent(Event): message: str + class DefaultWorkflow(Workflow): @step async def start(self, event: PingEvent, context: Context) -> OkGoEvent: @@ -55,7 +63,6 @@ async def loop( ) await asyncio.sleep(0.2) - return PauseEvent( timestamp="workflow paused at " + datetime.now().isoformat(timespec="seconds") diff --git a/ui/src/pages/Home.tsx b/ui/src/pages/Home.tsx index 196b5b4..3c95fc8 100644 --- a/ui/src/pages/Home.tsx +++ b/ui/src/pages/Home.tsx @@ -12,9 +12,10 @@ export default function Home() {
- This is a basic starter app for LlamaDeploy. It's running a simple - Human-in-the-loop workflow on the backend, and vite with react on the frontend with - llama-ui to call the workflow. Customize this app with your own workflow and UI. + This is a basic starter app for LlamaDeploy. It's running a simple + Human-in-the-loop workflow on the backend, and vite with react on the + frontend with llama-ui to call the workflow. Customize this app with + your own workflow and UI.
@@ -82,10 +83,10 @@ function HandlerOutput({ handlerId }: { handlerId: string | null }) { // read workflow events here const pongsOrResume = handler.events.filter( - (event) => isPongEvent(event) || isPauseEvent(event) + (event) => isPongEvent(event) || isPauseEvent(event), ) as (PongEvent | PauseEvent)[]; const completed = handler.events.find((event) => - event.type.endsWith(".WorkflowCompletedEvent") + event.type.endsWith(".WorkflowCompletedEvent"), ) as { type: string; data: { timestamp: string } } | undefined; return ( From cb495d6f6c2660c3291ca3595d403cceccb19e59 Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Tue, 23 Sep 2025 15:51:33 -0400 Subject: [PATCH 3/3] Hide stop button on initialization --- ui/src/pages/Home.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/pages/Home.tsx b/ui/src/pages/Home.tsx index 3c95fc8..f9a3a11 100644 --- a/ui/src/pages/Home.tsx +++ b/ui/src/pages/Home.tsx @@ -122,7 +122,7 @@ function HandlerOutput({ handlerId }: { handlerId: string | null }) { )} ))} - {!completed && ( + {!completed && pongsOrResume.length > 0 && (