# Source code for app.domain.users.controllers.access

"""User Access and Authentication Endpoints.

Handles user registration, login (JWT token issuance via cookies), token refreshing, logout,
and user-specific profile actions.
"""

from time import time
from typing import Annotated

from advanced_alchemy.exceptions import DuplicateKeyError
from fastapi import (
    APIRouter,
    BackgroundTasks,
    Depends,
    Response,
    status,
)
from fastapi.security import OAuth2PasswordRequestForm

from app.config.base import get_settings
from app.domain.users import urls
from app.domain.users.auth import Authenticate
from app.domain.users.deps import (
    RoleServiceDep,
    UserServiceDep,
)
from app.domain.users.jwt_helpers import (
    add_token_to_blacklist,
    create_access_token,
    create_refresh_token,
)
from app.domain.users.schemas import (
    AccountRegister,
    PasswordUpdate,
    User,
    UserAuth,
)
from app.domain.users.utils import (
    get_refresh_context,
    perform_logout_cleanup,
)
from app.lib.exceptions import ConflictException
from app.lib.invalidate_cache import invalidate_user_cache
from app.lib.json_response import MsgSpecJSONResponse

# Application settings, resolved once at import time; the token endpoints read
# cookie lifetimes and the cookie "secure" flag from it.
settings = get_settings()

# Router that groups the account access endpoints under the "Access" tag.
access_router = APIRouter(tags=["Access"])


@access_router.post(
    path=urls.ACCOUNT_REGISTER,
    operation_id="AccountRegister",
    # Declared so the generated OpenAPI schema matches the 201 actually returned below.
    status_code=status.HTTP_201_CREATED,
    name="access:signup",
    summary="Register a new user.",
)
async def signup(
    users_service: UserServiceDep,
    roles_service: RoleServiceDep,
    account_register: AccountRegister,
) -> MsgSpecJSONResponse:
    """Register a new user account with the default role.

    Args:
        users_service: Service used to create the user and render it to a schema.
        roles_service: Service used to look up the default role.
        account_register: Registration payload supplied by the client.

    Returns:
        MsgSpecJSONResponse: HTTP 201 response whose content is the created user
        rendered through the :class:`~app.domain.users.schemas.User` schema.

    Raises:
        ConflictException: If creating the user raises ``DuplicateKeyError``.
    """
    role_obj = await roles_service.get_default_role(
        default_role_slug=users_service.default_role,
    )
    # Forward only the fields the client explicitly set, plus the default role id.
    user_data = account_register.model_dump(exclude_unset=True) | {"role_id": role_obj.id}

    # Keep the try body limited to the call that is expected to raise the duplicate error.
    try:
        db_obj = await users_service.create(data=user_data, auto_refresh=False)
    except DuplicateKeyError as exc:
        msg = "A user with this email already exists"
        raise ConflictException(message=msg) from exc

    user = users_service.to_schema(db_obj, schema_type=User)
    return MsgSpecJSONResponse(content=user, status_code=status.HTTP_201_CREATED)
