sqlalchemy
SQLAlchemy-based implementation of the repository protocol.
SQLAlchemyRepository ¶
Bases: AbstractRepository[ModelT]
SQLAlchemy based implementation of the repository interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session |
AsyncSession
|
Session managing the unit-of-work for the operation. |
required |
select_ |
Select[tuple[ModelT]] | None
|
To facilitate customization of the underlying select query. |
None
|
check_health
classmethod
async
¶
Perform a health check on the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session |
AsyncSession
|
through which we runa check statement |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
wrap_sqlalchemy_exception ¶
Do something within context to raise a RepositoryException chained
from an original SQLAlchemyError.
>>> try:
... with wrap_sqlalchemy_exception():
... raise SQLAlchemyError("Original Exception")
... except RepositoryException as exc:
... print(f"caught repository exception from {type(exc.__context__)}")
...
caught repository exception from <class 'sqlalchemy.exc.SQLAlchemyError'>