from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
)
from advanced_alchemy.exceptions import NotFoundError
from advanced_alchemy.extensions.fastapi import (
repository,
service,
)
from advanced_alchemy.service import (
ModelDictT,
OffsetPagination,
schema_dump,
)
from cashews import cache
from sqlalchemy.orm import load_only, noload, selectinload
from app.config.constants import (
DEFAULT_ADMIN_EMAIL,
DEFAULT_USER_ROLE_SLUG,
)
from app.db import models as m
from app.domain.users.schemas import User as UserDto
from app.lib import crypt
from app.lib.exceptions import (
NotFoundException,
PermissionDeniedException,
UnauthorizedException,
)
if TYPE_CHECKING:
from uuid import UUID
from advanced_alchemy.filters import StatementFilter
from app.domain.users.schemas import PasswordUpdate
[docs]
class UserService(service.SQLAlchemyAsyncRepositoryService[m.User]):
    """Handles database operations for users."""

    class UserRepository(repository.SQLAlchemyAsyncRepository[m.User]):
        """User SQLAlchemy Repository."""

        model_type = m.User

    repository_type = UserRepository
    match_fields: ClassVar[list[str]] = ["email"]
    default_role: ClassVar[str] = DEFAULT_USER_ROLE_SLUG
    system_admin_email: ClassVar[str] = DEFAULT_ADMIN_EMAIL

    async def to_model_on_create(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]:
        """Dump the incoming create payload and hash its plain-text password, if any."""
        data = schema_dump(data)
        return await self._populate_with_hashed_password(data)

    async def to_model_on_update(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]:
        """Dump the incoming update payload and hash its plain-text password, if any."""
        data = schema_dump(data)
        return await self._populate_with_hashed_password(data)

    @staticmethod
    async def _populate_with_hashed_password(data: ModelDictT[m.User]) -> ModelDictT[m.User]:
        """Replace a plain-text ``password`` entry with its hash.

        Args:
            data: The payload produced by ``schema_dump``.

        Returns:
            The same object. When it is a dict holding a non-``None`` ``password``,
            that value has been replaced in place by its hash; a ``password`` key
            whose value is ``None`` is removed.
        """
        # ``schema_dump`` may hand back something other than a dict (e.g. a model
        # instance); such payloads have no ``.pop`` and are passed through untouched
        # instead of failing with AttributeError.
        if not isinstance(data, dict):
            return data
        if (password := data.pop("password", None)) is not None:
            data["password"] = await crypt.get_password_hash(password=password)
        return data

    async def authenticate(self, username: str, password: str) -> m.User:
        """Authenticate a user.

        Args:
            username (str): User email.
            password (str): User password.

        Raises:
            UnauthorizedException: If the user is not found, the password does not
                match, or the account is inactive. The same message is used for all
                three cases.

        Returns:
            ~app.db.models.user.User: The user object (only ``id``, ``email``,
            ``is_active`` and ``password`` are loaded; the role is not loaded).
        """
        db_obj = await self.get_one_or_none(
            email=username,
            load=[
                load_only(m.User.id, m.User.email, m.User.is_active, m.User.password),
                noload(m.User.role),
            ],
        )
        # Order matters: the ``None`` check short-circuits before ``db_obj`` is dereferenced.
        if (
            db_obj is None
            or not await crypt.verify_password(plain_password=password, hashed_password=db_obj.password)
            or not db_obj.is_active
        ):
            raise UnauthorizedException(message="Invalid credentials or account is unavailable")
        return db_obj

    async def update_password(self, data: PasswordUpdate, user_id: UUID) -> None:
        """Modify the stored user password.

        Args:
            data (PasswordUpdate): The Pydantic schema with current and new passwords.
            user_id (UUID): The unique ID of the target user.

        Raises:
            UnauthorizedException: If the current password is incorrect.

        Note:
            The new hash is assigned to the loaded instance only; no explicit
            update/flush is issued here, so persistence presumably relies on the
            surrounding session/commit handling -- TODO confirm. A missing user is
            expected to surface as whatever ``self.get`` raises.
        """
        user_obj = await self.get(
            item_id=user_id,
            load=[
                load_only(m.User.id, m.User.password),
                noload(m.User.role),
            ],
        )
        # Keyword arguments mirror the call in ``authenticate`` so the two cannot be swapped.
        if not await crypt.verify_password(
            plain_password=data.current_password,
            hashed_password=user_obj.password,
        ):
            msg = "Current password is incorrect"
            raise UnauthorizedException(message=msg)
        user_obj.password = await crypt.get_password_hash(password=data.new_password)

    def check_critical_action_forbidden(
        self,
        target_user: m.User,
        calling_superuser_id: UUID,
    ) -> None:
        """Disallow destructive action on self or system admin.

        Args:
            target_user (:py:class:`~app.db.models.user.User`): The user object targeted for action.
            calling_superuser_id (UUID): UUID of the superuser calling the action.

        Raises:
            PermissionDeniedException: If target is the system admin or the caller themselves.
        """
        if target_user.email == self.system_admin_email:
            msg = "Forbidden: Cannot modify the primary system administrator account"
            raise PermissionDeniedException(message=msg)
        if target_user.id == calling_superuser_id:
            msg = "Self-action forbidden: Cannot perform destructive action on your own account"
            raise PermissionDeniedException(message=msg)

    @cache(ttl="1m", key="users_list:{filters}")
    async def get_users_paginated_dto(self, filters: list[StatementFilter]) -> OffsetPagination[UserDto]:
        """Retrieve a paginated list of users as DTOs.

        The call is wrapped by the ``cache`` decorator with ``ttl="1m"`` and a key
        built from ``filters``.

        Args:
            filters (list[StatementFilter]): Filters forwarded to ``list_and_count``
                and to ``to_schema``.

        Returns:
            OffsetPagination[UserDto]: The matching users converted to ``UserDto``.
        """
        results, total = await self.list_and_count(
            *filters,
            load=[
                # Load each user's role, restricted to the ``name`` and ``slug`` columns.
                selectinload(self.model_type.role).options(load_only(m.Role.name, m.Role.slug)),
            ],
        )
        return self.to_schema(data=results, total=total, filters=filters, schema_type=UserDto)
