asyncfast/amgi

AMGI

AMGI (Asynchronous Messaging Gateway Interface) is the message-driven sibling of ASGI.

Where ASGI standardises the interface between Python applications and HTTP servers, AMGI standardises the interface between Python applications and message brokers.

It enables you to build broker-agnostic, strongly-typed, message-driven applications that are portable across protocols and compute environments without rewriting business logic.

Architecture

[Figure: AMGI layered architecture]

AMGI defines a clean separation between:

  • Your application logic
  • The framework layer (e.g. AsyncFast)
  • The AMGI interface contract
  • The underlying protocol implementation

Protocol implementations may include Kafka, SQS, MQTT, Redis, and others - but your application code remains unchanged.
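
The layering can be illustrated with a small stdlib-only sketch. Everything here is a stand-in: `make_app` plays the role of a framework layer, `serve_from_queue` and `serve_from_lines` play the role of two protocol implementations, and the `"payload"` scope key is a hypothetical name rather than the documented AMGI scope layout. The point is only that `welcome_text` never changes when the transport does.

```python
import asyncio
import json


# Application logic: plain Python, no broker imports.
def welcome_text(email):
    return f"Welcome, {email}!"


# Stand-in framework layer: adapts a function to an AMGI-style callable.
def make_app(handler, outbox):
    async def app(scope, receive, send):
        try:
            body = json.loads(scope["payload"])  # hypothetical scope key
            outbox.append(handler(body["email"]))
            await send({"type": "message.ack"})
        except Exception as e:
            await send({"type": "message.nack", "message": str(e)})

    return app


async def _call(app, payload, events):
    """Invoke the callable once and record the event types it sends."""

    async def receive():
        return {}  # unused in this sketch

    async def send(event):
        events.append(event["type"])

    await app({"type": "message", "payload": payload}, receive, send)


# Toy "protocol implementation" 1: messages arrive on an asyncio.Queue.
async def serve_from_queue(app, queue):
    events = []
    while not queue.empty():
        await _call(app, queue.get_nowait(), events)
    return events


# Toy "protocol implementation" 2: messages arrive as text lines.
async def serve_from_lines(app, lines):
    events = []
    for line in lines:
        await _call(app, line.encode(), events)
    return events


async def main():
    outbox = []
    app = make_app(welcome_text, outbox)
    queue = asyncio.Queue()
    queue.put_nowait(b'{"email": "a@example.com"}')
    queue_events = await serve_from_queue(app, queue)
    line_events = await serve_from_lines(
        app, ['{"email": "b@example.com"}', "not json"]
    )
    return outbox, queue_events, line_events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both toy servers drive the same `app` object; only the code that fetches messages differs.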

Why AMGI?

Modern systems are message-driven - but each broker has:

  • Different client libraries
  • Different runtime models
  • Different infrastructure constraints
  • Different deployment assumptions

AMGI introduces a minimal interface boundary so that:

  • Switching brokers does not require rewriting your application
  • Running in Lambda is no more complex than running on EC2
  • Local development mirrors production semantics

If it runs on one protocol, it should run on another.

AsyncFast - Typed Message APIs

This repository also includes AsyncFast, a typed microframework built on AMGI, inspired by FastAPI.

If FastAPI made HTTP APIs easy to build, AsyncFast aims to do the same for message-driven systems.

Quick Example (AsyncFast)

from typing import Annotated
from asyncfast import AsyncFast
from asyncfast import Header
from pydantic import BaseModel

app = AsyncFast()


class UserCreated(BaseModel):
    id: int
    email: str


@app.channel("user.created")
async def handle_user_created(
    payload: UserCreated,
    correlation_id: Annotated[str, Header()],
) -> None:
    print(f"User created: {payload.email} ({correlation_id})")

What you get automatically:

  • Typed payload validation
  • Typed headers
  • Clean dependency injection
  • AsyncAPI-aligned schema generation
  • Broker portability
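
To make "typed payload validation" concrete, here is a rough stdlib-only sketch of the idea. It is not how AsyncFast works internally (the example above uses Pydantic models); `parse` is a hypothetical helper that decodes a JSON body and checks each field against its annotation before the handler would ever see it.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class UserCreated:
    id: int
    email: str


def parse(model, raw):
    """Decode a JSON body and check every field against its annotated type."""
    data = json.loads(raw)
    values = {}
    for field in fields(model):
        if field.name not in data:
            raise ValueError(f"missing field: {field.name}")
        value = data[field.name]
        if not isinstance(value, field.type):
            raise ValueError(
                f"field {field.name!r} expected {field.type.__name__}, "
                f"got {type(value).__name__}"
            )
        values[field.name] = value
    return model(**values)


if __name__ == "__main__":
    print(parse(UserCreated, b'{"id": 1, "email": "a@example.com"}'))
    try:
        parse(UserCreated, b'{"id": "one", "email": "a@example.com"}')
    except ValueError as error:
        print(f"rejected: {error}")
```

A malformed message is rejected at the boundary, so handler code can rely on `payload.id` being an integer without checking it again.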

Running on Kafka

from amgi_aiokafka import Server

# `app` is the AsyncFast instance defined in the quick example above.
server = Server(
    app, "my-topic", bootstrap_servers="localhost:9092", group_id="my-group"
)

server.run()

or, from the command line:

asyncfast run amgi-aiokafka main:app my-topic --group-id my-group

Running in AWS Lambda (SQS Trigger)

from amgi_sqs_event_source_mapping import SqsEventSourceMappingHandler

handler = SqsEventSourceMappingHandler(app)

The application code remains unchanged.
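
For intuition, the sketch below shows one way an adapter of this kind could map a Lambda SQS event onto an AMGI-style callable. It is an illustration under assumptions, not the implementation of `amgi_sqs_event_source_mapping`: `make_handler` and the `"payload"` scope key are hypothetical. The `Records`, `body`, and `messageId` keys and the `batchItemFailures` response shape come from the AWS Lambda SQS integration, where partial batch responses only take effect if `ReportBatchItemFailures` is enabled on the event source mapping.

```python
import asyncio
import json


async def app(scope, receive, send):
    try:
        json.loads(scope["payload"])  # hypothetical scope key
        await send({"type": "message.ack"})
    except Exception as e:
        await send({"type": "message.nack", "message": str(e)})


async def _process(application, event):
    failures = []
    for record in event["Records"]:
        outcome = []

        async def receive():
            return {}  # unused in this sketch

        async def send(message, outcome=outcome):
            outcome.append(message["type"])

        scope = {"type": "message", "payload": record["body"].encode()}
        await application(scope, receive, send)
        # Anything that was not acknowledged is reported back for redelivery.
        if "message.ack" not in outcome:
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def make_handler(application):
    """Build a synchronous Lambda entry point around an async callable."""

    def handler(event, context):
        return asyncio.run(_process(application, event))

    return handler


if __name__ == "__main__":
    sample_event = {
        "Records": [
            {"messageId": "1", "body": '{"id": 1}'},
            {"messageId": "2", "body": "not json"},
        ]
    }
    print(make_handler(app)(sample_event, None))
```

Only the failed record is listed in the response, so successfully handled messages in the same batch are not redelivered.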

What Is AMGI?

At its core, AMGI defines a minimal, low-level callable interface, similar in spirit to ASGI:

async def app(scope, receive, send):
    try:
        # Do some message handling here!
        await send(
            {
                "type": "message.ack",
            }
        )
    except Exception as e:
        await send({"type": "message.nack", "message": str(e)})

AMGI provides:

  • A standard application callable
  • A structured message scope
  • A send mechanism

Frameworks like AsyncFast can be built on top of this interface.
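
Because the interface is a plain async callable, it can also be wrapped in the way ASGI middleware wraps an ASGI app. The sketch below is a hypothetical example: `count_outcomes` is not part of AMGI, and the `"payload"` scope key is an assumed name. It intercepts `send` to tally the event types that pass through.

```python
import asyncio
from collections import Counter


def count_outcomes(inner, counts):
    """Wrap an AMGI-style callable and tally the event types it sends."""

    async def wrapped(scope, receive, send):
        async def counting_send(event):
            counts[event["type"]] += 1
            await send(event)  # forward unchanged to the real send

        await inner(scope, receive, counting_send)

    return wrapped


async def app(scope, receive, send):
    try:
        if scope.get("payload") == b"bad":  # hypothetical scope key
            raise ValueError("cannot process payload")
        await send({"type": "message.ack"})
    except Exception as e:
        await send({"type": "message.nack", "message": str(e)})


async def main():
    counts = Counter()
    wrapped = count_outcomes(app, counts)
    forwarded = []

    async def receive():
        return {}  # unused in this sketch

    async def send(event):
        forwarded.append(event)

    for payload in (b"ok", b"bad", b"ok"):
        await wrapped({"type": "message", "payload": payload}, receive, send)
    return counts, forwarded


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The wrapped application is itself an AMGI-style callable, so wrappers of this kind can be stacked without the inner application knowing about them.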

Core Aims

Portable

Applications written against AMGI:

  • Are independent of a specific broker
  • Are independent of compute environment
  • Can run in containers, VMs, or Lambda

Infrastructure changes should not require rewriting business logic.

Standards-Based

AMGI and AsyncFast are aligned with:

Schemas are first-class. Validation and documentation are built into the design.

Clean Implementation

Each protocol implementation aims to be:

  • Minimal
  • Explicit
  • Correct
  • Efficient

This is engineered abstraction.

Protocol Implementations

The following AMGI servers are currently available:

  • Kafka
  • MQTT
  • Redis
  • SQS

Documentation

Contact

For questions or suggestions, please contact jack.burridge@mail.com.
