Skip to content

Optimize the opera log storage logic through queue #750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ venv/
.python-version
.ruff_cache/
.pytest_cache/
.env
docker-compose-dev.yml
GEMINI.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些内容看起来似乎没有实质用处

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,这个是我本地测试的配置文件,为了防止提交到远程仓库添加的忽略配置

10 changes: 10 additions & 0 deletions backend/app/admin/crud/crud_opera_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None:
"""
await self.create_model(db, obj)

async def batch_create(self, db: AsyncSession, obj_list: list[CreateOperaLogParam]) -> None:
"""
批量创建操作日志

:param db: 数据库会话
:param obj_list: 创建操作日志参数列表
:return:
"""
await self.create_models(db, obj_list)

async def delete(self, db: AsyncSession, pks: list[int]) -> int:
"""
批量删除操作日志
Expand Down
11 changes: 11 additions & 0 deletions backend/app/admin/service/opera_log_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ async def create(*, obj: CreateOperaLogParam) -> None:
async with async_db_session.begin() as db:
await opera_log_dao.create(db, obj)

@staticmethod
async def batch_create(*, obj_list: list[CreateOperaLogParam]) -> None:
"""
批量创建操作日志

:param obj: 操作日志创建参数
:return:
"""
async with async_db_session.begin() as db:
await opera_log_dao.batch_create(db, obj_list)

@staticmethod
async def delete(*, obj: DeleteOperaLogParam) -> int:
"""
Expand Down
38 changes: 38 additions & 0 deletions backend/common/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio

from asyncio import Queue
from typing import List


async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> List:
"""
在指定的超时时间内,从异步队列中批量获取项目。

此函数会尝试从给定的 ``asyncio.Queue`` 中获取最多 ``max_items`` 个项目。
它会为整个获取过程设置一个总的 ``timeout`` 秒数的超时限制。
如果在超时时间内未能收集到 ``max_items`` 个项目,
函数将返回当前已成功获取的所有项目。

:param queue: 用于获取项目的 ``asyncio.Queue`` 队列。
:type queue: asyncio.Queue
:param max_items: 希望从队列中获取的最大项目数量。
:type max_items: int
:param timeout: 总的等待超时时间(秒)。
:type timeout: float
:return: 一个从队列中获取到的项目列表。如果发生超时,列表中的项目数量可能会少于 ``max_items``。
:rtype: List
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

让我们使用 reStructuredText 风格文档

results = []

async def collector():
while len(results) < max_items:
item = await queue.get()
results.append(item)

try:
await asyncio.wait_for(collector(), timeout=timeout)
except asyncio.TimeoutError:
pass # 超时后返回已有的 items
return results
3 changes: 3 additions & 0 deletions backend/core/registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import os

from asyncio import create_task
from contextlib import asynccontextmanager
from typing import AsyncGenerator

Expand Down Expand Up @@ -47,6 +48,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
http_callback=http_limit_callback,
)
# 启动操作日志消费者
app.state.opera_log_consumer = create_task(OperaLogMiddleware.batch_create_consumer())

yield

Expand Down
26 changes: 24 additions & 2 deletions backend/middleware/opera_log_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
import time

from asyncio import create_task
from asyncio import Queue
from typing import Any

from asgiref.sync import sync_to_async
Expand All @@ -15,6 +15,7 @@
from backend.app.admin.service.opera_log_service import opera_log_service
from backend.common.enums import OperaLogCipherType, StatusType
from backend.common.log import log
from backend.common.queue import get_many_from_queue
from backend.core.conf import settings
from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher
from backend.utils.trace_id import get_request_trace_id
Expand All @@ -23,6 +24,9 @@
class OperaLogMiddleware(BaseHTTPMiddleware):
"""操作日志中间件"""

# 操作日志队列, 指定默认队列长度为100000
opera_log_queue: Queue = Queue(maxsize=100000)

async def dispatch(self, request: Request, call_next: Any) -> Response:
"""
处理请求并记录操作日志
Expand Down Expand Up @@ -108,7 +112,7 @@ async def dispatch(self, request: Request, call_next: Any) -> Response:
cost_time=elapsed, # 可能和日志存在微小差异(可忽略)
opera_time=request.state.start_time,
)
create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore
await self.opera_log_queue.put(opera_log_in)

# 错误抛出
if error:
Expand Down Expand Up @@ -191,3 +195,21 @@ def desensitization(args: dict[str, Any]) -> dict[str, Any] | None:
case _:
args[arg_type][key] = '******'
return args

@staticmethod
async def batch_create_consumer() -> None:
"""批量创建操作日志消费者"""
while True:
opera_log_queue = OperaLogMiddleware.opera_log_queue
logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1)
if len(logs) < 1:
continue
log.info(f'处理日志: {len(logs)} 条.')
try:
await opera_log_service.batch_create(obj_list=logs)
except Exception as e:
log.error(f'批量创建操作日志失败: {e}, logs: {logs}')
finally:
# 防止队列阻塞
if not opera_log_queue.empty():
opera_log_queue.task_done()