Catalogs Domain Services¶
Domain services encapsulating the business logic and caching strategies for catalog models.
—
Base Catalog Service (BaseCatalogService)¶
- class BaseCatalogService[source]¶
Bases:
SQLAlchemyAsyncRepositoryService[T, Any],GenericBase service for managing catalog data with automated caching.
- async get_list_items(params: CatalogFilters) list[S][source]¶
Retrieve a list of catalog items from database or cache with filtering.
- async get_managed_objs(target_objs: list[S]) list[T][source]¶
Merge detached catalog data into the current session without DB hits.
- async upsert_many(data: BulkModelDictT[T], *, auto_expunge: bool | None = None, auto_commit: bool | None = None, no_merge: bool = False, match_fields: list[str] | str | None = None, error_messages: ErrorMessages | EmptyType | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, load: LoadSpec | None = None, execution_options: dict[str, Any] | None = None, uniquify: bool | None = None, bind_group: str | None = None, schema_dump_config: SchemaDumpConfig | None = None) Sequence[T][source]¶
Upsert multiple records and invalidate the associated cache.
- async create(data: ModelDictT[T], *, auto_commit: bool | None = None, auto_expunge: bool | None = None, auto_refresh: bool | None = None, error_messages: ErrorMessages | EmptyType | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, bind_group: str | None = None, schema_dump_config: SchemaDumpConfig | None = None) T[source]¶
Create a new catalog item and invalidate cache.
- async update(data: ModelDictT[T], item_id: Any | None = None, *, attribute_names: Iterable[str] | None = None, with_for_update: ForUpdateParameter = None, auto_commit: bool | None = None, auto_expunge: bool | None = None, auto_refresh: bool | None = None, id_attribute: str | InstrumentedAttribute[Any] | None = None, error_messages: ErrorMessages | EmptyType | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, load: LoadSpec | None = None, execution_options: dict[str, Any] | None = None, uniquify: bool | None = None, bind_group: str | None = None, schema_dump_config: SchemaDumpConfig | None = None) T[source]¶
Update a catalog item and invalidate cache.
- async delete(item_id: PrimaryKeyType, *, auto_commit: bool | None = None, auto_expunge: bool | None = None, id_attribute: str | InstrumentedAttribute[Any] | None = None, error_messages: ErrorMessages | EmptyType | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, load: LoadSpec | None = None, execution_options: dict[str, Any] | None = None, uniquify: bool | None = None, bind_group: str | None = None) T[source]¶
Delete a catalog item and invalidate cache.
—
Muscle Group Service (MuscleGroupService)¶
- class MuscleGroupService[source]¶
Bases:
BaseCatalogService[MuscleGroup,MuscleGroupRead]Handles database operations for muscle groups.
- read_schema¶
alias of
MuscleGroupRead
—
Equipment Service (EquipmentService)¶
- class EquipmentService[source]¶
Bases:
BaseCatalogService[Equipment,EquipmentRead]Handles database operations for equipment.
- read_schema¶
alias of
EquipmentRead
—