Files
012-kaopeilian/backend/app/services/growth_path_service.py
yuliang_guo 920c6a64c8
All checks were successful
continuous-integration/drone/push Build is passing
feat: 成长路径支持多岗位关联 + 增强拖拽功能
前端:
- 岗位选择改为多选模式
- 增强拖拽视觉反馈(高亮、动画提示)
- 列表显示多个岗位标签

后端:
- 添加 position_ids 字段支持多岗位
- 兼容旧版 position_id 单选数据
- 返回 position_names 数组
2026-01-30 16:19:40 +08:00

777 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
成长路径服务
"""
import logging
from typing import List, Optional, Dict, Any
from datetime import datetime
from decimal import Decimal
from sqlalchemy import select, func, and_, delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.course import GrowthPath, Course
from app.models.growth_path import (
GrowthPathNode,
UserGrowthPathProgress,
UserNodeCompletion,
GrowthPathStatus,
NodeStatus,
)
from app.models.user_course_progress import UserCourseProgress, ProgressStatus
from app.models.position import Position
from app.schemas.growth_path import (
GrowthPathCreate,
GrowthPathUpdate,
GrowthPathNodeCreate,
TraineeGrowthPathResponse,
TraineeStageResponse,
TraineeNodeResponse,
)
logger = logging.getLogger(__name__)
class GrowthPathService:
"""成长路径服务"""
# =====================================================
# 管理端 - CRUD
# =====================================================
async def create_growth_path(
    self,
    db: AsyncSession,
    data: GrowthPathCreate,
    created_by: int
) -> GrowthPath:
    """Create a growth path and its nodes, then commit.

    Position handling: ``position_ids`` (multi-position) takes precedence.
    The legacy single ``position_id`` is kept in sync with the first entry
    of ``position_ids``; when only ``position_id`` is supplied it is wrapped
    into a one-element ``position_ids`` list. This is the same precedence
    ``update_growth_path`` applies.

    Args:
        db: Async database session; this method flushes and commits on it.
        data: Creation payload (name, description, positions, stages, nodes).
        created_by: ID of the creating user. Not stored by this method.

    Returns:
        The newly created ``GrowthPath``, refreshed after commit.

    Raises:
        ValueError: A non-deleted growth path with the same name exists.
    """
    # Reject duplicate names among non-deleted paths.
    # ``== False`` is intentional: it builds a SQL expression.
    duplicate = await db.execute(
        select(GrowthPath).where(
            and_(
                GrowthPath.name == data.name,
                GrowthPath.is_deleted == False
            )
        )
    )
    if duplicate.scalar_one_or_none():
        raise ValueError(f"成长路径名称 '{data.name}' 已存在")

    # Normalise the two position fields so they never disagree.
    position_ids = data.position_ids
    position_id = data.position_id
    if position_ids:
        # Multi-position input wins; mirror its first entry into the legacy field.
        position_id = position_ids[0]
    elif position_id is not None:
        # Legacy single-position input: expose it through the list field too.
        position_ids = [position_id]

    growth_path = GrowthPath(
        name=data.name,
        description=data.description,
        target_role=data.target_role,
        position_id=position_id,
        position_ids=position_ids,
        stages=[s.model_dump() for s in data.stages] if data.stages else None,
        estimated_duration_days=data.estimated_duration_days,
        is_active=data.is_active,
        sort_order=data.sort_order,
    )
    db.add(growth_path)
    # Flush so growth_path.id is available for the node rows below.
    await db.flush()

    for node_data in data.nodes or []:
        db.add(
            GrowthPathNode(
                growth_path_id=growth_path.id,
                course_id=node_data.course_id,
                stage_name=node_data.stage_name,
                title=node_data.title,
                description=node_data.description,
                order_num=node_data.order_num,
                is_required=node_data.is_required,
                prerequisites=node_data.prerequisites,
                estimated_days=node_data.estimated_days,
            )
        )

    await db.commit()
    await db.refresh(growth_path)
    logger.info(f"创建成长路径: {growth_path.name}, ID: {growth_path.id}")
    return growth_path
async def update_growth_path(
    self,
    db: AsyncSession,
    path_id: int,
    data: GrowthPathUpdate
) -> GrowthPath:
    """Update a growth path's fields and, optionally, replace its nodes.

    Only fields explicitly set on ``data`` are applied. ``position_ids`` and
    the legacy ``position_id`` are kept consistent:

    * non-empty ``position_ids`` -> ``position_id`` becomes its first entry;
    * empty ``position_ids`` (clearing all positions) -> ``position_id`` is
      reset to ``None`` unless the caller also set ``position_id`` explicitly.
      Without this, readers that fall back to ``position_id`` when
      ``position_ids`` is empty would keep showing the old position;
    * only ``position_id`` supplied -> ``position_ids`` becomes ``[position_id]``.

    When ``data.nodes`` is not ``None`` all existing node rows of the path are
    deleted (a real DELETE, not a soft delete) and recreated from the payload.

    Args:
        db: Async database session; committed by this method.
        path_id: Primary key of the growth path to update.
        data: Partial update payload.

    Returns:
        The updated ``GrowthPath``, refreshed after commit.

    Raises:
        ValueError: The path does not exist or is soft-deleted.
    """
    growth_path = await db.get(GrowthPath, path_id)
    if not growth_path or growth_path.is_deleted:
        raise ValueError("成长路径不存在")

    # Nodes are handled separately below; everything else is a plain setattr.
    update_data = data.model_dump(exclude_unset=True, exclude={'nodes'})
    if 'stages' in update_data and update_data['stages']:
        update_data['stages'] = [
            s.model_dump() if hasattr(s, 'model_dump') else s
            for s in update_data['stages']
        ]

    # Keep position_ids and the legacy position_id in agreement.
    if update_data.get('position_ids') is not None:
        ids = update_data['position_ids']
        if ids:
            update_data['position_id'] = ids[0]
        else:
            # Explicitly cleared: drop the legacy value too, but respect a
            # position_id the caller sent in the same request.
            update_data.setdefault('position_id', None)
    elif 'position_id' in update_data and update_data['position_id'] and 'position_ids' not in update_data:
        update_data['position_ids'] = [update_data['position_id']]

    for key, value in update_data.items():
        setattr(growth_path, key, value)

    # A provided node list (even an empty one) replaces the existing nodes.
    if data.nodes is not None:
        await db.execute(
            delete(GrowthPathNode).where(GrowthPathNode.growth_path_id == path_id)
        )
        for node_data in data.nodes:
            db.add(
                GrowthPathNode(
                    growth_path_id=path_id,
                    course_id=node_data.course_id,
                    stage_name=node_data.stage_name,
                    title=node_data.title,
                    description=node_data.description,
                    order_num=node_data.order_num,
                    is_required=node_data.is_required,
                    prerequisites=node_data.prerequisites,
                    estimated_days=node_data.estimated_days,
                )
            )

    await db.commit()
    await db.refresh(growth_path)
    logger.info(f"更新成长路径: {growth_path.name}, ID: {path_id}")
    return growth_path
async def delete_growth_path(self, db: AsyncSession, path_id: int) -> bool:
    """Soft-delete a growth path.

    The row is kept; it is flagged as deleted and stamped with the current
    local time, then the session is committed.

    Args:
        db: Async database session; committed by this method.
        path_id: Primary key of the growth path.

    Returns:
        Always ``True`` on success.

    Raises:
        ValueError: The path does not exist or is already soft-deleted.
    """
    target = await db.get(GrowthPath, path_id)
    unavailable = not target or target.is_deleted
    if unavailable:
        raise ValueError("成长路径不存在")

    # Mark instead of removing the row.
    target.deleted_at = datetime.now()
    target.is_deleted = True
    await db.commit()

    logger.info(f"删除成长路径: {target.name}, ID: {path_id}")
    return True
async def get_growth_path(
    self,
    db: AsyncSession,
    path_id: int
) -> Optional[Dict[str, Any]]:
    """Load one non-deleted growth path as a plain dict, including its nodes.

    Args:
        db: Async database session (read-only use).
        path_id: Primary key of the growth path.

    Returns:
        ``None`` when no non-deleted path has this id; otherwise a dict with
        the path's fields, resolved position names, and its non-deleted
        nodes ordered by ``order_num`` (each with its course name).
    """
    lookup = (
        select(GrowthPath)
        .options(selectinload(GrowthPath.nodes))
        .where(
            and_(
                GrowthPath.id == path_id,
                GrowthPath.is_deleted == False
            )
        )
    )
    path = (await db.execute(lookup)).scalar_one_or_none()
    if not path:
        return None

    # Multi-position list, falling back to the legacy single column.
    position_ids = path.position_ids or []
    if not position_ids and path.position_id:
        position_ids = [path.position_id]

    # Resolve names; ids without a matching Position row are dropped.
    found_positions = [await db.get(Position, pid) for pid in position_ids]
    position_names = [pos.name for pos in found_positions if pos]
    # Legacy single-name field mirrors the first resolved name.
    position_name = position_names[0] if position_names else None

    async def _node_payload(node):
        # One course lookup per node to attach the course name.
        course = await db.get(Course, node.course_id)
        return {
            "id": node.id,
            "growth_path_id": node.growth_path_id,
            "course_id": node.course_id,
            "course_name": course.name if course else None,
            "stage_name": node.stage_name,
            "title": node.title,
            "description": node.description,
            "order_num": node.order_num,
            "is_required": node.is_required,
            "prerequisites": node.prerequisites,
            "estimated_days": node.estimated_days,
            "created_at": node.created_at,
            "updated_at": node.updated_at,
        }

    ordered_nodes = sorted(path.nodes, key=lambda n: n.order_num)
    nodes_data = [
        await _node_payload(node)
        for node in ordered_nodes
        if not node.is_deleted
    ]

    return {
        "id": path.id,
        "name": path.name,
        "description": path.description,
        "target_role": path.target_role,
        "position_id": path.position_id,
        "position_ids": position_ids,
        "position_name": position_name,
        "position_names": position_names,
        "stages": path.stages,
        "estimated_duration_days": path.estimated_duration_days,
        "is_active": path.is_active,
        "sort_order": path.sort_order,
        "nodes": nodes_data,
        "node_count": len(nodes_data),
        "created_at": path.created_at,
        "updated_at": path.updated_at,
    }
async def list_growth_paths(
    self,
    db: AsyncSession,
    position_id: Optional[int] = None,
    is_active: Optional[bool] = None,
    page: int = 1,
    page_size: int = 20
) -> Dict[str, Any]:
    """Return one page of non-deleted growth paths plus the matching total.

    The same filter set is used for the page query and for ``total``, so the
    total reflects the ``position_id`` / ``is_active`` filters.

    Args:
        db: Async database session (read-only use).
        position_id: When not ``None``, keep only paths whose legacy
            ``position_id`` column equals this value.
        is_active: When not ``None``, keep only paths with this active flag.
        page: 1-based page number.
        page_size: Number of items per page.

    Returns:
        ``{"items": [...], "total": int, "page": page, "page_size": page_size}``
        where each item carries the path summary, resolved position names and
        the count of its non-deleted nodes.
    """
    # Shared filter list: used by both the count and the page query.
    conditions = [GrowthPath.is_deleted == False]
    if position_id is not None:
        # NOTE(review): this matches only the legacy ``position_id`` column,
        # which the create/update code sets to the FIRST entry of
        # ``position_ids``. Paths linked to this position as a secondary
        # entry are not matched - confirm whether that is intended.
        conditions.append(GrowthPath.position_id == position_id)
    if is_active is not None:
        conditions.append(GrowthPath.is_active == is_active)

    count_result = await db.execute(
        select(func.count(GrowthPath.id)).where(and_(*conditions))
    )
    total = count_result.scalar() or 0

    query = (
        select(GrowthPath)
        .where(and_(*conditions))
        .order_by(GrowthPath.sort_order, GrowthPath.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result = await db.execute(query)
    paths = result.scalars().all()

    items = []
    for path in paths:
        # Multi-position list, falling back to the legacy single column.
        position_ids = path.position_ids or []
        if not position_ids and path.position_id:
            position_ids = [path.position_id]

        position_names = []
        for pid in position_ids:
            position = await db.get(Position, pid)
            if position:
                position_names.append(position.name)
        # Legacy single-name field mirrors the first resolved name.
        position_name = position_names[0] if position_names else None

        # Count of non-deleted nodes belonging to this path.
        node_count_result = await db.execute(
            select(func.count(GrowthPathNode.id)).where(
                and_(
                    GrowthPathNode.growth_path_id == path.id,
                    GrowthPathNode.is_deleted == False
                )
            )
        )
        node_count = node_count_result.scalar() or 0

        items.append({
            "id": path.id,
            "name": path.name,
            "description": path.description,
            "position_id": path.position_id,
            "position_ids": position_ids,
            "position_name": position_name,
            "position_names": position_names,
            "is_active": path.is_active,
            "node_count": node_count,
            "estimated_duration_days": path.estimated_duration_days,
            "created_at": path.created_at,
        })

    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
    }
# =====================================================
# 学员端 - 获取成长路径
# =====================================================
async def get_trainee_growth_path(
    self,
    db: AsyncSession,
    user_id: int,
    position_id: Optional[int] = None
) -> Optional[TraineeGrowthPathResponse]:
    """Return one active growth path together with the trainee's progress.

    Path selection: among non-deleted, active paths, the one with the lowest
    ``sort_order`` is used. When ``position_id`` is truthy the candidates are
    first narrowed to paths whose legacy ``position_id`` column equals it.
    NOTE(review): no lookup of the user's own position happens here; when
    ``position_id`` is omitted the first active path overall is returned.

    Overall progress counts only completed node ids that still belong to the
    path's current (non-deleted) nodes, so ids left over from removed or
    replaced nodes cannot push progress above 100%.

    Args:
        db: Async database session (read-only use).
        user_id: ID of the trainee whose progress is reported.
        position_id: Optional position filter.

    Returns:
        ``None`` when no matching path exists; otherwise the path with its
        nodes grouped into stages and per-node status/progress.
    """
    # Pick the growth path.
    query = select(GrowthPath).where(
        and_(
            GrowthPath.is_deleted == False,
            GrowthPath.is_active == True
        )
    )
    if position_id:
        query = query.where(GrowthPath.position_id == position_id)
    query = query.order_by(GrowthPath.sort_order).limit(1)
    result = await db.execute(query)
    growth_path = result.scalar_one_or_none()
    if not growth_path:
        return None

    # Resolve the (legacy, single) position name.
    position_name = None
    if growth_path.position_id:
        position = await db.get(Position, growth_path.position_id)
        if position:
            position_name = position.name

    # All current nodes of the path, in display order.
    nodes_result = await db.execute(
        select(GrowthPathNode).where(
            and_(
                GrowthPathNode.growth_path_id == growth_path.id,
                GrowthPathNode.is_deleted == False
            )
        ).order_by(GrowthPathNode.order_num)
    )
    nodes = nodes_result.scalars().all()

    # The user's path-level progress row, if the path was started.
    progress_result = await db.execute(
        select(UserGrowthPathProgress).where(
            and_(
                UserGrowthPathProgress.user_id == user_id,
                UserGrowthPathProgress.growth_path_id == growth_path.id
            )
        )
    )
    user_progress = progress_result.scalar_one_or_none()

    # Per-node completion records, keyed by node id.
    completions_result = await db.execute(
        select(UserNodeCompletion).where(
            and_(
                UserNodeCompletion.user_id == user_id,
                UserNodeCompletion.growth_path_id == growth_path.id
            )
        )
    )
    completions = {c.node_id: c for c in completions_result.scalars().all()}

    # Course learning progress for the courses behind the nodes, keyed by course id.
    course_ids = [n.course_id for n in nodes]
    course_progress_result = await db.execute(
        select(UserCourseProgress).where(
            and_(
                UserCourseProgress.user_id == user_id,
                UserCourseProgress.course_id.in_(course_ids)
            )
        )
    )
    course_progress_map = {cp.course_id: cp for cp in course_progress_result.scalars().all()}

    # Build node responses grouped by stage name.
    completed_node_ids = set(user_progress.completed_node_ids or []) if user_progress else set()
    nodes_by_stage: Dict[str, List[TraineeNodeResponse]] = {}
    for node in nodes:
        course = await db.get(Course, node.course_id)
        node_status = self._calculate_node_status(
            node=node,
            completed_node_ids=completed_node_ids,
            completions=completions,
            course_progress_map=course_progress_map
        )
        course_prog = course_progress_map.get(node.course_id)
        progress = float(course_prog.progress) if course_prog else 0
        node_response = TraineeNodeResponse(
            id=node.id,
            course_id=node.course_id,
            title=node.title,
            description=node.description,
            stage_name=node.stage_name,
            is_required=node.is_required,
            estimated_days=node.estimated_days,
            order_num=node.order_num,
            status=node_status,
            progress=progress,
            course_name=course.name if course else None,
            course_cover=course.cover_image if course else None,
        )
        # Nodes without a stage name are collected under a default stage.
        stage_name = node.stage_name or "默认阶段"
        nodes_by_stage.setdefault(stage_name, []).append(node_response)

    # Build stage responses, ordered by the path's stage configuration;
    # stages missing from the configuration sort last (key 999).
    stages = []
    stage_configs = growth_path.stages or []
    stage_order = {s.get('name', ''): s.get('order', i) for i, s in enumerate(stage_configs)}
    for stage_name, stage_nodes in sorted(nodes_by_stage.items(), key=lambda x: stage_order.get(x[0], 999)):
        completed_in_stage = sum(1 for n in stage_nodes if n.status == NodeStatus.COMPLETED.value)
        stage_desc = next((s.get('description') for s in stage_configs if s.get('name') == stage_name), None)
        stages.append(TraineeStageResponse(
            name=stage_name,
            description=stage_desc,
            completed=completed_in_stage,
            total=len(stage_nodes),
            nodes=stage_nodes,
        ))

    # Overall progress: count only completed ids that are still nodes of
    # this path, otherwise stale ids inflate the numerator.
    total_nodes = len(nodes)
    completed_count = sum(1 for n in nodes if n.id in completed_node_ids)
    total_progress = (completed_count / total_nodes * 100) if total_nodes > 0 else 0

    return TraineeGrowthPathResponse(
        id=growth_path.id,
        name=growth_path.name,
        description=growth_path.description,
        position_id=growth_path.position_id,
        position_name=position_name,
        total_progress=round(total_progress, 1),
        completed_nodes=completed_count,
        total_nodes=total_nodes,
        status=user_progress.status if user_progress else GrowthPathStatus.NOT_STARTED.value,
        started_at=user_progress.started_at if user_progress else None,
        estimated_completion_days=growth_path.estimated_duration_days,
        stages=stages,
    )
def _calculate_node_status(
    self,
    node: GrowthPathNode,
    completed_node_ids: set,
    completions: Dict[int, UserNodeCompletion],
    course_progress_map: Dict[int, UserCourseProgress]
) -> str:
    """Derive the display status of a single node for one user.

    Checks are applied in this order, first match wins:

    1. node id is in ``completed_node_ids``            -> completed
    2. any prerequisite id is not in that set          -> locked
    3. the user's node record says in-progress/completed -> that status
    4. the course progress record says completed       -> completed
    5. the course progress value is above zero         -> in progress
    6. otherwise                                       -> unlocked

    Args:
        node: The node being evaluated.
        completed_node_ids: Node ids the user has completed on this path.
        completions: The user's node records keyed by node id.
        course_progress_map: The user's course progress keyed by course id.

    Returns:
        One of the ``NodeStatus`` values.
    """
    if node.id in completed_node_ids:
        return NodeStatus.COMPLETED.value

    # A single unmet prerequisite keeps the node locked.
    required_ids = node.prerequisites or []
    if any(required not in completed_node_ids for required in required_ids):
        return NodeStatus.LOCKED.value

    # The user's own record for this node, when it carries a decisive status.
    record = completions.get(node.id)
    if record:
        for decisive in (NodeStatus.IN_PROGRESS.value, NodeStatus.COMPLETED.value):
            if record.status == decisive:
                return decisive

    # Fall back to progress on the underlying course.
    course_state = course_progress_map.get(node.course_id)
    if not course_state:
        return NodeStatus.UNLOCKED.value
    if course_state.status == ProgressStatus.COMPLETED.value:
        return NodeStatus.COMPLETED.value
    if course_state.progress > 0:
        return NodeStatus.IN_PROGRESS.value

    # Prerequisites satisfied, nothing started yet.
    return NodeStatus.UNLOCKED.value
# =====================================================
# 学员端 - 开始/完成
# =====================================================
async def start_growth_path(
    self,
    db: AsyncSession,
    user_id: int,
    growth_path_id: int
) -> UserGrowthPathProgress:
    """Enroll a user on a growth path and unlock its first node.

    Creates the user's progress row in the in-progress state. The "first"
    node is the non-deleted node with the lowest ``order_num``; when one
    exists it becomes the current node and receives an unlocked record.

    Args:
        db: Async database session; committed by this method.
        user_id: ID of the user starting the path.
        growth_path_id: Primary key of the growth path.

    Returns:
        The new ``UserGrowthPathProgress`` row, refreshed after commit.

    Raises:
        ValueError: The path is missing, soft-deleted or inactive, or the
            user already has a progress row for it.
    """
    path = await db.get(GrowthPath, growth_path_id)
    usable = path and not path.is_deleted and path.is_active
    if not usable:
        raise ValueError("成长路径不存在或未启用")

    # Refuse a second enrollment on the same path.
    prior = await db.execute(
        select(UserGrowthPathProgress).where(
            UserGrowthPathProgress.user_id == user_id,
            UserGrowthPathProgress.growth_path_id == growth_path_id,
        )
    )
    if prior.scalar_one_or_none():
        raise ValueError("已开始学习此成长路径")

    enrollment = UserGrowthPathProgress(
        user_id=user_id,
        growth_path_id=growth_path_id,
        status=GrowthPathStatus.IN_PROGRESS.value,
        started_at=datetime.now(),
        last_activity_at=datetime.now(),
    )
    db.add(enrollment)

    # Entry node: lowest order_num among the path's non-deleted nodes.
    entry_lookup = (
        select(GrowthPathNode)
        .where(
            GrowthPathNode.growth_path_id == growth_path_id,
            GrowthPathNode.is_deleted == False,
        )
        .order_by(GrowthPathNode.order_num)
        .limit(1)
    )
    entry_node = (await db.execute(entry_lookup)).scalar_one_or_none()
    if entry_node:
        enrollment.current_node_id = entry_node.id
        # Record the entry node as unlocked for this user.
        db.add(
            UserNodeCompletion(
                user_id=user_id,
                growth_path_id=growth_path_id,
                node_id=entry_node.id,
                status=NodeStatus.UNLOCKED.value,
                unlocked_at=datetime.now(),
            )
        )

    await db.commit()
    await db.refresh(enrollment)
    logger.info(f"用户 {user_id} 开始学习成长路径 {growth_path_id}")
    return enrollment
async def complete_node(
    self,
    db: AsyncSession,
    user_id: int,
    node_id: int
) -> Dict[str, Any]:
    """Mark a node as completed for a user and update path progress.

    The user's node record is created if missing and set to completed with
    100% course progress. The node id is added to the user's completed set,
    the overall percentage is recomputed against the count of non-deleted
    nodes, and the path is marked completed once the completed set is at
    least that large.

    The returned ``unlocked_nodes`` lists every non-completed node whose
    prerequisites are now all met; this method does not create unlock
    records for them.

    Args:
        db: Async database session; committed by this method.
        user_id: ID of the user.
        node_id: Primary key of the node being completed.

    Returns:
        Dict with ``completed``, ``total_progress``, ``is_path_completed``
        and ``unlocked_nodes``.

    Raises:
        ValueError: The node is missing or soft-deleted, the user has not
            started the path, or a prerequisite node is not completed.
    """
    node = await db.get(GrowthPathNode, node_id)
    if not node or node.is_deleted:
        raise ValueError("节点不存在")

    # The user must already be enrolled on the node's path.
    enrollment_lookup = await db.execute(
        select(UserGrowthPathProgress).where(
            UserGrowthPathProgress.user_id == user_id,
            UserGrowthPathProgress.growth_path_id == node.growth_path_id,
        )
    )
    progress = enrollment_lookup.scalar_one_or_none()
    if not progress:
        raise ValueError("请先开始学习此成长路径")

    # Every prerequisite must already be in the completed set.
    done_ids = set(progress.completed_node_ids or [])
    for required_id in node.prerequisites or []:
        if required_id not in done_ids:
            raise ValueError("前置节点未完成")

    # Fetch or create the user's record for this node.
    record_lookup = await db.execute(
        select(UserNodeCompletion).where(
            UserNodeCompletion.user_id == user_id,
            UserNodeCompletion.node_id == node_id,
        )
    )
    record = record_lookup.scalar_one_or_none()
    if not record:
        record = UserNodeCompletion(
            user_id=user_id,
            growth_path_id=node.growth_path_id,
            node_id=node_id,
        )
        db.add(record)
    record.status = NodeStatus.COMPLETED.value
    record.course_progress = Decimal("100.00")
    record.completed_at = datetime.now()

    # Fold this node into the path-level progress.
    done_ids.add(node_id)
    progress.completed_node_ids = list(done_ids)
    progress.last_activity_at = datetime.now()

    # Denominator: non-deleted nodes currently on the path.
    node_total_lookup = await db.execute(
        select(func.count(GrowthPathNode.id)).where(
            GrowthPathNode.growth_path_id == node.growth_path_id,
            GrowthPathNode.is_deleted == False,
        )
    )
    node_total = node_total_lookup.scalar() or 0
    if node_total > 0:
        progress.total_progress = Decimal(str(len(done_ids) / node_total * 100))
    else:
        progress.total_progress = Decimal("0")

    # Path is finished once the completed set covers the node count.
    if len(done_ids) >= node_total:
        progress.status = GrowthPathStatus.COMPLETED.value
        progress.completed_at = datetime.now()

    # Nodes that are now available to the user.
    available = await self._get_unlockable_nodes(
        db, node.growth_path_id, done_ids
    )

    await db.commit()
    logger.info(f"用户 {user_id} 完成节点 {node_id}")
    return {
        "completed": True,
        "total_progress": float(progress.total_progress),
        "is_path_completed": progress.status == GrowthPathStatus.COMPLETED.value,
        "unlocked_nodes": [n.id for n in available],
    }
async def _get_unlockable_nodes(
    self,
    db: AsyncSession,
    growth_path_id: int,
    completed_node_ids: set
) -> List[GrowthPathNode]:
    """Return the path's non-completed nodes whose prerequisites are all met.

    Args:
        db: Async database session (read-only use).
        growth_path_id: Primary key of the growth path.
        completed_node_ids: Node ids the user has completed; may be empty.

    Returns:
        Non-deleted nodes of the path that are not in ``completed_node_ids``
        and whose every prerequisite id is in ``completed_node_ids``. Nodes
        without prerequisites always qualify.
    """
    conditions = [
        GrowthPathNode.growth_path_id == growth_path_id,
        GrowthPathNode.is_deleted == False,
    ]
    # Add the exclusion only when there is something to exclude, instead of
    # feeding a bare Python ``True`` into ``and_``. The set is converted to a
    # list because IN/NOT IN expects a sequence of values.
    if completed_node_ids:
        conditions.append(GrowthPathNode.id.notin_(list(completed_node_ids)))

    nodes_result = await db.execute(
        select(GrowthPathNode).where(and_(*conditions))
    )
    return [
        node
        for node in nodes_result.scalars().all()
        if all(p in completed_node_ids for p in (node.prerequisites or []))
    ]
# =====================================================
# 同步课程进度到节点
# =====================================================
async def sync_course_progress(
    self,
    db: AsyncSession,
    user_id: int,
    course_id: int,
    progress: float
):
    """Propagate a user's course progress to every growth-path node using it.

    For each non-deleted node bound to ``course_id`` on a path the user has
    started, the user's node record is created if missing (as in-progress)
    and its ``course_progress`` is updated. When ``progress`` reaches 100 the
    node is completed via ``complete_node``.

    Auto-completion is best effort: ``complete_node`` raises ``ValueError``
    when, for example, a prerequisite node is not finished yet. That is
    logged and skipped so the remaining nodes are still synced and the
    caller's course-progress update is not broken. ``complete_node`` raises
    before it modifies anything, so the session stays usable afterwards.

    Args:
        db: Async database session; committed by this method.
        user_id: ID of the user.
        course_id: ID of the course whose progress changed.
        progress: Course progress as a percentage (100 means finished).
    """
    # Every non-deleted node that points at this course.
    nodes_result = await db.execute(
        select(GrowthPathNode).where(
            and_(
                GrowthPathNode.course_id == course_id,
                GrowthPathNode.is_deleted == False
            )
        )
    )
    nodes = nodes_result.scalars().all()

    for node in nodes:
        # Skip paths the user has not started.
        progress_result = await db.execute(
            select(UserGrowthPathProgress).where(
                and_(
                    UserGrowthPathProgress.user_id == user_id,
                    UserGrowthPathProgress.growth_path_id == node.growth_path_id
                )
            )
        )
        user_progress = progress_result.scalar_one_or_none()
        if not user_progress:
            continue

        # Fetch or create the user's record for this node.
        completion_result = await db.execute(
            select(UserNodeCompletion).where(
                and_(
                    UserNodeCompletion.user_id == user_id,
                    UserNodeCompletion.node_id == node.id
                )
            )
        )
        completion = completion_result.scalar_one_or_none()
        if not completion:
            completion = UserNodeCompletion(
                user_id=user_id,
                growth_path_id=node.growth_path_id,
                node_id=node.id,
                status=NodeStatus.IN_PROGRESS.value,
                started_at=datetime.now(),
            )
            db.add(completion)
        completion.course_progress = Decimal(str(progress))

        # Course finished: try to complete the node automatically.
        if progress >= 100:
            try:
                await self.complete_node(db, user_id, node.id)
            except ValueError as exc:
                # Typically unmet prerequisites; keep the recorded progress
                # and move on to the next node.
                logger.warning(
                    f"用户 {user_id} 的节点 {node.id} 未能自动完成: {exc}"
                )

    await db.commit()
# Module-level instance of the service
growth_path_service = GrowthPathService()