Testing
Use pytest-amgi to drive an AsyncFast app in-process from a test. The
amgi_producer fixture starts the AMGI lifespan protocol before returning a
producer, and shuts lifespan down when the test finishes.
pip install pytest-amgi
The examples below use an app defined in app.py and tests in
test_app.py.
Basic Test
Send a message with AMGIProducer.send() and assert the app acked it:
# Handler for the "orders" channel: the example expects the integer payload 1.
@app.channel("orders")
async def orders(payload: int) -> None:
    expected_payload = 1
    assert payload == expected_payload
async def test_message(amgi_producer: AMGIProducerFactory) -> None:
    """Send ``1`` as JSON to the ``orders`` channel and check the app acked it."""
    orders_producer = await amgi_producer(app)
    outcome = await orders_producer.send("orders", json=1)
    outcome.assert_acked()
Headers
Headers can be sent as a mapping. AsyncFast can read them with
Header.
# Handler for the "orders.with-header" channel: receives the trace_id
# parameter declared with Header() and checks it carries the expected value.
@app.channel("orders.with-header")
async def orders_with_header(trace_id: Annotated[str, Header()]) -> None:
    expected_trace_id = "trace-1"
    assert trace_id == expected_trace_id
async def test_message_header(amgi_producer: AMGIProducerFactory) -> None:
    """Send a message with a ``trace-id`` header and check the app acked it."""
    header_producer = await amgi_producer(app)
    # Headers are passed to send() as a plain mapping.
    trace_headers = {"trace-id": "trace-1"}
    outcome = await header_producer.send("orders.with-header", headers=trace_headers)
    outcome.assert_acked()
Message Sends
If a handler sends follow-up messages with MessageSender, assert
them with AMGIMessageResult.assert_has_message_send().
# Follow-up message declared with the address "orders.process".
@dataclass
class ProcessOrder(Message, address="orders.process"):
    # Message payload: a mapping of string keys to integer values.
    payload: dict[str, int]
# Handler for the "orders.created" channel: sends one ProcessOrder follow-up
# message through the injected message sender.
@app.channel("orders.created")
async def orders_created(
    message_sender: MessageSender[ProcessOrder],
) -> None:
    follow_up = ProcessOrder(payload={"id": 1})
    await message_sender.send(follow_up)
async def test_message_send(amgi_producer: AMGIProducerFactory) -> None:
    """Trigger ``orders.created`` and check the follow-up ``orders.process`` send."""
    created_producer = await amgi_producer(app)
    outcome = await created_producer.send("orders.created")
    outcome.assert_acked()
    # The handler is expected to have sent {"id": 1} to "orders.process".
    expected_payload = {"id": 1}
    outcome.assert_has_message_send("orders.process", json=expected_payload)
For multiple follow-up messages, use
AMGIMessageResult.assert_has_message_sends(). This checks the number
and order of messages sent by the app.
# Follow-up message declared with the address "inventory.reserve".
@dataclass
class ReserveInventory(Message, address="inventory.reserve"):
    # Message payload: a mapping of string keys to integer values.
    payload: dict[str, int]
# Follow-up message declared with the address "email.receipt".
@dataclass
class SendReceipt(Message, address="email.receipt"):
    # Message payload: a mapping of string keys to integer values.
    payload: dict[str, int]
# Handler for the "orders.created.multiple" channel: sends two follow-up
# messages, ReserveInventory first and SendReceipt second.
@app.channel("orders.created.multiple")
async def orders_created_multiple(
    message_sender: MessageSender[ReserveInventory | SendReceipt],
) -> None:
    reservation = ReserveInventory(payload={"id": 1})
    await message_sender.send(reservation)

    receipt = SendReceipt(payload={"id": 1})
    await message_sender.send(receipt)
async def test_multiple_message_sends(amgi_producer: AMGIProducerFactory) -> None:
    """Trigger ``orders.created.multiple`` and check both follow-up sends."""
    multi_producer = await amgi_producer(app)
    outcome = await multi_producer.send("orders.created.multiple")
    outcome.assert_acked()

    # Listed in the order the handler sends them.
    expected_sends = [
        Message("inventory.reserve", json={"id": 1}),
        Message("email.receipt", json={"id": 1}),
    ]
    outcome.assert_has_message_sends(expected_sends)
Nacks
Use AMGIMessageResult.assert_nacked() when the app rejects a message.
# Handler for the "orders.invalid" channel: always fails with a RuntimeError,
# which the accompanying test expects to surface as a nack.
@app.channel("orders.invalid")
async def orders_invalid() -> None:
    rejection = RuntimeError("invalid order")
    raise rejection
async def test_message_nack(amgi_producer: AMGIProducerFactory) -> None:
    """Send to ``orders.invalid`` and check the app nacked the message."""
    invalid_producer = await amgi_producer(app)
    outcome = await invalid_producer.send("orders.invalid")
    # match is the text of the RuntimeError raised by the handler.
    outcome.assert_nacked(match="invalid order")