from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
)
from advanced_alchemy.exceptions import NotFoundError
from advanced_alchemy.extensions.fastapi import (
repository,
service,
)
from advanced_alchemy.service import (
ModelDictT,
OffsetPagination,
schema_dump,
)
from cashews import cache
from sqlalchemy import func, select
from sqlalchemy.orm import (
joinedload,
load_only,
noload,
)
from app.config.constants import DEFAULT_USER_ROLE_SLUG
from app.db import models as m
from app.domain.users.schemas import User as UserDto
from app.lib import crypt
from app.lib.exceptions import (
NotFoundException,
PermissionDeniedException,
UnauthorizedException,
UserNotFound,
)
if TYPE_CHECKING:
from uuid import UUID
from app.domain.users.filters import UserFilters
from app.domain.users.schemas import PasswordUpdate
[docs]
class UserService(service.SQLAlchemyAsyncRepositoryService[m.User]):
"""Handles database operations for users."""
class UserRepository(repository.SQLAlchemyAsyncRepository[m.User]):
"""User SQLAlchemy Repository."""
model_type = m.User
repository_type = UserRepository
match_fields: ClassVar[list[str]] = ["email"]
default_role: ClassVar[str] = DEFAULT_USER_ROLE_SLUG
async def to_model_on_create(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]:
data = schema_dump(data)
return await self._populate_with_hashed_password(data)
async def to_model_on_update(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]:
data = schema_dump(data)
return await self._populate_with_hashed_password(data)
@staticmethod
async def _populate_with_hashed_password(data: dict[str, Any]) -> dict[str, Any]:
if (password := data.pop("password", None)) is not None:
data["password"] = await crypt.get_password_hash(password=password)
return data
[docs]
async def authenticate(self, username: str, password: str) -> m.User:
"""Authenticate a user.
Args:
username (str): User email.
password (str): User password.
Raises:
UnauthorizedException: If the user is not found, not verified, or inactive.
Returns:
~app.db.models.user.User: The user object.
"""
statement = select(self.model_type).where(func.lower(self.model_type.email) == username.lower())
db_obj = await self.get_one_or_none(
statement=statement,
load=[
load_only(m.User.id, m.User.email, m.User.is_active, m.User.password),
noload(m.User.role),
],
)
if (
db_obj is None
or not await crypt.verify_password(plain_password=password, hashed_password=db_obj.password)
or not db_obj.is_active
):
raise UnauthorizedException(message="Invalid credentials or account is unavailable")
return db_obj
[docs]
async def update_password(self, data: PasswordUpdate, user_id: UUID) -> None:
"""Modify the stored user password.
Args:
data (PasswordUpdate): The Pydantic schema with current and new passwords.
user_id (UUID): The unique ID of the target user.
Raises:
UnauthorizedException: If the current password is incorrect.
"""
user_obj = await self.get(
item_id=user_id,
load=[
load_only(m.User.id, m.User.password),
noload(m.User.role),
],
)
if not await crypt.verify_password(data.current_password, user_obj.password):
msg = "Current password is incorrect"
raise UnauthorizedException(message=msg)
user_obj.password = await crypt.get_password_hash(password=data.new_password)
[docs]
async def get_and_validate_for_role_change(self, email: str) -> m.User:
"""Retrieve an active user by email for role modification.
Args:
email (str): The email of the user whose role is being modified.
Returns:
~app.db.models.user.User: The User model object from the database.
Raises:
UserNotFound: If no user is found with the given email.
PermissionDeniedException: If the user is found but their account is inactive.
"""
statement = select(self.model_type).where(func.lower(self.model_type.email) == email.lower())
user_obj = await self.get_one_or_none(statement=statement)
if user_obj is None:
raise UserNotFound
if not user_obj.is_active:
msg = f"Cannot modify role for inactive user {user_obj.email}"
raise PermissionDeniedException(message=msg)
return user_obj
[docs]
@cache(ttl="1m", key="users_list:{params}")
async def get_users_paginated_dto(self, params: UserFilters) -> OffsetPagination[UserDto]:
"""Provide a filtered and paginated list of users with caching."""
filters = params.aa_technical_filters
results, total = await self.get_many_and_count(
*filters,
load=[
joinedload(self.model_type.role).load_only(m.Role.name, m.Role.slug),
],
)
return self.to_schema(data=results, total=total, filters=filters, schema_type=UserDto)
[docs]
class RoleService(service.SQLAlchemyAsyncRepositoryService[m.Role]):
"""Handles database operations for roles."""
class RoleRepository(repository.SQLAlchemyAsyncRepository[m.Role]):
"""Role SQLAlchemy Repository."""
model_type = m.Role
repository_type = RoleRepository
match_fields: ClassVar[list[str]] = ["name"]
[docs]
async def get_id_and_slug_by_slug(self, slug: str) -> m.Role:
"""Retrieve the role object with column optimization."""
try:
return await self.get_one(
slug=slug,
load=load_only(self.model_type.id, self.model_type.slug),
)
except NotFoundError as exc:
msg = f"Role with slug '{slug}' not found"
raise NotFoundException(message=msg) from exc
[docs]
async def get_default_role(self, default_role_slug: str) -> m.Role:
"""Retrieve the default role object with column optimization.
Args:
default_role_slug (str): The slug of the default role (e.g., 'application-access').
Returns:
Role: A Role object (with `id`, `name`, and `slug` loaded).
Raises:
NotFoundError: Signals a **critical infrastructure failure**. This role is required,
and its absence means that the initial database seeding did not complete.
"""
return await self.get_one(
slug=default_role_slug,
load=load_only(self.model_type.id, self.model_type.name, self.model_type.slug),
)