[docs]
class RoleService(service.SQLAlchemyAsyncRepositoryService[m.Role]):
    """Service layer for role persistence and lookups."""

    class RoleRepository(repository.SQLAlchemyAsyncRepository[m.Role]):
        """Repository bound to the role model."""

        model_type = m.Role

    repository_type = RoleRepository
    match_fields: ClassVar[list[str]] = ["name"]

    async def get_id_and_slug_by_slug(self, slug: str) -> m.Role:
        """Fetch a role by slug, loading only its ``id`` and ``slug`` columns.

        Args:
            slug (str): Slug of the role to look up.

        Returns:
            Role: The matching role object.

        Raises:
            NotFoundException: If ``get_one`` reports that no role has this slug.
        """
        role_model = self.model_type
        column_options = load_only(role_model.id, role_model.slug)
        try:
            role = await self.get_one(slug=slug, load=column_options)
        except NotFoundError as exc:
            # Translate the library error into the application's own exception type.
            raise NotFoundException(message=f"Role with slug '{slug}' not found") from exc
        return role

    async def get_default_role(self, default_role_slug: str) -> m.Role:
        """Fetch the default role, loading only ``id``, ``name`` and ``slug``.

        Args:
            default_role_slug (str): The slug of the default role (e.g., 'application-access').

        Returns:
            Role: A Role object (with `id`, `name`, and `slug` loaded).

        Raises:
            NotFoundError: Deliberately left untranslated. The role is required, so
                its absence signals a critical infrastructure failure: the initial
                database seeding did not complete.
        """
        role_model = self.model_type
        column_options = load_only(role_model.id, role_model.name, role_model.slug)
        return await self.get_one(slug=default_role_slug, load=column_options)