Files
012-kaopeilian/backend/app/api/v1/courses.py
yuliang_guo 58f746cf46
All checks were successful
continuous-integration/drone/push Build is passing
fix: 完整开放manager课程管理权限
将以下API权限从 require_admin 改为 require_admin_or_manager:
- add_course_material: 添加课程资料
- delete_course_material: 删除课程资料
- create_knowledge_point: 创建知识点
- update_knowledge_point: 更新知识点
- delete_knowledge_point: 删除知识点
- create_growth_path: 创建成长路径

Manager现在拥有完整的课程管理权限,包括:
- 课程CRUD
- 课程资料管理
- 知识点管理
- 岗位分配
- 成长路径创建
2026-02-02 16:54:27 +08:00

788 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
课程管理API路由
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, Query, status, BackgroundTasks, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.deps import get_db, get_current_user, require_admin, require_admin_or_manager, User
from app.core.exceptions import NotFoundError, BadRequestError
from app.core.logger import get_logger
from app.services.system_log_service import system_log_service
from app.schemas.system_log import SystemLogCreate
from app.models.course import CourseStatus, CourseCategory
from app.schemas.base import ResponseModel, PaginationParams, PaginatedResponse
from app.schemas.course import (
CourseCreate,
CourseUpdate,
CourseInDB,
CourseList,
CourseMaterialCreate,
CourseMaterialInDB,
KnowledgePointCreate,
KnowledgePointUpdate,
KnowledgePointInDB,
GrowthPathCreate,
GrowthPathInDB,
CourseExamSettingsCreate,
CourseExamSettingsUpdate,
CourseExamSettingsInDB,
CoursePositionAssignment,
CoursePositionAssignmentInDB,
)
from app.services.course_service import (
course_service,
knowledge_point_service,
growth_path_service,
)
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Every route declared below is registered under the "/courses" prefix.
router = APIRouter(prefix="/courses", tags=["courses"])
@router.get("", response_model=ResponseModel[PaginatedResponse[CourseInDB]])
async def get_courses(
    page: int = Query(1, ge=1, description="页码"),
    size: int = Query(20, ge=1, le=100, description="每页数量"),
    status: Optional[CourseStatus] = Query(None, description="课程状态"),
    category: Optional[CourseCategory] = Query(None, description="课程分类"),
    is_featured: Optional[bool] = Query(None, description="是否推荐"),
    keyword: Optional[str] = Query(None, description="搜索关键词"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    List courses with pagination and optional filters.

    - **page**: page number (starts at 1)
    - **size**: number of items per page (1-100)
    - **status**: optional course status filter
    - **category**: optional course category filter
    - **is_featured**: optional "featured" flag filter
    - **keyword**: optional search keyword
    """
    # NOTE: the ``status`` query parameter shadows the imported
    # ``fastapi.status`` module inside this function only.
    course_page = await course_service.get_course_list(
        db,
        page_params=PaginationParams(page=page, page_size=size),
        filters=CourseList(
            status=status,
            category=category,
            is_featured=is_featured,
            keyword=keyword,
        ),
        user_id=current_user.id,
    )
    return ResponseModel(data=course_page, message="获取课程列表成功")
@router.post(
    "", response_model=ResponseModel[CourseInDB], status_code=status.HTTP_201_CREATED
)
async def create_course(
    course_in: CourseCreate,
    request: Request,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Create a course (guarded by the admin-or-manager dependency).

    Request body fields:
    - **name**: course name
    - **description**: course description
    - **category**: course category
    - **status**: course status (defaults to draft)
    - **cover_image**: cover image URL
    - **duration_hours**: course duration in hours
    - **difficulty_level**: difficulty level, 1-5
    - **tags**: list of tags
    - **is_featured**: whether the course is featured
    """
    new_course = await course_service.create_course(
        db, course_in=course_in, created_by=current_user.id
    )

    # Record the creation in the system log.
    audit_entry = SystemLogCreate(
        level="INFO",
        type="api",
        message=f"创建课程: {new_course.name}",
        user_id=current_user.id,
        user=current_user.username,
        # request.client may be absent, in which case no IP is recorded.
        ip=request.client.host if request.client else None,
        path="/api/v1/courses",
        method="POST",
        user_agent=request.headers.get("user-agent"),
    )
    await system_log_service.create_log(db, audit_entry)

    return ResponseModel(data=new_course, message="创建课程成功")
# The ``:int`` path convertor restricts this route to numeric ids. Without it,
# GET /courses/growth-paths (declared later in this router) is captured by this
# route first and rejected by int validation instead of reaching its handler.
@router.get("/{course_id:int}", response_model=ResponseModel[CourseInDB])
async def get_course(
    course_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Get the details of a single course.

    - **course_id**: course ID

    Raises NotFoundError when the lookup by ID returns nothing.
    """
    course = await course_service.get_by_id(db, course_id)
    if not course:
        raise NotFoundError(f"课程ID {course_id} 不存在")
    logger.info(f"查看课程详情 - course_id: {course_id}, user_id: {current_user.id}")
    return ResponseModel(data=course, message="获取课程详情成功")
@router.put("/{course_id}", response_model=ResponseModel[CourseInDB])
async def update_course(
    course_id: int,
    course_in: CourseUpdate,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Update a course (guarded by the admin-or-manager dependency).

    - **course_id**: course ID
    - **course_in**: fields to update (all fields are optional)
    """
    updated_course = await course_service.update_course(
        db,
        course_id=course_id,
        course_in=course_in,
        updated_by=current_user.id,
    )
    return ResponseModel(data=updated_course, message="更新课程成功")
@router.delete("/{course_id}", response_model=ResponseModel[bool])
async def delete_course(
    course_id: int,
    request: Request,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Delete a course (guarded by the admin-or-manager dependency).

    - **course_id**: course ID

    Note: a course in any status can be soft-deleted (is_deleted=1); use
    with care.
    """
    # Look the course up first so the log entry can carry its name; fall back
    # to the raw ID when the lookup returns nothing.
    existing = await course_service.get_by_id(db, course_id)
    if existing:
        course_name = existing.name
    else:
        course_name = f"ID:{course_id}"

    success = await course_service.delete_course(
        db, course_id=course_id, deleted_by=current_user.id
    )
    if not success:
        return ResponseModel(data=success, message="删除课程失败")

    # Only successful deletions are written to the system log.
    audit_entry = SystemLogCreate(
        level="INFO",
        type="api",
        message=f"删除课程: {course_name}",
        user_id=current_user.id,
        user=current_user.username,
        ip=request.client.host if request.client else None,
        path=f"/api/v1/courses/{course_id}",
        method="DELETE",
        user_agent=request.headers.get("user-agent"),
    )
    await system_log_service.create_log(db, audit_entry)
    return ResponseModel(data=success, message="删除课程成功")
# 课程资料相关API
@router.post(
    "/{course_id}/materials",
    response_model=ResponseModel[CourseMaterialInDB],
    status_code=status.HTTP_201_CREATED,
)
async def add_course_material(
    course_id: int,
    material_in: CourseMaterialCreate,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Add a material to a course (guarded by the admin-or-manager dependency).

    - **course_id**: course ID
    - **name**: material name
    - **description**: material description
    - **file_url**: file URL
    - **file_type**: file type (pdf, doc, docx, ppt, pptx, xls, xlsx, mp4, mp3, zip)
    - **file_size**: file size in bytes

    When the course can be loaded, a knowledge-point analysis of the new
    material is scheduled as a background task.
    """
    material = await course_service.add_course_material(
        db, course_id=course_id, material_in=material_in, created_by=current_user.id
    )

    # The course name is needed as input for the knowledge-point analysis.
    course = await course_service.get_by_id(db, course_id)
    if course:
        # The analysis service is imported by _trigger_knowledge_analysis
        # itself, so no import is needed here.
        # NOTE(review): the request-scoped session ``db`` is handed to the
        # background task. Confirm that get_db keeps it usable until background
        # tasks finish; otherwise the task should open its own session.
        background_tasks.add_task(
            _trigger_knowledge_analysis,
            db,
            course_id,
            material.id,
            material.file_url,
            course.name,
            current_user.id,
        )
        logger.info(
            f"资料添加成功,已触发知识点分析 - course_id: {course_id}, material_id: {material.id}, user_id: {current_user.id}"
        )

    return ResponseModel(data=material, message="添加课程资料成功")
@router.get(
    "/{course_id}/materials",
    response_model=ResponseModel[List[CourseMaterialInDB]],
)
async def list_course_materials(
    course_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    List the materials attached to a course.

    - **course_id**: course ID
    """
    course_materials = await course_service.get_course_materials(
        db, course_id=course_id
    )
    return ResponseModel(data=course_materials, message="获取课程资料列表成功")
@router.delete(
    "/{course_id}/materials/{material_id}",
    response_model=ResponseModel[bool],
)
async def delete_course_material(
    course_id: int,
    material_id: int,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Delete a course material (guarded by the admin-or-manager dependency).

    - **course_id**: course ID
    - **material_id**: material ID
    """
    removed = await course_service.delete_course_material(
        db,
        course_id=course_id,
        material_id=material_id,
        deleted_by=current_user.id,
    )
    # The response message reflects the outcome reported by the service.
    if removed:
        outcome = "删除课程资料成功"
    else:
        outcome = "删除课程资料失败"
    return ResponseModel(data=removed, message=outcome)
# 知识点相关API
@router.get(
    "/{course_id}/knowledge-points",
    response_model=ResponseModel[List[KnowledgePointInDB]],
)
async def get_course_knowledge_points(
    course_id: int,
    material_id: Optional[int] = Query(None, description="资料ID"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    List the knowledge points of a course.

    - **course_id**: course ID
    - **material_id**: optional material ID, to narrow the result to one material

    Raises NotFoundError when the course lookup returns nothing.
    """
    # Reject unknown courses before querying their knowledge points.
    if not await course_service.get_by_id(db, course_id):
        raise NotFoundError(f"课程ID {course_id} 不存在")

    points = await knowledge_point_service.get_knowledge_points_by_course(
        db, course_id=course_id, material_id=material_id
    )
    return ResponseModel(data=points, message="获取知识点列表成功")
@router.post(
    "/{course_id}/knowledge-points",
    response_model=ResponseModel[KnowledgePointInDB],
    status_code=status.HTTP_201_CREATED,
)
async def create_knowledge_point(
    course_id: int,
    point_in: KnowledgePointCreate,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Create a knowledge point (guarded by the admin-or-manager dependency).

    - **course_id**: course ID
    - **name**: knowledge point name
    - **description**: knowledge point description
    - **parent_id**: parent knowledge point ID
    - **weight**: weight, 0-10
    - **is_required**: whether the point is mandatory
    - **estimated_hours**: estimated study time in hours
    """
    created_point = await knowledge_point_service.create_knowledge_point(
        db,
        course_id=course_id,
        point_in=point_in,
        created_by=current_user.id,
    )
    return ResponseModel(data=created_point, message="创建知识点成功")
@router.put(
    "/knowledge-points/{point_id}", response_model=ResponseModel[KnowledgePointInDB]
)
async def update_knowledge_point(
    point_id: int,
    point_in: KnowledgePointUpdate,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Update a knowledge point (guarded by the admin-or-manager dependency).

    - **point_id**: knowledge point ID
    - **point_in**: fields to update (all fields are optional)
    """
    updated_point = await knowledge_point_service.update_knowledge_point(
        db,
        point_id=point_id,
        point_in=point_in,
        updated_by=current_user.id,
    )
    return ResponseModel(data=updated_point, message="更新知识点成功")
@router.delete("/knowledge-points/{point_id}", response_model=ResponseModel[bool])
async def delete_knowledge_point(
    point_id: int,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Delete a knowledge point (guarded by the admin-or-manager dependency).

    - **point_id**: knowledge point ID

    The service is called with ``soft=True``.
    """
    success = await knowledge_point_service.delete(
        db, id=point_id, soft=True, deleted_by=current_user.id
    )
    if success:
        # Use an f-string like every other log call in this module. Passing
        # arbitrary keyword arguments only works with loggers that accept them
        # and would raise after the deletion has already happened otherwise.
        logger.warning(
            f"删除知识点 - knowledge_point_id: {point_id}, deleted_by: {current_user.id}"
        )
    return ResponseModel(data=success, message="删除知识点成功" if success else "删除知识点失败")
# 资料知识点关联API
@router.get(
    "/materials/{material_id}/knowledge-points",
    response_model=ResponseModel[List[KnowledgePointInDB]],
)
async def get_material_knowledge_points(
    material_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    List the knowledge points linked to a material.

    - **material_id**: material ID
    """
    linked_points = await course_service.get_material_knowledge_points(
        db, material_id=material_id
    )
    return ResponseModel(data=linked_points, message="获取知识点列表成功")
@router.post(
    "/materials/{material_id}/knowledge-points",
    response_model=ResponseModel[List[KnowledgePointInDB]],
    status_code=status.HTTP_201_CREATED,
)
async def add_material_knowledge_points(
    material_id: int,
    knowledge_point_ids: List[int],
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Link knowledge points to a material (guarded by the admin-or-manager
    dependency).

    - **material_id**: material ID
    - **knowledge_point_ids**: IDs of the knowledge points to link
    """
    linked_points = await course_service.add_material_knowledge_points(
        db,
        material_id=material_id,
        knowledge_point_ids=knowledge_point_ids,
    )
    return ResponseModel(data=linked_points, message="添加知识点成功")
@router.delete(
    "/materials/{material_id}/knowledge-points/{knowledge_point_id}",
    response_model=ResponseModel[bool],
)
async def remove_material_knowledge_point(
    material_id: int,
    knowledge_point_id: int,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Remove the link between a material and a knowledge point (guarded by the
    admin-or-manager dependency).

    - **material_id**: material ID
    - **knowledge_point_id**: knowledge point ID
    """
    unlinked = await course_service.remove_material_knowledge_point(
        db,
        material_id=material_id,
        knowledge_point_id=knowledge_point_id,
    )
    # The response message reflects the outcome reported by the service.
    if unlinked:
        outcome = "移除知识点成功"
    else:
        outcome = "移除失败"
    return ResponseModel(data=unlinked, message=outcome)
# 成长路径相关API
@router.post(
    "/growth-paths",
    response_model=ResponseModel[GrowthPathInDB],
    status_code=status.HTTP_201_CREATED,
)
async def create_growth_path(
    path_in: GrowthPathCreate,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Create a growth path (guarded by the admin-or-manager dependency).

    - **name**: path name
    - **description**: path description
    - **target_role**: target role
    - **courses**: list of courses, each with course_id, order and is_required
    - **estimated_duration_days**: estimated number of days to complete
    - **is_active**: whether the path is enabled
    """
    created_path = await growth_path_service.create_growth_path(
        db,
        path_in=path_in,
        created_by=current_user.id,
    )
    return ResponseModel(data=created_path, message="创建成长路径成功")
@router.get(
    "/growth-paths", response_model=ResponseModel[PaginatedResponse[GrowthPathInDB]]
)
async def get_growth_paths(
    page: int = Query(1, ge=1, description="页码"),
    size: int = Query(20, ge=1, le=100, description="每页数量"),
    is_active: Optional[bool] = Query(None, description="是否启用"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    List growth paths with pagination.

    - **page**: page number (starts at 1)
    - **size**: number of items per page (1-100)
    - **is_active**: optional "enabled" flag filter
    """
    if is_active is None:
        # No filter requested: query without extra conditions.
        conditions = []
    else:
        # The model is imported only when a filter is actually needed.
        from app.models.course import GrowthPath

        conditions = [GrowthPath.is_active == is_active]

    path_page = await growth_path_service.get_page(
        db,
        page_params=PaginationParams(page=page, page_size=size),
        filters=conditions,
    )
    return ResponseModel(data=path_page, message="获取成长路径列表成功")
# 课程考试设置相关API
@router.get(
    "/{course_id}/exam-settings",
    response_model=ResponseModel[Optional[CourseExamSettingsInDB]],
)
async def get_course_exam_settings(
    course_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Get the exam settings of a course.

    - **course_id**: course ID

    The response data is whatever the exam service returns for the course,
    which may be empty. Raises NotFoundError when the course lookup returns
    nothing.
    """
    # Reject unknown courses first.
    if not await course_service.get_by_id(db, course_id):
        raise NotFoundError(f"课程ID {course_id} 不存在")

    from app.services.course_exam_service import course_exam_service

    exam_settings = await course_exam_service.get_by_course_id(db, course_id)

    # Debug logging: either the missing configuration or the loaded values.
    if not exam_settings:
        logger.warning(f"⚠️ 课程 {course_id} 没有配置考试设置,将使用默认值")
    else:
        logger.info(
            f"📊 获取考试设置成功 - course_id: {course_id}, "
            f"单选: {exam_settings.single_choice_count}, 多选: {exam_settings.multiple_choice_count}, "
            f"判断: {exam_settings.true_false_count}, 填空: {exam_settings.fill_blank_count}, "
            f"问答: {exam_settings.essay_count}, 难度: {exam_settings.difficulty_level}"
        )

    return ResponseModel(data=exam_settings, message="获取考试设置成功")
@router.post(
    "/{course_id}/exam-settings",
    response_model=ResponseModel[CourseExamSettingsInDB],
    status_code=status.HTTP_201_CREATED,
)
async def create_course_exam_settings(
    course_id: int,
    settings_in: CourseExamSettingsCreate,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Create or update the exam settings of a course (guarded by the
    admin-or-manager dependency).

    - **course_id**: course ID
    - **settings_in**: exam settings data

    Raises NotFoundError when the course lookup returns nothing.
    """
    # Reject unknown courses first.
    if not await course_service.get_by_id(db, course_id):
        raise NotFoundError(f"课程ID {course_id} 不存在")

    from app.services.course_exam_service import course_exam_service

    saved_settings = await course_exam_service.create_or_update(
        db,
        course_id=course_id,
        settings_in=settings_in,
        user_id=current_user.id,
    )
    return ResponseModel(data=saved_settings, message="保存考试设置成功")
@router.put(
    "/{course_id}/exam-settings",
    response_model=ResponseModel[CourseExamSettingsInDB],
)
async def update_course_exam_settings(
    course_id: int,
    settings_in: CourseExamSettingsUpdate,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Update the exam settings of a course (guarded by the admin-or-manager
    dependency).

    - **course_id**: course ID
    - **settings_in**: exam settings fields to update

    Raises NotFoundError when the course lookup returns nothing.
    """
    # Reject unknown courses first.
    if not await course_service.get_by_id(db, course_id):
        raise NotFoundError(f"课程ID {course_id} 不存在")

    from app.services.course_exam_service import course_exam_service

    updated_settings = await course_exam_service.update(
        db,
        course_id=course_id,
        settings_in=settings_in,
        user_id=current_user.id,
    )
    return ResponseModel(data=updated_settings, message="更新考试设置成功")
# 课程岗位分配相关API
@router.get(
    "/{course_id}/positions",
    response_model=ResponseModel[List[CoursePositionAssignmentInDB]],
)
async def get_course_positions(
    course_id: int,
    course_type: Optional[str] = Query(None, pattern="^(required|optional)$", description="课程类型筛选"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    List the position assignments of a course.

    - **course_id**: course ID
    - **course_type**: optional filter, either "required" or "optional"

    Raises NotFoundError when the course lookup returns nothing.
    """
    # Reject unknown courses first.
    if not await course_service.get_by_id(db, course_id):
        raise NotFoundError(f"课程ID {course_id} 不存在")

    from app.services.course_position_service import course_position_service

    position_assignments = await course_position_service.get_course_positions(
        db,
        course_id=course_id,
        course_type=course_type,
    )
    return ResponseModel(data=position_assignments, message="获取岗位分配列表成功")
@router.post(
    "/{course_id}/positions",
    response_model=ResponseModel[List[CoursePositionAssignmentInDB]],
    status_code=status.HTTP_201_CREATED,
)
async def assign_course_positions(
    course_id: int,
    assignments: List[CoursePositionAssignment],
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Assign a course to positions in bulk (guarded by the admin-or-manager
    dependency).

    - **course_id**: course ID
    - **assignments**: list of position assignments

    After the assignment, a notification is sent on a best-effort basis to the
    members of the assigned positions. A failure while notifying is logged and
    does not change the response. Raises NotFoundError when the course lookup
    returns nothing.
    """
    # Reject unknown courses first.
    course = await course_service.get_by_id(db, course_id)
    if not course:
        raise NotFoundError(f"课程ID {course_id} 不存在")

    from app.services.course_position_service import course_position_service

    result = await course_position_service.batch_assign_positions(
        db, course_id=course_id, assignments=assignments, user_id=current_user.id
    )

    # Notify the members of the assigned positions about the new course.
    try:
        from app.models.position_member import PositionMember
        from app.services.notification_service import notification_service
        from app.schemas.notification import NotificationBatchCreate, NotificationType

        position_ids = [a.position_id for a in assignments]
        if position_ids:
            # Distinct user IDs of non-deleted members of those positions.
            member_result = await db.execute(
                select(PositionMember.user_id).where(
                    PositionMember.position_id.in_(position_ids),
                    # SQL expression, not a Python boolean comparison.
                    PositionMember.is_deleted == False,  # noqa: E712
                ).distinct()
            )
            user_ids = [row[0] for row in member_result.fetchall()]
            if user_ids:
                notification_batch = NotificationBatchCreate(
                    user_ids=user_ids,
                    title="新课程通知",
                    content=f"您所在岗位有新课程「{course.name}」已分配,请及时学习。",
                    type=NotificationType.COURSE_ASSIGN,
                    related_id=course_id,
                    related_type="course",
                    sender_id=current_user.id,
                )
                await notification_service.batch_create_notifications(
                    db=db,
                    batch_in=notification_batch,
                )
    except Exception as e:
        # Deliberately best-effort: a notification failure must not affect the
        # assignment result. Log through the module logger with the traceback,
        # like the other error paths in this module.
        logger.error(f"发送课程分配通知失败: {str(e)}", exc_info=True)

    return ResponseModel(data=result, message="岗位分配成功")
@router.delete(
    "/{course_id}/positions/{position_id}",
    response_model=ResponseModel[bool],
)
async def remove_course_position(
    course_id: int,
    position_id: int,
    current_user: User = Depends(require_admin_or_manager),
    db: AsyncSession = Depends(get_db),
):
    """
    Remove a position assignment from a course (guarded by the
    admin-or-manager dependency).

    - **course_id**: course ID
    - **position_id**: position ID

    Raises NotFoundError when the course lookup returns nothing.
    """
    # Reject unknown courses first.
    if not await course_service.get_by_id(db, course_id):
        raise NotFoundError(f"课程ID {course_id} 不存在")

    from app.services.course_position_service import course_position_service

    removed = await course_position_service.remove_position_assignment(
        db,
        course_id=course_id,
        position_id=position_id,
        user_id=current_user.id,
    )
    # The response message reflects the outcome reported by the service.
    if removed:
        outcome = "移除岗位分配成功"
    else:
        outcome = "移除岗位分配失败"
    return ResponseModel(data=removed, message=outcome)
async def _trigger_knowledge_analysis(
    db: AsyncSession,
    course_id: int,
    material_id: int,
    file_url: str,
    course_title: str,
    user_id: int
):
    """
    Run the knowledge-point analysis for a material as a background task.

    Every exception is caught and logged here, so nothing propagates to the
    caller: a missing file is logged as a warning without a traceback, any
    other error is logged with the traceback.
    """
    try:
        from app.services.ai.knowledge_analysis_v2 import knowledge_analysis_service_v2

        logger.info(
            f"后台知识点分析开始 - course_id: {course_id}, material_id: {material_id}, file_url: {file_url}, user_id: {user_id}"
        )
        analysis = await knowledge_analysis_service_v2.analyze_course_material(
            db=db,
            course_id=course_id,
            material_id=material_id,
            file_url=file_url,
            course_title=course_title,
            user_id=user_id,
        )
        # Reports 0 when the result carries no "knowledge_points_count" key.
        logger.info(
            f"后台知识点分析完成 - course_id: {course_id}, material_id: {material_id}, knowledge_points_count: {analysis.get('knowledge_points_count', 0)}, user_id: {user_id}"
        )
    except FileNotFoundError as missing:
        # Missing file: warn only, without the full stack trace.
        logger.warning(
            f"后台知识点分析失败(文件不存在) - course_id: {course_id}, material_id: {material_id}, "
            f"file_url: {file_url}, error: {missing!s}, user_id: {user_id}"
        )
    except Exception as failure:
        # Anything else: log the details together with the traceback.
        logger.error(
            f"后台知识点分析失败 - course_id: {course_id}, material_id: {material_id}, error: {failure!s}",
            exc_info=True,
        )