"""
File upload API endpoints.

Exposes three routes under the ``/upload`` prefix:

- ``POST /upload/file``: upload a single file into a category directory.
- ``POST /upload/course/{course_id}/materials``: upload a file that belongs
  to a course.
- ``DELETE /upload/file``: delete a previously uploaded file by its URL.

Files are written below ``settings.UPLOAD_PATH`` and addressed by URLs that
start with ``/static/uploads/``.
"""
import hashlib
import os
import re
import secrets
import shutil
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

from fastapi import APIRouter, Depends, UploadFile, File, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.deps import get_current_user, get_db
from app.models.user import User
from app.models.course import Course
from app.schemas.base import ResponseModel
from app.core.logger import get_logger

logger = get_logger(__name__)

router = APIRouter(prefix="/upload")

# Accepted file types and size limit.
# Supported formats: TXT, Markdown, MDX, PDF, HTML, Excel, Word, PPT, CSV,
# VTT, Properties.
ALLOWED_EXTENSIONS = {
    'txt', 'md', 'mdx', 'pdf', 'html', 'htm',
    'xlsx', 'xls', 'docx', 'doc', 'pptx', 'ppt',
    'csv', 'vtt', 'properties',
}
MAX_FILE_SIZE = 15 * 1024 * 1024  # 15MB

# URL prefix under which uploaded files are addressed.
STATIC_URL_PREFIX = "/static/uploads/"

# One path segment of a ``file_type`` category: word characters and hyphens
# only, which excludes ".", "..", empty segments, backslashes and NUL bytes.
_SAFE_SEGMENT_RE = re.compile(r"[\w-]+")


def get_file_extension(filename: Optional[str]) -> str:
    """Return the lower-cased extension of ``filename`` without the dot.

    Returns an empty string when ``filename`` is ``None``, empty, or has no
    dot, so a missing client filename is treated as "no extension" instead of
    raising ``TypeError``.
    """
    if not filename or '.' not in filename:
        return ''
    return filename.rsplit('.', 1)[1].lower()


def generate_unique_filename(original_filename: str) -> str:
    """Build a storage file name of the form ``<timestamp>_<8 hex>.<ext>``.

    The hex part comes from ``secrets.token_hex`` rather than from a hash of
    the name and timestamp: the timestamp only has one-second resolution, so
    a hash would give two uploads of the same name within one second the
    same storage name and the second would overwrite the first.

    When the original name has no extension the result has no trailing dot.
    """
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    random_str = secrets.token_hex(4)  # 4 bytes -> 8 hex characters
    ext = get_file_extension(original_filename)
    stem = f"{timestamp}_{random_str}"
    return f"{stem}.{ext}" if ext else stem


def get_upload_path(file_type: str = "general") -> Path:
    """Return ``settings.UPLOAD_PATH / file_type``, creating it if needed.

    This helper does not validate ``file_type``; callers passing
    client-supplied values must check them first (see
    ``_is_safe_file_type``).
    """
    base_path = Path(settings.UPLOAD_PATH)
    upload_path = base_path / file_type
    upload_path.mkdir(parents=True, exist_ok=True)
    return upload_path


def _is_safe_file_type(file_type: str) -> bool:
    """Return True when every "/"-separated segment is a plain name."""
    return all(
        _SAFE_SEGMENT_RE.fullmatch(segment) is not None
        for segment in file_type.split("/")
    )


async def _read_validated_upload(file: UploadFile) -> Tuple[str, bytes]:
    """Validate extension and size of ``file`` and return ``(ext, contents)``.

    Raises:
        HTTPException: 400 when the extension is not in
            ``ALLOWED_EXTENSIONS`` or the content exceeds ``MAX_FILE_SIZE``.
    """
    file_ext = get_file_extension(file.filename)
    if file_ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"不支持的文件类型: {file_ext}"
        )

    # Read one byte past the limit: enough to detect an oversized upload
    # without loading an arbitrarily large body into memory.
    contents = await file.read(MAX_FILE_SIZE + 1)
    if len(contents) > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"文件大小超过限制,最大允许 {MAX_FILE_SIZE // 1024 // 1024}MB"
        )
    return file_ext, contents


