Source code for app.utils.log_utils.setup
from __future__ import annotations
import logging.config
from typing import (
TYPE_CHECKING,
Any,
)
import msgspec
import structlog
from asgi_correlation_id import correlation_id
from app.config.base import get_settings
from app.utils.log_utils.handlers import CustomQueueHandler
if TYPE_CHECKING:
from collections.abc import Mapping
settings = get_settings()
[docs]
def msgspec_dumps_str(data: Mapping[str, Any], **kwargs: Any) -> str:
"""Serialize a log record dictionary to a JSON string using msgspec."""
return msgspec.json.encode(data).decode()
[docs]
def add_correlation(
logger: logging.Logger,
method_name: str,
event_dict: dict[str, Any],
) -> dict[str, Any]:
"""Add request id to log message."""
if request_id := correlation_id.get():
event_dict["request_id"] = request_id
return event_dict
[docs]
def configure_logging() -> None:
"""Set up non-blocking, structured logging for the application."""
shared_processors = [
add_correlation,
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.format_exc_info,
]
processors: list[Any] = [*shared_processors, structlog.stdlib.ProcessorFormatter.wrap_for_formatter]
structlog.configure(
cache_logger_on_first_use=True,
wrapper_class=structlog.make_filtering_bound_logger(settings.log.STRUCTLOG_LEVEL),
processors=processors,
logger_factory=structlog.stdlib.LoggerFactory(),
)
minimal_pre_chain = [
add_correlation,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
]
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json_console": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.processors.JSONRenderer(serializer=msgspec_dumps_str),
"foreign_pre_chain": minimal_pre_chain,
},
"plain_console": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.dev.ConsoleRenderer(
sort_keys=True,
exception_formatter=structlog.dev.plain_traceback,
),
"foreign_pre_chain": minimal_pre_chain,
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": settings.log.final_formatter,
},
"queue_handler": {
"class": CustomQueueHandler,
"listener": "logging.handlers.QueueListener",
"queue": {
"()": "queue.Queue",
"maxsize": 10000,
},
"handlers": ["console"],
"respect_handler_level": True,
"formatter": None,
},
},
"loggers": {
"": {
"handlers": ["queue_handler"],
"level": settings.log.LEVEL,
},
"app.access": {
"propagate": False,
"level": settings.log.MIDDLEWARE_LOG_LEVEL,
"handlers": ["queue_handler"],
},
"uvicorn.error": {
"propagate": False,
"level": settings.log.ASGI_ERROR_LEVEL,
"handlers": ["queue_handler"],
},
"uvicorn.access": {
"propagate": False,
"level": settings.log.ASGI_ACCESS_LEVEL,
"handlers": ["queue_handler"],
},
"sqlalchemy.engine": {
"propagate": False,
"level": settings.log.SQLALCHEMY_LEVEL,
"handlers": ["queue_handler"],
},
"sqlalchemy.pool": {
"propagate": False,
"level": settings.log.SQLALCHEMY_LEVEL,
"handlers": ["queue_handler"],
},
"_granian": {
"propagate": False,
"level": settings.log.ASGI_ERROR_LEVEL,
"handlers": ["queue_handler"],
},
"granian.access": {
"propagate": False,
"level": settings.log.ASGI_ACCESS_LEVEL,
"handlers": ["queue_handler"],
},
},
}
)