"""User Role Management API Endpoints.
Defines the operations for assigning and revoking user roles. Requires superuser privileges.
"""
from typing import Annotated
from fastapi import (
APIRouter,
Depends,
)
from app.domain.users import urls
from app.domain.users.auth import Authenticate
from app.domain.users.deps import (
RoleServiceDep,
UserServiceDep,
)
from app.domain.users.schemas import (
UserAuth,
UserRoleAdd,
UserRoleRevoke,
)
from app.domain.users.utils import check_critical_action_forbidden
from app.lib.exceptions import ConflictException
from app.lib.invalidate_cache import invalidate_user_cache
from app.lib.json_response import MsgSpecJSONResponse
role_router = APIRouter(
tags=["User Account Role"],
)
[docs]
@role_router.patch(
path=urls.ACCOUNT_ASSIGN_ROLE,
name="roles:assign",
)
async def assign_new_role(
super_user: Annotated[UserAuth, Depends(Authenticate.superuser_required())],
users_service: UserServiceDep,
roles_service: RoleServiceDep,
user_add_role: UserRoleAdd,
email: str,
) -> MsgSpecJSONResponse:
"""Assign a new role to the specified user by email.
This operation requires superuser privileges. **Self-assignment is forbidden**
(superuser modifying their own account) for security.
Returns:
MsgSpecJSONResponse: Success message indicating the assigned role.
Raises:
PermissionDeniedException: If the superuser attempts to modify their own account or the system admin.
ConflictException: If the user already has the requested role.
"""
user_obj = await users_service.get_and_validate_for_role_change(
email=email,
)
check_critical_action_forbidden(
target_user=user_obj,
calling_superuser_id=super_user.id,
)
new_role = await roles_service.get_id_and_slug_by_slug(slug=user_add_role.role_slug)
if user_obj.role_id == new_role.id:
msg = f"User {user_obj.email} already has the '{new_role.slug}' role"
raise ConflictException(message=msg)
await users_service.update(data={"role_id": new_role.id}, item_id=user_obj.id)
await invalidate_user_cache(
user_id=user_obj.id,
)
return MsgSpecJSONResponse(
content={"message": f"Successfully assigned the '{new_role.slug}' role to {user_obj.email}"},
)
[docs]
@role_router.patch(
path=urls.ACCOUNT_REVOKE_ROLE,
name="roles:revoke",
)
async def revoke_and_set_default_role(
super_user: Annotated[UserAuth, Depends(Authenticate.superuser_required())],
users_service: UserServiceDep,
roles_service: RoleServiceDep,
user_revoke_role: UserRoleRevoke,
email: str,
) -> MsgSpecJSONResponse:
"""Revoke the specified role from the user by email and set the default role.
This operation requires superuser privileges. **Self-modification is forbidden**
(superuser modifying their own account) for security.
Returns:
MsgSpecJSONResponse: Success message indicating the revoked and default roles.
Raises:
PermissionDeniedException: If the superuser attempts to modify their own account or the system admin.
ConflictException: If the user does not currently have the role that is requested for revocation.
"""
user_obj = await users_service.get_and_validate_for_role_change(
email=email,
)
check_critical_action_forbidden(
target_user=user_obj,
calling_superuser_id=super_user.id,
)
old_role = await roles_service.get_id_and_slug_by_slug(slug=user_revoke_role.role_slug)
if old_role.id == user_obj.role_id:
default_role = await roles_service.get_default_role(
default_role_slug=users_service.default_role,
)
await users_service.update(data={"role_id": default_role.id}, item_id=user_obj.id)
await invalidate_user_cache(
user_id=user_obj.id,
)
return MsgSpecJSONResponse(
content={
"message": (
f"Successfully revoked the '{old_role.slug}' role for {user_obj.email} "
f"and set to default role '{default_role.slug}'"
)
}
)
msg = (
f"User {user_obj.email} currently has the '{user_obj.role_slug}' role, "
f"which does not match the requested role '{old_role.slug}' for revocation"
)
raise ConflictException(message=msg)