diff --git a/backend/app/admin/crud/crud_opera_log.py b/backend/app/admin/crud/crud_opera_log.py index 93aaa050..6acd3d40 100644 --- a/backend/app/admin/crud/crud_opera_log.py +++ b/backend/app/admin/crud/crud_opera_log.py @@ -37,11 +37,21 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None: 创建操作日志 :param db: 数据库会话 - :param obj: 创建操作日志参数 + :param obj: 操作日志创建参数 :return: """ await self.create_model(db, obj) + async def bulk_create(self, db: AsyncSession, objs: list[CreateOperaLogParam]) -> None: + """ + 批量创建操作日志 + + :param db: 数据库会话 + :param objs: 操作日志创建参数列表 + :return: + """ + await self.create_models(db, objs) + async def delete(self, db: AsyncSession, pks: list[int]) -> int: """ 批量删除操作日志 diff --git a/backend/app/admin/service/opera_log_service.py b/backend/app/admin/service/opera_log_service.py index c06ce899..c6d5c9ec 100644 --- a/backend/app/admin/service/opera_log_service.py +++ b/backend/app/admin/service/opera_log_service.py @@ -33,6 +33,17 @@ async def create(*, obj: CreateOperaLogParam) -> None: async with async_db_session.begin() as db: await opera_log_dao.create(db, obj) + @staticmethod + async def bulk_create(*, objs: list[CreateOperaLogParam]) -> None: + """ + 批量创建操作日志 + + :param objs: 操作日志创建参数列表 + :return: + """ + async with async_db_session.begin() as db: + await opera_log_dao.bulk_create(db, objs) + @staticmethod async def delete(*, obj: DeleteOperaLogParam) -> int: """ diff --git a/backend/common/queue.py b/backend/common/queue.py new file mode 100644 index 00000000..7c65a722 --- /dev/null +++ b/backend/common/queue.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import asyncio + +from asyncio import Queue + + +async def batch_dequeue(queue: Queue, max_items: int, timeout: float) -> list: + """ + 从异步队列中获取多个项目 + + :param queue: 用于获取项目的 `asyncio.Queue` 队列 + :param max_items: 从队列中获取的最大项目数量 + :param timeout: 总的等待超时时间(秒) + :return: + """ + items = [] + + async def collector(): + while len(items) < max_items: + item = await queue.get() + items.append(item) + + try: + await asyncio.wait_for(collector(), timeout=timeout) + except asyncio.TimeoutError: + pass + + return items diff --git a/backend/core/conf.py b/backend/core/conf.py index a86027ef..a57298e6 100644 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -177,6 +177,8 @@ class Settings(BaseSettings): 'new_password', 'confirm_password', ] + OPERA_LOG_QUEUE_BATCH_CONSUME_SIZE: int = 100 + OPERA_LOG_QUEUE_TIMEOUT: int = 60 # 1 分钟 # Plugin 配置 PLUGIN_PIP_CHINA: bool = True diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 439a2420..b1173168 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import os +from asyncio import create_task from contextlib import asynccontextmanager from typing import AsyncGenerator @@ -47,6 +48,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]: prefix=settings.REQUEST_LIMITER_REDIS_PREFIX, http_callback=http_limit_callback, ) + # 创建操作日志任务 + create_task(OperaLogMiddleware.consumer()) yield diff --git a/backend/middleware/opera_log_middleware.py b/backend/middleware/opera_log_middleware.py index ce773548..263815cf 100644 --- a/backend/middleware/opera_log_middleware.py +++ b/backend/middleware/opera_log_middleware.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- import time -from asyncio import create_task +from asyncio import Queue from typing import Any from asgiref.sync import sync_to_async @@ -15,6 +15,7 @@ from backend.app.admin.service.opera_log_service import opera_log_service from backend.common.enums import OperaLogCipherType, StatusType from backend.common.log import log +from backend.common.queue import batch_dequeue from backend.core.conf import settings from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher from backend.utils.trace_id import get_request_trace_id @@ -23,6 +24,8 @@ class OperaLogMiddleware(BaseHTTPMiddleware): """操作日志中间件""" + opera_log_queue: Queue = Queue(maxsize=100000) + async def dispatch(self, request: Request, call_next: Any) -> Response: """ 处理请求并记录操作日志 @@ -108,7 +111,7 @@ async def dispatch(self, request: Request, call_next: Any) -> Response: cost_time=elapsed, # 可能和日志存在微小差异(可忽略) opera_time=request.state.start_time, ) - create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore + await self.opera_log_queue.put(opera_log_in) # 错误抛出 if error: @@ -191,3 +194,21 @@ def desensitization(args: dict[str, Any]) -> dict[str, Any] | None: case _: args[arg_type][key] = '******' return args + + @classmethod + async def consumer(cls) -> None: + """操作日志消费者""" + while True: + logs = await batch_dequeue( + cls.opera_log_queue, + max_items=settings.OPERA_LOG_QUEUE_BATCH_CONSUME_SIZE, + timeout=settings.OPERA_LOG_QUEUE_TIMEOUT, + ) + if logs: + try: + if settings.DATABASE_ECHO: + log.info('自动执行【操作日志批量创建】任务...') + await opera_log_service.bulk_create(objs=logs) + finally: + if not cls.opera_log_queue.empty(): + cls.opera_log_queue.task_done()