Files
012-kaopeilian/backend/app/services/system_log_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

171 lines
4.6 KiB
Python

"""
系统日志服务
"""
import logging
from typing import Optional
from datetime import datetime
from sqlalchemy import select, func, or_
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.system_log import SystemLog
from app.schemas.system_log import SystemLogCreate, SystemLogQuery
logger = logging.getLogger(__name__)
class SystemLogService:
"""系统日志服务类"""
async def create_log(
self,
db: AsyncSession,
log_data: SystemLogCreate
) -> SystemLog:
"""
创建系统日志
Args:
db: 数据库会话
log_data: 日志数据
Returns:
创建的日志对象
"""
try:
log = SystemLog(**log_data.model_dump())
db.add(log)
await db.commit()
await db.refresh(log)
return log
except Exception as e:
await db.rollback()
logger.error(f"创建系统日志失败: {str(e)}")
raise
async def get_logs(
self,
db: AsyncSession,
query_params: SystemLogQuery
) -> tuple[list[SystemLog], int]:
"""
查询系统日志列表
Args:
db: 数据库会话
query_params: 查询参数
Returns:
(日志列表, 总数)
"""
try:
# 构建基础查询
stmt = select(SystemLog)
count_stmt = select(func.count(SystemLog.id))
# 应用筛选条件
filters = []
if query_params.level:
filters.append(SystemLog.level == query_params.level)
if query_params.type:
filters.append(SystemLog.type == query_params.type)
if query_params.user:
filters.append(SystemLog.user == query_params.user)
if query_params.keyword:
filters.append(SystemLog.message.like(f"%{query_params.keyword}%"))
if query_params.start_date:
filters.append(SystemLog.created_at >= query_params.start_date)
if query_params.end_date:
filters.append(SystemLog.created_at <= query_params.end_date)
# 应用所有筛选条件
if filters:
stmt = stmt.where(*filters)
count_stmt = count_stmt.where(*filters)
# 获取总数
result = await db.execute(count_stmt)
total = result.scalar_one()
# 应用排序和分页
stmt = stmt.order_by(SystemLog.created_at.desc())
stmt = stmt.offset((query_params.page - 1) * query_params.page_size)
stmt = stmt.limit(query_params.page_size)
# 执行查询
result = await db.execute(stmt)
logs = result.scalars().all()
return list(logs), total
except Exception as e:
logger.error(f"查询系统日志失败: {str(e)}")
raise
async def get_log_by_id(
self,
db: AsyncSession,
log_id: int
) -> Optional[SystemLog]:
"""
根据ID获取日志详情
Args:
db: 数据库会话
log_id: 日志ID
Returns:
日志对象或None
"""
try:
stmt = select(SystemLog).where(SystemLog.id == log_id)
result = await db.execute(stmt)
return result.scalar_one_or_none()
except Exception as e:
logger.error(f"获取日志详情失败: {str(e)}")
raise
async def delete_logs_before_date(
self,
db: AsyncSession,
before_date: datetime
) -> int:
"""
删除指定日期之前的日志(用于日志清理)
Args:
db: 数据库会话
before_date: 截止日期
Returns:
删除的日志数量
"""
try:
stmt = select(SystemLog).where(SystemLog.created_at < before_date)
result = await db.execute(stmt)
logs = result.scalars().all()
count = len(logs)
for log in logs:
await db.delete(log)
await db.commit()
logger.info(f"已删除 {count} 条日志记录")
return count
except Exception as e:
await db.rollback()
logger.error(f"删除日志失败: {str(e)}")
raise
# 创建全局服务实例
system_log_service = SystemLogService()