Files
yuliang_guo 659f60e765
Some checks failed
continuous-integration/drone/push Build is failing
fix: 修复课程权限和添加409冲突统一处理
1. 课程权限修复:
   - 创建课程: require_admin -> require_admin_or_manager
   - 更新课程: require_admin -> require_admin_or_manager
   - 现在manager角色也可以创建和编辑课程

2. 全局409冲突处理:
   - 添加IntegrityError异常处理器
   - 自动识别常见冲突类型(用户名/邮箱/手机号/名称/编码)
   - 返回友好的中文错误提示
2026-02-02 16:21:02 +08:00

270 lines
8.0 KiB
Python

"""考培练系统后端主应用"""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.exceptions import RequestValidationError
import json
import os
from app.core.config import get_settings
from app.api.v1 import api_router
# 配置日志
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
settings = get_settings()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 启动时执行
logger.info(f"启动 {settings.APP_NAME} v{settings.APP_VERSION}")
# 初始化 Redis
try:
from app.core.redis import init_redis, close_redis
await init_redis()
logger.info("Redis 初始化成功")
except Exception as e:
logger.warning(f"Redis 初始化失败(非致命): {e}")
# 初始化定时任务调度器
try:
from app.core.scheduler import scheduler_manager
from app.core.database import async_session_factory
await scheduler_manager.init(async_session_factory)
scheduler_manager.start()
logger.info("定时任务调度器启动成功")
except Exception as e:
logger.warning(f"定时任务调度器启动失败(非致命): {e}")
yield
# 关闭时执行
# 停止定时任务调度器
try:
from app.core.scheduler import scheduler_manager
scheduler_manager.stop()
logger.info("定时任务调度器已停止")
except Exception as e:
logger.warning(f"停止定时任务调度器失败: {e}")
try:
from app.core.redis import close_redis
await close_redis()
logger.info("Redis 连接已关闭")
except Exception as e:
logger.warning(f"关闭 Redis 连接失败: {e}")
logger.info("应用关闭")
# 自定义 JSON 响应类,确保中文正确编码
class UTF8JSONResponse(JSONResponse):
def render(self, content) -> bytes:
return json.dumps(
content,
ensure_ascii=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
).encode("utf-8")
# 创建FastAPI应用
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description="考培练系统后端API",
lifespan=lifespan,
# 确保响应正确的 UTF-8 编码
default_response_class=UTF8JSONResponse,
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 添加限流中间件
from app.core.middleware import RateLimitMiddleware, SecurityHeadersMiddleware
app.add_middleware(
RateLimitMiddleware,
requests_per_minute=120, # 每分钟最大请求数
burst_limit=200, # 突发请求限制
)
# 添加安全响应头中间件
app.add_middleware(SecurityHeadersMiddleware)
# 健康检查端点
@app.get("/health")
async def health_check():
"""健康检查"""
return {
"status": "healthy",
"service": settings.APP_NAME,
"version": settings.APP_VERSION,
}
# 根路径
@app.get("/")
async def root():
"""根路径"""
return {
"message": f"欢迎使用{settings.APP_NAME}",
"version": settings.APP_VERSION,
"docs": "/docs",
}
# 注册路由
app.include_router(api_router, prefix="/api/v1")
# 挂载静态文件目录
# 创建上传目录(如果不存在)
upload_path = settings.UPLOAD_PATH
os.makedirs(upload_path, exist_ok=True)
# 挂载上传文件目录为静态文件服务
app.mount("/static/uploads", StaticFiles(directory=upload_path), name="uploads")
# 请求验证错误处理 (422)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""处理请求验证错误,记录详细日志"""
logger.error(f"请求验证错误 [{request.method} {request.url.path}]: {exc.errors()}")
return JSONResponse(
status_code=422,
content={
"code": 422,
"message": "请求参数验证失败",
"detail": exc.errors(),
},
)
# JSON 解析错误处理
from json import JSONDecodeError
@app.exception_handler(JSONDecodeError)
async def json_decode_exception_handler(request: Request, exc: JSONDecodeError):
"""处理 JSON 解析错误"""
logger.warning(f"JSON解析错误 [{request.method} {request.url.path}]: {exc}")
return JSONResponse(
status_code=400,
content={
"code": 400,
"message": "请求体格式错误,需要有效的 JSON",
"detail": str(exc),
},
)
# HTTP 异常处理
from fastapi import HTTPException
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""处理 HTTP 异常"""
return JSONResponse(
status_code=exc.status_code,
content={
"code": exc.status_code,
"message": exc.detail,
},
)
# 数据库唯一约束冲突处理 (409)
from sqlalchemy.exc import IntegrityError
@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
"""处理数据库唯一约束冲突错误"""
error_msg = str(exc.orig) if exc.orig else str(exc)
logger.warning(f"数据库冲突 [{request.method} {request.url.path}]: {error_msg}")
# 解析常见的冲突类型,提供友好的错误信息
friendly_message = "数据冲突,该记录可能已存在"
if "Duplicate entry" in error_msg:
# MySQL 唯一约束冲突
if "username" in error_msg.lower():
friendly_message = "用户名已存在,请使用其他用户名"
elif "email" in error_msg.lower():
friendly_message = "邮箱已被注册,请使用其他邮箱"
elif "phone" in error_msg.lower():
friendly_message = "手机号已被注册,请使用其他手机号"
elif "name" in error_msg.lower():
friendly_message = "名称已存在,请使用其他名称"
elif "code" in error_msg.lower():
friendly_message = "编码已存在,请使用其他编码"
else:
friendly_message = "该记录已存在,不能重复创建"
elif "FOREIGN KEY" in error_msg.upper():
friendly_message = "关联的数据不存在或已被删除"
elif "cannot be null" in error_msg.lower():
friendly_message = "必填字段不能为空"
return JSONResponse(
status_code=409,
content={
"code": 409,
"message": friendly_message,
"detail": error_msg if settings.DEBUG else None,
},
)
# 全局异常处理
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""全局异常处理"""
error_msg = str(exc)
# 检查是否是 Content-Type 相关错误
if "Expecting value" in error_msg or "JSON" in error_msg.upper():
logger.warning(f"请求体解析错误 [{request.method} {request.url.path}]: {error_msg}")
return JSONResponse(
status_code=400,
content={
"code": 400,
"message": "请求体格式错误,请使用 application/json",
},
)
logger.error(f"未处理的异常: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"code": 500,
"message": "内部服务器错误",
"detail": str(exc) if settings.DEBUG else None,
},
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.main:app",
host=settings.HOST,
port=settings.PORT,
reload=settings.DEBUG,
log_level=settings.LOG_LEVEL.lower(),
)
# 测试热重载 - Fri Sep 26 03:37:07 CST 2025