@router.post("/file", response_model=ResponseModel[dict])
async def upload_file(
    file: UploadFile = File(...),
    file_type: str = "general",
    current_user: User = Depends(get_current_user),
):
    """
    Upload a single file.

    - **file**: the file to upload
    - **file_type**: storage category (general, course, avatar, ...)

    Returns:
    - **file_url**: URL of the stored file
    - **file_name**: original file name
    - **file_size**: size of the file in bytes
    - **file_type**: lower-cased file extension
    """
    try:
        # file_type is client-controlled and becomes a directory name, so
        # reject anything that could escape the upload root ("..", absolute
        # paths, backslashes, ...).
        if not _is_safe_file_type(file_type):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"无效的文件类型分类: {file_type}"
            )

        file_ext, contents = await _read_validated_upload(file)
        file_size = len(contents)

        unique_filename = generate_unique_filename(file.filename)
        file_path = get_upload_path(file_type) / unique_filename
        file_path.write_bytes(contents)

        file_url = f"{STATIC_URL_PREFIX}{file_type}/{unique_filename}"

        logger.info(
            "文件上传成功",
            user_id=current_user.id,
            original_filename=file.filename,
            saved_filename=unique_filename,
            file_size=file_size,
            file_type=file_type,
        )

        return ResponseModel(
            data={
                "file_url": file_url,
                "file_name": file.filename,
                "file_size": file_size,
                "file_type": file_ext,
            },
            message="文件上传成功"
        )

    except HTTPException:
        # Deliberate client-facing errors pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="文件上传失败"
        ) from e


@router.post("/course/{course_id}/materials", response_model=ResponseModel[dict])
async def upload_course_material(
    course_id: int,
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Upload a course material file.

    - **course_id**: ID of the course the file belongs to
    - **file**: the file to upload

    Returns the upload result, including the file URL, original name, size
    in bytes and extension. Responds with 404 when the course does not exist
    or is marked as deleted.
    """
    # NOTE(review): only the existence of the course is checked here, not
    # whether current_user may modify it - confirm this is enforced elsewhere.
    try:
        # "== False" is intentional: it builds a SQL comparison expression.
        stmt = select(Course).where(
            Course.id == course_id,
            Course.is_deleted == False,  # noqa: E712
        )
        result = await db.execute(stmt)
        course = result.scalar_one_or_none()
        if not course:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"课程 {course_id} 不存在"
            )

        file_ext, contents = await _read_validated_upload(file)
        file_size = len(contents)

        unique_filename = generate_unique_filename(file.filename)

        # Each course gets its own directory: <UPLOAD_PATH>/courses/<id>.
        course_upload_path = get_upload_path(f"courses/{course_id}")
        file_path = course_upload_path / unique_filename
        file_path.write_bytes(contents)

        file_url = f"{STATIC_URL_PREFIX}courses/{course_id}/{unique_filename}"

        logger.info(
            "课程资料上传成功",
            user_id=current_user.id,
            course_id=course_id,
            original_filename=file.filename,
            saved_filename=unique_filename,
            file_size=file_size,
        )

        return ResponseModel(
            data={
                "file_url": file_url,
                "file_name": file.filename,
                "file_size": file_size,
                "file_type": file_ext,
            },
            message="课程资料上传成功"
        )

    except HTTPException:
        # Deliberate client-facing errors pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"课程资料上传失败: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="课程资料上传失败"
        ) from e


@router.delete("/file", response_model=ResponseModel[bool])
async def delete_file(
    file_url: str,
    current_user: User = Depends(get_current_user),
):
    """
    Delete a previously uploaded file.

    - **file_url**: URL path of the file; must start with `/static/uploads/`

    Responds with 400 when the URL does not point inside the upload
    directory and with 404 when no such file exists.
    """
    # NOTE(review): any authenticated user can delete any uploaded file;
    # no ownership information is visible in this module - confirm intended.
    try:
        if not file_url.startswith(STATIC_URL_PREFIX):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="无效的文件URL"
            )

        # Strip only the leading prefix; str.replace would also remove any
        # later occurrence inside the path.
        relative_path = file_url[len(STATIC_URL_PREFIX):]

        # Resolve before checking containment so that ".." segments and an
        # absolute remainder (which would replace the base when joined)
        # cannot point outside the upload root.
        base_path = Path(settings.UPLOAD_PATH).resolve()
        try:
            file_path = (base_path / relative_path).resolve()
        except ValueError:
            # e.g. an embedded NUL byte in the path
            file_path = None
        if file_path is None or base_path not in file_path.parents:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="无效的文件URL"
            )

        # is_file() rather than exists(): a directory is not deletable here.
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="文件不存在"
            )

        try:
            os.remove(file_path)
        except FileNotFoundError:
            # The file vanished between the check and the removal.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="文件不存在"
            )

        logger.info(
            "文件删除成功",
            user_id=current_user.id,
            file_url=file_url,
        )

        return ResponseModel(data=True, message="文件删除成功")

    except HTTPException:
        # Deliberate client-facing errors pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"文件删除失败: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="文件删除失败"
        ) from e