Async Worker¶
Service object integration¶
You can leverage the async worker without needing to know anything specific about the worker implementation.
The generic Service object includes a method that allows you to enqueue a background task.
Example¶
Let's add a background task that sends an email whenever a new Author
is created.
```python
from typing import Any

from starlite_saqlalchemy import service
from starlite_saqlalchemy.repository.sqlalchemy import SQLAlchemyRepository

from domain.authors import Author, ReadDTO


class Repository(SQLAlchemyRepository[Author]):
    model_type = Author


class Service(service.RepositoryService[Author]):
    """Author service object."""

    repository_type = Repository

    async def create(self, data: Author) -> Author:
        created = await super().create(data)
        await self.enqueue_background_task(
            "send_author_created_email", raw_author=ReadDTO.from_orm(created).dict()
        )
        return created

    async def send_author_created_email(self, raw_author: dict[str, Any]) -> None:
        """Logic here to send the email."""
```
Don't block the event loop¶
It is important to remember that this worker runs on the same event loop as the application itself, so take care that the operations you perform in background tasks don't block the loop.
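For example, if the email is sent with a blocking client such as smtplib, the blocking call can be pushed onto a worker thread with asyncio.to_thread so the loop stays responsive. The sketch below fills in the send_author_created_email method from the example above; the send_email helper, SMTP host, and addresses are invented for illustration and aren't part of starlite-saqlalchemy.

```python
import asyncio
import smtplib
from email.message import EmailMessage
from typing import Any

from starlite_saqlalchemy import service

from domain.authors import Author


def send_email(raw_author: dict[str, Any]) -> None:
    """Blocking SMTP send - fine in a worker thread, not on the event loop."""
    message = EmailMessage()
    message["Subject"] = "New author created"
    message["From"] = "noreply@example.com"
    message["To"] = "admin@example.com"
    message.set_content(f"A new author was created: {raw_author}")
    with smtplib.SMTP("localhost") as smtp:
        smtp.send_message(message)


class Service(service.RepositoryService[Author]):
    # ... repository_type, create(), etc. as in the example above ...

    async def send_author_created_email(self, raw_author: dict[str, Any]) -> None:
        # Push the blocking call onto a thread so the shared event loop stays free.
        await asyncio.to_thread(send_email, raw_author)
```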
If you need to do computationally heavy work in background tasks, a better pattern is to use something like Honcho to start an SAQ worker in a separate process from the Starlite application, and run your app in a multicore environment.
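As a rough sketch of that pattern (plain SAQ rather than anything provided by starlite-saqlalchemy): a standalone worker module that a Honcho Procfile could start alongside the web process. The app.worker module path, Redis URL, task body, and uvicorn command are all assumptions.

```python
# app/worker.py - a standalone SAQ worker, run in its own process.
#
# A Honcho Procfile could start both processes, e.g.:
#   web: uvicorn app.main:app
#   worker: saq app.worker.settings
from typing import Any

from saq import Queue


async def send_author_created_email(ctx: dict[str, Any], *, raw_author: dict[str, Any]) -> None:
    """SAQ task functions receive the job context plus keyword arguments."""
    ...  # heavy or blocking work is fine here - this process serves no requests


queue = Queue.from_url("redis://localhost:6379")

# The ``saq`` CLI reads this dict to configure the worker.
settings = {
    "queue": queue,
    "functions": [send_author_created_email],
    "concurrency": 10,
}
```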
Why SAQ¶
I like that it leverages BLMOVE instead of polling to wait for jobs: see Pattern: Reliable queue.
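For reference, the reliable queue pattern atomically moves each job onto a per-consumer processing list while blocking for new work, so nothing is silently dropped if the consumer crashes mid-job. A rough sketch with redis-py (the key names are invented, and this is not how SAQ structures its data):

```python
import asyncio

from redis.asyncio import Redis


async def consume() -> None:
    """Reliable-queue consumption: block for the next job instead of polling."""
    redis = Redis.from_url("redis://localhost:6379")
    while True:
        # Atomically move the next job from "jobs" to "processing" and return it;
        # timeout=0 blocks until a job arrives - no polling loop, near-zero latency.
        job = await redis.blmove("jobs", "processing", timeout=0)
        try:
            ...  # process the job
        finally:
            # Acknowledge: remove the job from the processing list once handled.
            await redis.lrem("processing", 1, job)


asyncio.run(consume())
```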
SAQ also makes a direct comparison to ARQ in their README, so I'll let that speak for itself:
SAQ is heavily inspired by ARQ but has several enhancements.
- Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY
    - SAQ has much lower latency than ARQ, with delays of < 5ms. ARQ's default polling frequency is 0.5 seconds
    - SAQ is up to 8x faster than ARQ
- Web interface for monitoring queues and workers
- Heartbeat monitor for abandoned jobs
- More robust failure handling
    - Storage of stack traces
    - Sweeping stuck jobs
    - Handling of cancelled jobs different from failed jobs (machine redeployments)
- Before and after job hooks
- Easily run multiple workers to leverage more cores