worker

SAQ worker and queue.

queue module-attribute

queue = Queue(redis.client)

Async worker queue.

Queue instance created with the application's Redis client.

JobConfig dataclass

Configure a Job.

Used to configure jobs enqueued via Service.enqueue_background_task()

heartbeat class-attribute

heartbeat: int = settings.worker.JOB_HEARTBEAT

Maximum time, in seconds, that a job can survive without emitting a heartbeat. Set to 0 to disable.

Calling job.update() triggers a heartbeat.

key class-attribute

key: str | None = None

Unique key for the job. Pass one in to prevent duplicate jobs from being enqueued.

queue class-attribute

queue: Queue = queue

Queue associated with the job.

retries class-attribute

retries: int = settings.worker.JOB_RETRIES

Maximum number of attempts for a job.

retry_backoff class-attribute

retry_backoff: bool | float = settings.worker.JOB_RETRY_BACKOFF

If true, use exponential backoff for retry delays.

  • The first retry is delayed by retry_delay.
  • The second retry is delayed by retry_delay * 2, the third by retry_delay * 4, and so on.
  • Jitter is always applied: the final retry delay is a random number between 0 and the calculated retry delay.
  • If retry_backoff is set to a number, that number is the maximum retry delay, in seconds.
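
The backoff rules above can be sketched in a few lines. This is a minimal illustration of the documented behavior, not SAQ's implementation; the function names `backoff_ceiling` and `next_retry_delay` are hypothetical, and the attempt counter is assumed to start at 1 for the first retry.

```python
from __future__ import annotations

import random


def backoff_ceiling(retry_delay: float, attempt: int, retry_backoff: bool | float) -> float:
    """Upper bound on the delay before retry number `attempt` (1-based, assumed)."""
    if not retry_backoff:
        # Backoff disabled: every retry waits the plain retry_delay.
        return retry_delay
    # Doubles per retry: retry_delay, retry_delay * 2, retry_delay * 4, ...
    ceiling = retry_delay * 2 ** (attempt - 1)
    # A numeric retry_backoff caps the delay; True means "no cap".
    if retry_backoff is not True:
        ceiling = min(ceiling, float(retry_backoff))
    return ceiling


def next_retry_delay(retry_delay: float, attempt: int, retry_backoff: bool | float) -> float:
    """Delay actually slept: jittered between 0 and the ceiling when backoff is on."""
    ceiling = backoff_ceiling(retry_delay, attempt, retry_backoff)
    if not retry_backoff:
        return ceiling
    return random.uniform(0.0, ceiling)
```

With retry_delay set to 1.0 and retry_backoff set to 10.0, the ceilings for the first five retries would be 1, 2, 4, 8 and 10 seconds, and each actual delay falls somewhere between 0 and that ceiling.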

retry_delay class-attribute

retry_delay: float = settings.worker.JOB_TTL

Seconds to delay before retrying a job.

timeout class-attribute

timeout: int = settings.worker.JOB_TIMEOUT

Max time a job can run for, in seconds.

Set to 0 for no timeout.

ttl class-attribute

ttl: int = settings.worker.JOB_TTL

Lifetime of available job information, in seconds.

  • 0: indefinite
  • -1: disabled (no info retained)
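
To show how these fields combine, here is a sketch using a stand-in dataclass. `JobConfigSketch` is hypothetical: it mirrors the documented field names and types but omits the queue field, and its default values are placeholders, since the real defaults are read from settings.worker.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class JobConfigSketch:
    """Hypothetical stand-in for JobConfig; the `queue` field is omitted."""

    # Placeholder defaults. The real defaults come from settings.worker.
    heartbeat: int = 0
    key: str | None = None
    retries: int = 1
    retry_backoff: bool | float = False
    retry_delay: float = 1.0
    timeout: int = 10
    ttl: int = 600


# A job that is deduplicated by key, retried with backoff capped at 60 seconds,
# allowed to run without a timeout, and whose info is not retained afterwards.
nightly_report = JobConfigSketch(
    key="nightly-report",
    retries=5,
    retry_backoff=60.0,
    timeout=0,
    ttl=-1,
)
```

An instance of the real JobConfig built along these lines would be handed to Service.enqueue_background_task(); the exact parameter it is passed as is not shown in this section.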

Queue

Queue(*args, **kwargs)

Bases: saq.Queue

Async task queue.

Names the queue after the application slug, which namespaces SAQ's Redis keys to the app.

Configures msgspec for JSON serialization/deserialization if not otherwise configured.

Parameters:

  • *args (Any): Passed through to saq.Queue.__init__(). Default: ()
  • **kwargs (Any): Passed through to saq.Queue.__init__(). Default: {}
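
The "if not otherwise configured" behavior described for this constructor can be sketched with dict.setdefault. Everything here is a stand-in: `BaseQueueSketch` imitates only the keyword arguments relevant to the example, `APP_SLUG` is a hypothetical value, and the standard library json module takes the place of msgspec so that the sketch runs without extra dependencies.

```python
import json
from typing import Any

APP_SLUG = "my-app"  # hypothetical application slug


def compact_dump(obj: Any) -> str:
    """Stand-in encoder; the documented class configures msgspec instead."""
    return json.dumps(obj, separators=(",", ":"))


class BaseQueueSketch:
    """Stand-in for the parent queue class (not saq.Queue itself)."""

    def __init__(self, redis: Any, name: str = "default", dump: Any = None, load: Any = None) -> None:
        self.redis = redis
        self.name = name
        self.dump = dump
        self.load = load


class QueueSketch(BaseQueueSketch):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # setdefault only fills in values the caller did not supply.
        # Assumes name, dump and load are passed by keyword, not positionally.
        kwargs.setdefault("name", APP_SLUG)
        kwargs.setdefault("dump", compact_dump)
        kwargs.setdefault("load", json.loads)
        super().__init__(*args, **kwargs)
```

Using setdefault keeps caller-supplied values intact, so a queue created with an explicit name or serializer behaves exactly as requested.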

namespace

namespace(key)

Namespace for the Queue.

Parameters:

  • key (str): The unique key to use for the namespace. Required.

Returns:

  • str: The worker namespace.

Worker

Bases: saq.Worker

Modifies the behavior of the SAQ worker so that it can be orchestrated by Starlite.

on_app_startup async

on_app_startup()

Attach the worker to the running event loop.
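
One common way to attach a long-running coroutine to an already running event loop is to schedule it as a task, so the startup hook returns immediately while the worker keeps running in the background. The sketch below assumes that approach; `WorkerSketch` and its `start()` loop are stand-ins, not the actual Worker code.

```python
import asyncio


class WorkerSketch:
    """Stand-in worker whose start() runs until cancelled."""

    def __init__(self) -> None:
        self.started = asyncio.Event()
        self.task = None

    async def start(self) -> None:
        self.started.set()
        while True:
            # Placeholder for the job-processing loop.
            await asyncio.sleep(0.05)

    async def on_app_startup(self) -> None:
        # Schedule start() on the loop that is already running the application.
        loop = asyncio.get_running_loop()
        self.task = loop.create_task(self.start())


async def main() -> bool:
    worker = WorkerSketch()
    await worker.on_app_startup()  # returns without waiting for start() to finish
    await asyncio.wait_for(worker.started.wait(), timeout=1)
    still_running = not worker.task.done()
    worker.task.cancel()  # shut the background task down
    return still_running
```

Keeping a reference to the task, as done with `self.task` here, prevents it from being garbage collected and makes it possible to cancel it on shutdown.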

create_worker_instance

create_worker_instance(functions, before_process=None, after_process=None)

Parameters:

  • functions (Collection[Callable[..., Any] | tuple[str, Callable]]): Functions to be called via the async workers. Required.
  • before_process (Callable[[dict[str, Any]], Awaitable[Any]] | None): Async function called before a job processes. Default: None
  • after_process (Callable[[dict[str, Any]], Awaitable[Any]] | None): Async function called after a job processes. Default: None

Returns:

  • Worker: The worker instance, instantiated with functions.
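
The hooks accepted here are async callables that receive a context dictionary. The sketch below shows what a matching pair of hooks and a job function could look like. The `run_once` driver is a hypothetical stand-in for a single worker iteration so that the example runs on its own, and the context keys `started_at` and `elapsed` are invented for illustration.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any


async def before_process(ctx: dict[str, Any]) -> None:
    # Record when the job started.
    ctx["started_at"] = time.monotonic()


async def after_process(ctx: dict[str, Any]) -> None:
    # Compute how long the job took.
    ctx["elapsed"] = time.monotonic() - ctx["started_at"]


async def add(ctx: dict[str, Any], *, a: int, b: int) -> int:
    """Example job function."""
    return a + b


async def run_once(function, kwargs, before=None, after=None):
    """Stand-in for one worker iteration; not SAQ's processing loop."""
    ctx: dict[str, Any] = {}
    if before is not None:
        await before(ctx)
    try:
        result = await function(ctx, **kwargs)
    finally:
        # The after hook runs even if the job raises.
        if after is not None:
            await after(ctx)
    return result, ctx
```

With the real factory, the same callables would be passed as create_worker_instance([add], before_process=before_process, after_process=after_process).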