@access_router.post(
    path=urls.ACCOUNT_LOGIN,
    operation_id="AccountLogin",
    status_code=status.HTTP_204_NO_CONTENT,
    name="access:login",
    summary="Account login, issue access and refresh tokens.",
)
async def signin(
    users_service: UserServiceDep,
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Response:
    """Authenticate the submitted credentials and hand out tokens as cookies.

    The handler performs no credential check of its own; it relies on
    ``users_service.authenticate`` to reject invalid logins.

    Args:
        users_service: Service used to authenticate the credentials.
        form_data: OAuth2 password form carrying ``username`` and ``password``.

    Returns:
        Response: HTTP 204 No Content response carrying the ``access_token``
        and ``refresh_token`` cookies.
    """
    authenticated = await users_service.authenticate(
        username=form_data.username,
        password=form_data.password,
    )

    new_access = create_access_token(user_id=authenticated.id, email=authenticated.email)
    new_refresh = create_refresh_token(user_id=authenticated.id)

    reply = Response(status_code=status.HTTP_204_NO_CONTENT)
    # Both cookies are HTTP-only; the refresh cookie uses the stricter SameSite policy.
    reply.set_cookie(
        key="access_token",
        value=new_access,
        max_age=settings.jwt.access_token_max_age,
        httponly=True,
        samesite="lax",
        secure=settings.app.COOKIE_SECURE_VALUE,
    )
    reply.set_cookie(
        key="refresh_token",
        value=new_refresh,
        max_age=settings.jwt.refresh_token_max_age,
        httponly=True,
        samesite="strict",
        secure=settings.app.COOKIE_SECURE_VALUE,
    )
    return reply
@access_router.post(
    path=urls.ACCOUNT_REFRESH_TOKEN,
    operation_id="RefreshAccessToken",
    status_code=status.HTTP_204_NO_CONTENT,
    name="access:refresh",
    summary="Issue a new access token using the refresh token.",
)
async def user_auth_refresh_token(
    background_tasks: BackgroundTasks,
    user_auth: Annotated[UserAuth, Depends(Authenticate.get_current_user_for_refresh)],
) -> Response:
    """Rotate the caller's tokens based on the presented refresh token.

    A fresh access token and a fresh refresh token are issued. If the refresh
    token that was used still has remaining lifetime, its identifier is queued
    for blacklisting in a background task for exactly that remaining time.

    Args:
        background_tasks: Task queue used to schedule the blacklist entry.
        user_auth: User resolved from the refresh token by the dependency.

    Returns:
        Response: HTTP 204 No Content response carrying the new ``access_token``
        and ``refresh_token`` cookies.
    """
    used_jti, used_exp = get_refresh_context(user_auth=user_auth)

    # Seconds until the used refresh token would have expired on its own.
    remaining = int(used_exp - time())
    if remaining > 0:
        background_tasks.add_task(
            add_token_to_blacklist,
            refresh_token_identifier=used_jti,
            ttl=remaining,
        )

    new_access = create_access_token(user_id=user_auth.id, email=user_auth.email)
    new_refresh = create_refresh_token(user_id=user_auth.id)

    reply = Response(status_code=status.HTTP_204_NO_CONTENT)
    reply.set_cookie(
        key="access_token",
        value=new_access,
        max_age=settings.jwt.access_token_max_age,
        httponly=True,
        samesite="lax",
        secure=settings.app.COOKIE_SECURE_VALUE,
    )
    reply.set_cookie(
        key="refresh_token",
        value=new_refresh,
        max_age=settings.jwt.refresh_token_max_age,
        httponly=True,
        samesite="strict",
        secure=settings.app.COOKIE_SECURE_VALUE,
    )
    return reply
@access_router.post(
    path=urls.ACCOUNT_LOGOUT,
    operation_id="AccountLogout",
    # Declared so the generated OpenAPI schema matches the 204 actually returned below.
    status_code=status.HTTP_204_NO_CONTENT,
    name="access:logout",
    summary="Log out, delete tokens from cookies.",
)
async def logout(
    background_tasks: BackgroundTasks,
    user_auth: Annotated[UserAuth, Depends(Authenticate.get_current_active_user())],
    refresh_context: Annotated[tuple[str, float], Depends(Authenticate.get_refresh_jti)],
) -> Response:
    """Log the current user out.

    Clears the ``access_token`` and ``refresh_token`` cookies. If the refresh
    token still has remaining lifetime, ``perform_logout_cleanup`` is scheduled
    as a background task with the token's JTI, that remaining lifetime and the
    user's id.

    Args:
        background_tasks: Task queue used to schedule the cleanup.
        user_auth: The authenticated, active user.
        refresh_context: ``(jti, exp)`` pair of the caller's refresh token.

    Returns:
        Response: HTTP 204 No Content response that deletes both token cookies.
    """
    refresh_jti, refresh_exp = refresh_context

    # Seconds until the refresh token would expire on its own; no cleanup is
    # scheduled for a token that is already past its expiry.
    ttl = int(refresh_exp - time())
    if ttl > 0:
        background_tasks.add_task(
            func=perform_logout_cleanup,
            refresh_jti=refresh_jti,
            ttl=ttl,
            user_id=user_auth.id,
        )

    response = Response(status_code=status.HTTP_204_NO_CONTENT)
    response.delete_cookie("access_token")
    response.delete_cookie("refresh_token")
    return response
@access_router.patch(
    path=urls.ACCOUNT_PWD_UPDATE,
    operation_id="AccountUpdatePwd",
    # Declared so the generated OpenAPI schema matches the 204 actually returned below.
    status_code=status.HTTP_204_NO_CONTENT,
    name="access:update-pwd",
    summary="Update your user password.",
)
async def update_password(
    background_tasks: BackgroundTasks,
    users_service: UserServiceDep,
    user_auth: Annotated[UserAuth, Depends(Authenticate.get_current_active_user())],
    pwd_data: PasswordUpdate,
) -> Response:
    """Change the current user's password.

    After the password is updated, ``invalidate_user_cache`` is scheduled as a
    background task for the user and both token cookies are cleared, so the
    client has to log in again.

    Args:
        background_tasks: Task queue used to schedule the cache invalidation.
        users_service: Service that performs the password update.
        user_auth: The authenticated, active user.
        pwd_data: Password update payload supplied by the client.

    Returns:
        Response: HTTP 204 No Content response that deletes both token cookies.
    """
    await users_service.update_password(
        data=pwd_data,
        user_id=user_auth.id,
    )

    # NOTE(review): this handler only clears the cookies; it does not blacklist
    # the caller's current refresh token. Confirm that previously issued tokens
    # are revoked elsewhere (e.g. by update_password or invalidate_user_cache).
    background_tasks.add_task(
        func=invalidate_user_cache,
        user_id=user_auth.id,
    )

    response = Response(status_code=status.HTTP_204_NO_CONTENT)
    response.delete_cookie("access_token")
    response.delete_cookie("refresh_token")
    return response
@access_router.get(
    path=urls.ACCOUNT_PROFILE,
    operation_id="AccountProfile",
    name="access:profile",
    summary="Get information about yourself.",
)
async def user_auth_get_self_info(
    user_auth: Annotated[UserAuth, Depends(Authenticate.get_current_active_user())],
) -> MsgSpecJSONResponse:
    """Return the profile of the currently authenticated user.

    Args:
        user_auth: The authenticated, active user resolved by the dependency.

    Returns:
        MsgSpecJSONResponse: Response whose content is the ``UserAuth`` object
        of the caller.
    """
    profile_response = MsgSpecJSONResponse(content=user_auth)
    return profile_response