Skip to content

Conversation

igalshilman
Copy link

@igalshilman igalshilman commented Sep 24, 2025

This is a WIP draft of adding restate as a durable execution engine.
Opening this early for visibility and to keep the development in public.

This is the previous discussion as context: #1975 (comment)

The following examples currently work:

Automatic wrapping of tool calls

# Agent for the simple example: tool calls are wrapped automatically by RestateAgent.
weather_agent = Agent(
    'openai:gpt-4.1-mini',
    # 'Be concise, reply with one sentence.' is enough for some models (like openai) to use
    # the below tools appropriately, but others like anthropic and gemini require a bit more direction.
    instructions='Be concise, reply with one sentence.',
)

class LatLng(BaseModel):
    """Geographic coordinates returned by the `get_lat_lng` tool."""

    lat: float  # latitude, presumably in decimal degrees — TODO confirm
    lng: float  # longitude, presumably in decimal degrees — TODO confirm

@weather_agent.tool
async def get_lat_lng(ctx: RunContext[None], location_description: str) -> LatLng:
    """Get the latitude and longitude of a location.

    Args:
        ctx: The context.
        location_description: A description of a location.
    """
    # Stub: returns fixed coordinates regardless of the description.
    # (PEP 8: no spaces around '=' for keyword arguments.)
    return LatLng(lat=10.0, lng=20.0)


@weather_agent.tool
async def get_weather(ctx: RunContext[None], lat: float, lng: float) -> dict[str, Any]:
    """Get the weather at a location.

    Args:
        ctx: The context.
        lat: Latitude of the location.
        lng: Longitude of the location.
    """
    # Stub: a fixed canned answer, independent of the coordinates.
    temperature = '20 °C'
    description = 'Sunny'
    return {'temperature': temperature, 'description': description}


def main():
    import restate
    import uvicorn

    weather_service = restate.Service(name='weather')

    @weather_service.handler()
    async def handle_weather_request(ctx: restate.Context, city: str) -> str:
        # Wrap the agent so its model/tool calls become durable steps.
        agent = RestateAgent(weather_agent, restate_context=ctx)
        run_result = await agent.run(f'What is the weather like in {city}?')
        return run_result.output

    # Serve the restate service over HTTP.
    uvicorn.run(restate.app(services=[weather_service]), port=9080)


if __name__ == '__main__':
    main()

Advanced usage of the restate_context

It is also possible to pass the restate context through to the tools (thus disabling automatic wrapping) and use the context directly for durable RPC, durable timers, durable promises (human in the loop), and tools that perform more than a single durable step.

@dataclass
class Deps:
    """Dependencies injected into tool runs via `RunContext.deps`."""

    # HTTP client the tools use to call the external demo endpoints.
    client: AsyncClient
    # Restate durable-execution context, passed through so tools can run
    # durable steps (ctx.run_typed) directly.
    restate_context: Context


# Agent for the advanced example: tools receive the restate context via Deps.
weather_agent = Agent(
    'openai:gpt-4.1-mini',
    # 'Be concise, reply with one sentence.' is enough for some models (like openai) to use
    # the below tools appropriately, but others like anthropic and gemini require a bit more direction.
    instructions='Be concise, reply with one sentence.',
    deps_type=Deps,
)


@weather_agent.tool
async def get_lat_lng(ctx: RunContext[Deps], location_description: str) -> LatLng:
    """Get the latitude and longitude of a location.

    Args:
        ctx: The context.
        location_description: A description of a location.
    """
    deps = ctx.deps

    async def fetch_lat_lng():
        # NOTE: the response here will be random, and is not related to the location description.
        response = await deps.client.get(
            'https://demo-endpoints.pydantic.workers.dev/latlng',
            params={'location': location_description},
        )
        response.raise_for_status()
        return LatLng.model_validate_json(response.content)

    # Run the HTTP call as a single durable step; its result is journaled by restate.
    return await deps.restate_context.run_typed('Getting lat/lng', fetch_lat_lng)


@weather_agent.tool
async def get_weather(ctx: RunContext[Deps], lat: float, lng: float) -> dict[str, Any]:
    """Get the weather at a location.

    Args:
        ctx: The context.
        lat: Latitude of the location.
        lng: Longitude of the location.
    """
    deps = ctx.deps

    async def fetch_temperature():
        response = await deps.client.get(
            'https://demo-endpoints.pydantic.workers.dev/number',
            params={'min': 10, 'max': 30},
        )
        response.raise_for_status()
        return response.text

    async def fetch_description():
        response = await deps.client.get(
            'https://demo-endpoints.pydantic.workers.dev/weather',
            params={'lat': lat, 'lng': lng},
        )
        response.raise_for_status()
        return response.text

    # Start both durable steps, then wait for both to complete concurrently.
    temperature_fut = deps.restate_context.run_typed('Fetching temperature', fetch_temperature)
    description_fut = deps.restate_context.run_typed('Fetching description', fetch_description)
    await restate.asyncio.gather(temperature_fut, description_fut)

    # Both futures are resolved at this point; awaiting them yields the journaled values.
    temperature = await temperature_fut
    description = await description_fut
    return {
        'temperature': f'{temperature} °C',
        'description': description,
    }
    
def main():
    import restate
    import uvicorn

    weather_service = restate.Service(name='weather')

    @weather_service.handler()
    async def handle_weather_request(ctx: Context, city: str):
        # The HTTP client lives for the duration of one handler invocation.
        async with AsyncClient() as http_client:
            agent = RestateAgent(weather_agent, restate_context=ctx)
            run_deps = Deps(client=http_client, restate_context=ctx)
            run_result = await agent.run(f'What is the weather like in {city}?', deps=run_deps)
            return run_result.output

    # Serve the restate service over HTTP.
    uvicorn.run(app(services=[weather_service]), port=9080)


if __name__ == '__main__':
    main()

@DouweM
Copy link
Collaborator

DouweM commented Sep 26, 2025

@igalshilman Hey Igal, I'm excited to expand our durable execution support! I'd like to discuss our expectations and acceptance criteria for contributions like this on a call early next week; would you mind joining our public Slack and sending me a DM (same username) so we can find some time?

@DouweM DouweM self-assigned this Sep 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants