Source code for app.domain.users.services

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
)

from advanced_alchemy.exceptions import NotFoundError
from advanced_alchemy.extensions.fastapi import (
    repository,
    service,
)
from advanced_alchemy.service import (
    ModelDictT,
    OffsetPagination,
    schema_dump,
)
from cashews import cache
from sqlalchemy import func, select
from sqlalchemy.orm import (
    joinedload,
    load_only,
    noload,
)

from app.config.constants import DEFAULT_USER_ROLE_SLUG
from app.db import models as m
from app.domain.users.schemas import User as UserDto
from app.lib import crypt
from app.lib.exceptions import (
    NotFoundException,
    PermissionDeniedException,
    UnauthorizedException,
    UserNotFound,
)

if TYPE_CHECKING:
    from uuid import UUID

    from app.domain.users.filters import UserFilters
    from app.domain.users.schemas import PasswordUpdate


class UserService(service.SQLAlchemyAsyncRepositoryService[m.User]):
    """Service layer for ``User`` rows: password hashing, login and listing."""

    class UserRepository(repository.SQLAlchemyAsyncRepository[m.User]):
        """Repository bound to the ``User`` model."""

        model_type = m.User

    repository_type = UserRepository
    # Field names handed to the base service for record matching.
    match_fields: ClassVar[list[str]] = ["email"]
    # Slug of the role used by default, taken from the application constants.
    default_role: ClassVar[str] = DEFAULT_USER_ROLE_SLUG

    async def to_model_on_create(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]:
        """Dump the incoming create payload and hash its password, if any.

        Args:
            data: Payload accepted by the service (schema, dict or model).

        Returns:
            The dumped payload with ``password`` replaced by its hash.
        """
        dumped = schema_dump(data)
        return await self._populate_with_hashed_password(dumped)

    async def to_model_on_update(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]:
        """Dump the incoming update payload and hash its password, if any.

        Args:
            data: Payload accepted by the service (schema, dict or model).

        Returns:
            The dumped payload with ``password`` replaced by its hash.
        """
        dumped = schema_dump(data)
        return await self._populate_with_hashed_password(dumped)

    @staticmethod
    async def _populate_with_hashed_password(data: dict[str, Any]) -> dict[str, Any]:
        """Swap a plain-text ``password`` entry for its hash, in place.

        The ``password`` key is always popped. It is written back (hashed)
        only when the popped value is not ``None``.

        Args:
            data: Mutable mapping that may carry a ``password`` entry.

        Returns:
            The same mapping object that was passed in.
        """
        plain_text = data.pop("password", None)
        if plain_text is None:
            return data
        data["password"] = await crypt.get_password_hash(password=plain_text)
        return data

    async def authenticate(self, username: str, password: str) -> m.User:
        """Check a user's credentials and return the matching user.

        The e-mail comparison is case-insensitive (both sides are lowered).

        Args:
            username (str): User email.
            password (str): Plain-text password to verify.

        Raises:
            UnauthorizedException: If no user matches the e-mail, the password
                does not verify, or the account is not active. The same
                message is used for all three cases.

        Returns:
            ~app.db.models.user.User: The user object, loaded with only
            ``id``, ``email``, ``is_active`` and ``password``.
        """
        user_model = self.model_type
        email_matches = func.lower(user_model.email) == username.lower()
        lookup = select(user_model).where(email_matches)
        loader_options = [
            load_only(m.User.id, m.User.email, m.User.is_active, m.User.password),
            noload(m.User.role),
        ]
        account = await self.get_one_or_none(statement=lookup, load=loader_options)

        # Evaluated left to right with short-circuiting: the password is only
        # verified when a row exists, and ``is_active`` only when it verified.
        credentials_ok = (
            account is not None
            and await crypt.verify_password(plain_password=password, hashed_password=account.password)
            and account.is_active
        )
        if not credentials_ok:
            raise UnauthorizedException(message="Invalid credentials or account is unavailable")
        return account

    async def update_password(self, data: PasswordUpdate, user_id: UUID) -> None:
        """Replace a user's stored password after checking the current one.

        Args:
            data (PasswordUpdate): Schema carrying ``current_password`` and
                ``new_password``.
            user_id (UUID): The unique ID of the target user.

        Raises:
            UnauthorizedException: If the current password is incorrect.
        """
        loader_options = [
            load_only(m.User.id, m.User.password),
            noload(m.User.role),
        ]
        target = await self.get(item_id=user_id, load=loader_options)

        current_matches = await crypt.verify_password(data.current_password, target.password)
        if not current_matches:
            raise UnauthorizedException(message="Current password is incorrect")

        # Only the attribute on the loaded instance is assigned here; no flush
        # or commit is issued in this method.
        # NOTE(review): persistence presumably relies on the surrounding
        # session handling - confirm.
        target.password = await crypt.get_password_hash(password=data.new_password)

    async def get_and_validate_for_role_change(self, email: str) -> m.User:
        """Fetch an active user by e-mail ahead of a role modification.

        The e-mail comparison is case-insensitive (both sides are lowered).

        Args:
            email (str): The email of the user whose role is being modified.

        Returns:
            ~app.db.models.user.User: The matching, active user.

        Raises:
            UserNotFound: If no user is found with the given email.
            PermissionDeniedException: If the user exists but is inactive.
        """
        user_model = self.model_type
        lookup = select(user_model).where(func.lower(user_model.email) == email.lower())
        candidate = await self.get_one_or_none(statement=lookup)

        if candidate is None:
            raise UserNotFound
        if candidate.is_active:
            return candidate
        raise PermissionDeniedException(
            message=f"Cannot modify role for inactive user {candidate.email}",
        )

    @cache(ttl="1m", key="users_list:{params}")
    async def get_users_paginated_dto(self, params: UserFilters) -> OffsetPagination[UserDto]:
        """Return one filtered page of users as DTOs.

        The call is wrapped by ``cache`` with ``ttl="1m"`` and a key built
        from ``params``. Each user's role is joined in the same query with
        only ``name`` and ``slug`` loaded.

        Args:
            params (UserFilters): Filter object exposing ``aa_technical_filters``.

        Returns:
            OffsetPagination[UserDto]: The page of users plus the total count.
        """
        technical_filters = params.aa_technical_filters
        role_loader = joinedload(self.model_type.role).load_only(m.Role.name, m.Role.slug)
        rows, row_count = await self.get_many_and_count(
            *technical_filters,
            load=[role_loader],
        )
        return self.to_schema(
            data=rows,
            total=row_count,
            filters=technical_filters,
            schema_type=UserDto,
        )
class RoleService(service.SQLAlchemyAsyncRepositoryService[m.Role]):
    """Service layer for ``Role`` rows with column-restricted lookups."""

    class RoleRepository(repository.SQLAlchemyAsyncRepository[m.Role]):
        """Repository bound to the ``Role`` model."""

        model_type = m.Role

    repository_type = RoleRepository
    # Field names handed to the base service for record matching.
    match_fields: ClassVar[list[str]] = ["name"]

    async def get_id_and_slug_by_slug(self, slug: str) -> m.Role:
        """Look up a role by slug, loading only ``id`` and ``slug``.

        Args:
            slug (str): Slug of the role to fetch.

        Returns:
            Role: The matching role.

        Raises:
            NotFoundException: If the lookup raises ``NotFoundError``; the
                original error is chained as the cause.
        """
        role_model = self.model_type
        columns_only = load_only(role_model.id, role_model.slug)
        try:
            found = await self.get_one(slug=slug, load=columns_only)
        except NotFoundError as exc:
            raise NotFoundException(message=f"Role with slug '{slug}' not found") from exc
        else:
            return found

    async def get_default_role(self, default_role_slug: str) -> m.Role:
        """Look up the default role, loading only ``id``, ``name`` and ``slug``.

        Unlike ``get_id_and_slug_by_slug``, a failed lookup is not translated
        into an application exception here; it propagates to the caller.

        Args:
            default_role_slug (str): The slug of the default role
                (e.g., 'application-access').

        Returns:
            Role: A Role object (with ``id``, ``name`` and ``slug`` loaded).

        Raises:
            NotFoundError: When no role matches the slug. This is treated as
                a critical infrastructure failure: the role is required, so
                its absence means the initial database seeding did not
                complete.
        """
        role_model = self.model_type
        columns_only = load_only(role_model.id, role_model.name, role_model.slug)
        return await self.get_one(slug=default_role_slug, load=columns_only)