Files
012-kaopeilian/backend/app/api/v1/upload.py
yuliang_guo c3aa4e85e7
All checks were successful
continuous-integration/drone/push Build is passing
feat: 添加PPT/PPTX文件类型支持
1. upload.py: 添加ppt/pptx到允许上传的文件类型
2. knowledge_analysis_v2.py: 添加PPT内容提取方法_extract_ppt_content
3. requirements.txt: 添加python-pptx依赖
2026-01-31 11:49:10 +08:00

276 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
文件上传API接口
"""
import os
import shutil
from pathlib import Path
from typing import List, Optional
from datetime import datetime
import hashlib
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.deps import get_current_user, get_db
from app.models.user import User
from app.models.course import Course
from app.schemas.base import ResponseModel
from app.core.logger import get_logger
logger = get_logger(__name__)
router = APIRouter(prefix="/upload")
# 支持的文件类型和大小限制
# 支持格式TXT、Markdown、MDX、PDF、HTML、Excel、Word、PPT、CSV、VTT、Properties
ALLOWED_EXTENSIONS = {
'txt', 'md', 'mdx', 'pdf', 'html', 'htm',
'xlsx', 'xls', 'docx', 'doc', 'pptx', 'ppt', 'csv', 'vtt', 'properties'
}
MAX_FILE_SIZE = 15 * 1024 * 1024 # 15MB
def get_file_extension(filename: str) -> str:
"""获取文件扩展名"""
return filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
def generate_unique_filename(original_filename: str) -> str:
"""生成唯一的文件名"""
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
random_str = hashlib.md5(f"{original_filename}{timestamp}".encode()).hexdigest()[:8]
ext = get_file_extension(original_filename)
return f"{timestamp}_{random_str}.{ext}"
def get_upload_path(file_type: str = "general") -> Path:
"""获取上传路径"""
base_path = Path(settings.UPLOAD_PATH)
upload_path = base_path / file_type
upload_path.mkdir(parents=True, exist_ok=True)
return upload_path
@router.post("/file", response_model=ResponseModel[dict])
async def upload_file(
file: UploadFile = File(...),
file_type: str = "general",
current_user: User = Depends(get_current_user),
):
"""
上传单个文件
- **file**: 要上传的文件
- **file_type**: 文件类型分类general, course, avatar等
返回:
- **file_url**: 文件访问URL
- **file_name**: 原始文件名
- **file_size**: 文件大小
- **file_type**: 文件类型
"""
try:
# 检查文件扩展名
file_ext = get_file_extension(file.filename)
if file_ext not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的文件类型: {file_ext}"
)
# 读取文件内容
contents = await file.read()
file_size = len(contents)
# 检查文件大小
if file_size > MAX_FILE_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"文件大小超过限制,最大允许 {MAX_FILE_SIZE // 1024 // 1024}MB"
)
# 生成唯一文件名
unique_filename = generate_unique_filename(file.filename)
# 获取上传路径
upload_path = get_upload_path(file_type)
file_path = upload_path / unique_filename
# 保存文件
with open(file_path, "wb") as f:
f.write(contents)
# 生成文件访问URL
file_url = f"/static/uploads/{file_type}/{unique_filename}"
logger.info(
"文件上传成功",
user_id=current_user.id,
original_filename=file.filename,
saved_filename=unique_filename,
file_size=file_size,
file_type=file_type,
)
return ResponseModel(
data={
"file_url": file_url,
"file_name": file.filename,
"file_size": file_size,
"file_type": file_ext,
},
message="文件上传成功"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"文件上传失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="文件上传失败"
)
@router.post("/course/{course_id}/materials", response_model=ResponseModel[dict])
async def upload_course_material(
course_id: int,
file: UploadFile = File(...),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
上传课程资料
- **course_id**: 课程ID
- **file**: 要上传的文件
返回上传结果包含文件URL等信息
"""
try:
# 验证课程是否存在
from sqlalchemy import select
from app.models.course import Course
stmt = select(Course).where(Course.id == course_id, Course.is_deleted == False)
result = await db.execute(stmt)
course = result.scalar_one_or_none()
if not course:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"课程 {course_id} 不存在"
)
# 检查文件扩展名
file_ext = get_file_extension(file.filename)
if file_ext not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的文件类型: {file_ext}"
)
# 读取文件内容
contents = await file.read()
file_size = len(contents)
# 检查文件大小
if file_size > MAX_FILE_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"文件大小超过限制,最大允许 {MAX_FILE_SIZE // 1024 // 1024}MB"
)
# 生成唯一文件名
unique_filename = generate_unique_filename(file.filename)
# 创建课程专属目录
course_upload_path = Path(settings.UPLOAD_PATH) / "courses" / str(course_id)
course_upload_path.mkdir(parents=True, exist_ok=True)
# 保存文件
file_path = course_upload_path / unique_filename
with open(file_path, "wb") as f:
f.write(contents)
# 生成文件访问URL
file_url = f"/static/uploads/courses/{course_id}/{unique_filename}"
logger.info(
"课程资料上传成功",
user_id=current_user.id,
course_id=course_id,
original_filename=file.filename,
saved_filename=unique_filename,
file_size=file_size,
)
return ResponseModel(
data={
"file_url": file_url,
"file_name": file.filename,
"file_size": file_size,
"file_type": file_ext,
},
message="课程资料上传成功"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"课程资料上传失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="课程资料上传失败"
)
@router.delete("/file", response_model=ResponseModel[bool])
async def delete_file(
file_url: str,
current_user: User = Depends(get_current_user),
):
"""
删除已上传的文件
- **file_url**: 文件URL路径
"""
try:
# 解析文件路径
if not file_url.startswith("/static/uploads/"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="无效的文件URL"
)
# 转换为实际文件路径
relative_path = file_url.replace("/static/uploads/", "")
file_path = Path(settings.UPLOAD_PATH) / relative_path
# 检查文件是否存在
if not file_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文件不存在"
)
# 删除文件
os.remove(file_path)
logger.info(
"文件删除成功",
user_id=current_user.id,
file_url=file_url,
)
return ResponseModel(data=True, message="文件删除成功")
except HTTPException:
raise
except Exception as e:
logger.error(f"文件删除失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="文件删除失败"
)