Files
012-kaopeilian/backend/app/services/growth_path_service.py
yuliang_guo 67b3c28d33
All checks were successful
continuous-integration/drone/push Build is passing
debug: 添加成长路径返回数据日志
2026-01-30 18:28:58 +08:00

822 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
成长路径服务
"""
import logging
from typing import List, Optional, Dict, Any
from datetime import datetime
from decimal import Decimal
from sqlalchemy import select, func, and_, delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.course import GrowthPath, Course
from app.models.growth_path import (
GrowthPathNode,
UserGrowthPathProgress,
UserNodeCompletion,
GrowthPathStatus,
NodeStatus,
)
from app.models.user_course_progress import UserCourseProgress, ProgressStatus
from app.models.position import Position
from app.models.position_member import PositionMember
from app.schemas.growth_path import (
GrowthPathCreate,
GrowthPathUpdate,
GrowthPathNodeCreate,
TraineeGrowthPathResponse,
TraineeStageResponse,
TraineeNodeResponse,
)
logger = logging.getLogger(__name__)
class GrowthPathService:
"""成长路径服务"""
# =====================================================
# 管理端 - CRUD
# =====================================================
async def create_growth_path(
self,
db: AsyncSession,
data: GrowthPathCreate,
created_by: int
) -> GrowthPath:
"""创建成长路径"""
# 检查名称是否重复
existing = await db.execute(
select(GrowthPath).where(
and_(
GrowthPath.name == data.name,
GrowthPath.is_deleted == False
)
)
)
if existing.scalar_one_or_none():
raise ValueError(f"成长路径名称 '{data.name}' 已存在")
# 处理岗位关联:优先使用 position_ids兼容 position_id
position_ids = data.position_ids
position_id = data.position_id
if position_ids is None and position_id is not None:
position_ids = [position_id]
elif position_ids and not position_id:
position_id = position_ids[0] if position_ids else None
# 创建成长路径
growth_path = GrowthPath(
name=data.name,
description=data.description,
target_role=data.target_role,
position_id=position_id,
position_ids=position_ids,
stages=[s.model_dump() for s in data.stages] if data.stages else None,
estimated_duration_days=data.estimated_duration_days,
is_active=data.is_active,
sort_order=data.sort_order,
)
db.add(growth_path)
await db.flush()
# 创建节点
if data.nodes:
for node_data in data.nodes:
node = GrowthPathNode(
growth_path_id=growth_path.id,
course_id=node_data.course_id,
stage_name=node_data.stage_name,
title=node_data.title,
description=node_data.description,
order_num=node_data.order_num,
is_required=node_data.is_required,
prerequisites=node_data.prerequisites,
estimated_days=node_data.estimated_days,
)
db.add(node)
await db.commit()
await db.refresh(growth_path)
logger.info(f"创建成长路径: {growth_path.name}, ID: {growth_path.id}")
return growth_path
async def update_growth_path(
self,
db: AsyncSession,
path_id: int,
data: GrowthPathUpdate
) -> GrowthPath:
"""更新成长路径"""
growth_path = await db.get(GrowthPath, path_id)
if not growth_path or growth_path.is_deleted:
raise ValueError("成长路径不存在")
# 更新基本信息
update_data = data.model_dump(exclude_unset=True, exclude={'nodes'})
if 'stages' in update_data and update_data['stages']:
update_data['stages'] = [s.model_dump() if hasattr(s, 'model_dump') else s for s in update_data['stages']]
# 处理 position_ids 和 position_id 的兼容
if 'position_ids' in update_data and update_data['position_ids']:
update_data['position_id'] = update_data['position_ids'][0]
elif 'position_id' in update_data and update_data['position_id'] and 'position_ids' not in update_data:
update_data['position_ids'] = [update_data['position_id']]
for key, value in update_data.items():
setattr(growth_path, key, value)
# 如果提供了节点,整体替换
if data.nodes is not None:
# 删除旧节点
await db.execute(
delete(GrowthPathNode).where(GrowthPathNode.growth_path_id == path_id)
)
# 创建新节点
for node_data in data.nodes:
node = GrowthPathNode(
growth_path_id=path_id,
course_id=node_data.course_id,
stage_name=node_data.stage_name,
title=node_data.title,
description=node_data.description,
order_num=node_data.order_num,
is_required=node_data.is_required,
prerequisites=node_data.prerequisites,
estimated_days=node_data.estimated_days,
)
db.add(node)
await db.commit()
await db.refresh(growth_path)
logger.info(f"更新成长路径: {growth_path.name}, ID: {path_id}")
return growth_path
async def delete_growth_path(self, db: AsyncSession, path_id: int) -> bool:
"""删除成长路径(软删除)"""
growth_path = await db.get(GrowthPath, path_id)
if not growth_path or growth_path.is_deleted:
raise ValueError("成长路径不存在")
growth_path.is_deleted = True
growth_path.deleted_at = datetime.now()
await db.commit()
logger.info(f"删除成长路径: {growth_path.name}, ID: {path_id}")
return True
async def get_growth_path(
self,
db: AsyncSession,
path_id: int
) -> Optional[Dict[str, Any]]:
"""获取成长路径详情"""
result = await db.execute(
select(GrowthPath)
.options(selectinload(GrowthPath.nodes))
.where(
and_(
GrowthPath.id == path_id,
GrowthPath.is_deleted == False
)
)
)
growth_path = result.scalar_one_or_none()
if not growth_path:
return None
# 获取岗位名称(支持多岗位)
position_ids = growth_path.position_ids or []
# 兼容旧数据
if not position_ids and growth_path.position_id:
position_ids = [growth_path.position_id]
position_names = []
for pid in position_ids:
position = await db.get(Position, pid)
if position:
position_names.append(position.name)
# 兼容旧版 position_name取第一个
position_name = position_names[0] if position_names else None
# 获取课程名称
nodes_data = []
for node in sorted(growth_path.nodes, key=lambda x: x.order_num):
if node.is_deleted:
continue
course = await db.get(Course, node.course_id)
nodes_data.append({
"id": node.id,
"growth_path_id": node.growth_path_id,
"course_id": node.course_id,
"course_name": course.name if course else None,
"stage_name": node.stage_name,
"title": node.title,
"description": node.description,
"order_num": node.order_num,
"is_required": node.is_required,
"prerequisites": node.prerequisites,
"estimated_days": node.estimated_days,
"created_at": node.created_at,
"updated_at": node.updated_at,
})
return {
"id": growth_path.id,
"name": growth_path.name,
"description": growth_path.description,
"target_role": growth_path.target_role,
"position_id": growth_path.position_id,
"position_ids": position_ids,
"position_name": position_name,
"position_names": position_names,
"stages": growth_path.stages,
"estimated_duration_days": growth_path.estimated_duration_days,
"is_active": growth_path.is_active,
"sort_order": growth_path.sort_order,
"nodes": nodes_data,
"node_count": len(nodes_data),
"created_at": growth_path.created_at,
"updated_at": growth_path.updated_at,
}
async def list_growth_paths(
self,
db: AsyncSession,
position_id: Optional[int] = None,
is_active: Optional[bool] = None,
page: int = 1,
page_size: int = 20
) -> Dict[str, Any]:
"""获取成长路径列表"""
query = select(GrowthPath).where(GrowthPath.is_deleted == False)
if position_id is not None:
query = query.where(GrowthPath.position_id == position_id)
if is_active is not None:
query = query.where(GrowthPath.is_active == is_active)
# 计算总数
count_result = await db.execute(
select(func.count(GrowthPath.id)).where(GrowthPath.is_deleted == False)
)
total = count_result.scalar() or 0
# 分页
query = query.order_by(GrowthPath.sort_order, GrowthPath.id.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
paths = result.scalars().all()
items = []
for path in paths:
# 获取岗位名称(支持多岗位)
position_ids = path.position_ids or []
if not position_ids and path.position_id:
position_ids = [path.position_id]
position_names = []
for pid in position_ids:
position = await db.get(Position, pid)
if position:
position_names.append(position.name)
position_name = position_names[0] if position_names else None
# 获取节点数量
node_count_result = await db.execute(
select(func.count(GrowthPathNode.id)).where(
and_(
GrowthPathNode.growth_path_id == path.id,
GrowthPathNode.is_deleted == False
)
)
)
node_count = node_count_result.scalar() or 0
items.append({
"id": path.id,
"name": path.name,
"description": path.description,
"position_id": path.position_id,
"position_ids": position_ids,
"position_name": position_name,
"position_names": position_names,
"is_active": path.is_active,
"node_count": node_count,
"estimated_duration_days": path.estimated_duration_days,
"created_at": path.created_at,
})
return {
"items": items,
"total": total,
"page": page,
"page_size": page_size,
}
# =====================================================
# 学员端 - 获取成长路径
# =====================================================
async def get_trainee_growth_path(
self,
db: AsyncSession,
user_id: int,
position_id: Optional[int] = None
) -> Optional[TraineeGrowthPathResponse]:
"""
获取学员的成长路径(含进度)
如果指定了岗位ID返回该岗位的成长路径
否则根据用户岗位自动匹配
"""
# 如果没有指定岗位,获取用户的岗位列表
user_position_ids = []
if not position_id:
pos_result = await db.execute(
select(PositionMember.position_id).where(
and_(
PositionMember.user_id == user_id,
PositionMember.is_deleted == False
)
)
)
user_position_ids = [row[0] for row in pos_result.fetchall()]
logger.info(f"用户 {user_id} 的岗位列表: {user_position_ids}")
# 查找成长路径
query = select(GrowthPath).where(
and_(
GrowthPath.is_deleted == False,
GrowthPath.is_active == True
)
)
# 按 sort_order 排序获取所有激活的成长路径
query = query.order_by(GrowthPath.sort_order)
result = await db.execute(query)
all_paths = result.scalars().all()
growth_path = None
if position_id:
# 如果指定了岗位ID查找匹配的路径
for path in all_paths:
# 检查 position_ids (多岗位) 或 position_id (单岗位)
path_position_ids = path.position_ids or []
if path.position_id:
path_position_ids = list(set(path_position_ids + [path.position_id]))
if position_id in path_position_ids:
growth_path = path
break
elif user_position_ids:
# 根据用户岗位匹配
for path in all_paths:
path_position_ids = path.position_ids or []
if path.position_id:
path_position_ids = list(set(path_position_ids + [path.position_id]))
# 检查用户岗位是否与路径岗位有交集
if any(pid in path_position_ids for pid in user_position_ids):
growth_path = path
logger.info(f"匹配到成长路径: {path.id}, 路径岗位: {path_position_ids}")
break
else:
# 没有岗位限制,返回第一个
growth_path = all_paths[0] if all_paths else None
if not growth_path:
return None
# 获取岗位名称
position_name = None
if growth_path.position_id:
position = await db.get(Position, growth_path.position_id)
if position:
position_name = position.name
# 获取所有节点
nodes_result = await db.execute(
select(GrowthPathNode).where(
and_(
GrowthPathNode.growth_path_id == growth_path.id,
GrowthPathNode.is_deleted == False
)
).order_by(GrowthPathNode.order_num)
)
nodes = nodes_result.scalars().all()
# 获取用户进度
progress_result = await db.execute(
select(UserGrowthPathProgress).where(
and_(
UserGrowthPathProgress.user_id == user_id,
UserGrowthPathProgress.growth_path_id == growth_path.id
)
)
)
user_progress = progress_result.scalar_one_or_none()
# 获取用户节点完成情况
completions_result = await db.execute(
select(UserNodeCompletion).where(
and_(
UserNodeCompletion.user_id == user_id,
UserNodeCompletion.growth_path_id == growth_path.id
)
)
)
completions = {c.node_id: c for c in completions_result.scalars().all()}
# 获取用户课程学习进度
course_ids = [n.course_id for n in nodes]
course_progress_result = await db.execute(
select(UserCourseProgress).where(
and_(
UserCourseProgress.user_id == user_id,
UserCourseProgress.course_id.in_(course_ids)
)
)
)
course_progress_map = {cp.course_id: cp for cp in course_progress_result.scalars().all()}
# 构建节点响应
completed_node_ids = set(user_progress.completed_node_ids or []) if user_progress else set()
nodes_by_stage: Dict[str, List[TraineeNodeResponse]] = {}
for node in nodes:
# 获取课程信息
course = await db.get(Course, node.course_id)
# 计算节点状态
node_status = self._calculate_node_status(
node=node,
completed_node_ids=completed_node_ids,
completions=completions,
course_progress_map=course_progress_map
)
# 获取课程进度
course_prog = course_progress_map.get(node.course_id)
progress = float(course_prog.progress) if course_prog else 0
node_response = TraineeNodeResponse(
id=node.id,
course_id=node.course_id,
title=node.title,
description=node.description,
stage_name=node.stage_name,
is_required=node.is_required,
estimated_days=node.estimated_days,
order_num=node.order_num,
status=node_status,
progress=progress,
course_name=course.name if course else None,
course_cover=course.cover_image if course else None,
)
stage_name = node.stage_name or "默认阶段"
if stage_name not in nodes_by_stage:
nodes_by_stage[stage_name] = []
nodes_by_stage[stage_name].append(node_response)
# 构建阶段响应
stages = []
stage_configs = growth_path.stages or []
stage_order = {s.get('name', ''): s.get('order', i) for i, s in enumerate(stage_configs)}
for stage_name, stage_nodes in sorted(nodes_by_stage.items(), key=lambda x: stage_order.get(x[0], 999)):
completed_in_stage = sum(1 for n in stage_nodes if n.status == NodeStatus.COMPLETED.value)
stage_desc = next((s.get('description') for s in stage_configs if s.get('name') == stage_name), None)
stages.append(TraineeStageResponse(
name=stage_name,
description=stage_desc,
completed=completed_in_stage,
total=len(stage_nodes),
nodes=stage_nodes,
))
# 计算总体进度
total_nodes = len(nodes)
completed_count = len(completed_node_ids)
total_progress = (completed_count / total_nodes * 100) if total_nodes > 0 else 0
logger.info(f"成长路径 {growth_path.id} 返回 {len(stages)} 个阶段, {total_nodes} 个节点")
for s in stages:
logger.info(f" 阶段 '{s.name}': {len(s.nodes)} 个节点")
return TraineeGrowthPathResponse(
id=growth_path.id,
name=growth_path.name,
description=growth_path.description,
position_id=growth_path.position_id,
position_name=position_name,
total_progress=round(total_progress, 1),
completed_nodes=completed_count,
total_nodes=total_nodes,
status=user_progress.status if user_progress else GrowthPathStatus.NOT_STARTED.value,
started_at=user_progress.started_at if user_progress else None,
estimated_completion_days=growth_path.estimated_duration_days,
stages=stages,
)
def _calculate_node_status(
self,
node: GrowthPathNode,
completed_node_ids: set,
completions: Dict[int, UserNodeCompletion],
course_progress_map: Dict[int, UserCourseProgress]
) -> str:
"""计算节点状态"""
# 已完成
if node.id in completed_node_ids:
return NodeStatus.COMPLETED.value
# 检查前置节点
prerequisites = node.prerequisites or []
if prerequisites:
for prereq_id in prerequisites:
if prereq_id not in completed_node_ids:
return NodeStatus.LOCKED.value
# 检查用户节点记录
completion = completions.get(node.id)
if completion:
if completion.status == NodeStatus.IN_PROGRESS.value:
return NodeStatus.IN_PROGRESS.value
if completion.status == NodeStatus.COMPLETED.value:
return NodeStatus.COMPLETED.value
# 检查课程进度
course_progress = course_progress_map.get(node.course_id)
if course_progress:
if course_progress.status == ProgressStatus.COMPLETED.value:
return NodeStatus.COMPLETED.value
if course_progress.progress > 0:
return NodeStatus.IN_PROGRESS.value
# 前置已完成,当前未开始
return NodeStatus.UNLOCKED.value
# =====================================================
# 学员端 - 开始/完成
# =====================================================
async def start_growth_path(
self,
db: AsyncSession,
user_id: int,
growth_path_id: int
) -> UserGrowthPathProgress:
"""开始学习成长路径"""
# 检查成长路径是否存在
growth_path = await db.get(GrowthPath, growth_path_id)
if not growth_path or growth_path.is_deleted or not growth_path.is_active:
raise ValueError("成长路径不存在或未启用")
# 检查是否已开始
existing = await db.execute(
select(UserGrowthPathProgress).where(
and_(
UserGrowthPathProgress.user_id == user_id,
UserGrowthPathProgress.growth_path_id == growth_path_id
)
)
)
if existing.scalar_one_or_none():
raise ValueError("已开始学习此成长路径")
# 创建进度记录
progress = UserGrowthPathProgress(
user_id=user_id,
growth_path_id=growth_path_id,
status=GrowthPathStatus.IN_PROGRESS.value,
started_at=datetime.now(),
last_activity_at=datetime.now(),
)
db.add(progress)
# 获取第一个节点(无前置依赖的)
first_node_result = await db.execute(
select(GrowthPathNode).where(
and_(
GrowthPathNode.growth_path_id == growth_path_id,
GrowthPathNode.is_deleted == False
)
).order_by(GrowthPathNode.order_num).limit(1)
)
first_node = first_node_result.scalar_one_or_none()
if first_node:
progress.current_node_id = first_node.id
# 解锁第一个节点
completion = UserNodeCompletion(
user_id=user_id,
growth_path_id=growth_path_id,
node_id=first_node.id,
status=NodeStatus.UNLOCKED.value,
unlocked_at=datetime.now(),
)
db.add(completion)
await db.commit()
await db.refresh(progress)
logger.info(f"用户 {user_id} 开始学习成长路径 {growth_path_id}")
return progress
async def complete_node(
self,
db: AsyncSession,
user_id: int,
node_id: int
) -> Dict[str, Any]:
"""完成节点"""
# 获取节点
node = await db.get(GrowthPathNode, node_id)
if not node or node.is_deleted:
raise ValueError("节点不存在")
# 获取用户进度
progress_result = await db.execute(
select(UserGrowthPathProgress).where(
and_(
UserGrowthPathProgress.user_id == user_id,
UserGrowthPathProgress.growth_path_id == node.growth_path_id
)
)
)
progress = progress_result.scalar_one_or_none()
if not progress:
raise ValueError("请先开始学习此成长路径")
# 检查前置节点是否完成
completed_node_ids = set(progress.completed_node_ids or [])
prerequisites = node.prerequisites or []
for prereq_id in prerequisites:
if prereq_id not in completed_node_ids:
raise ValueError("前置节点未完成")
# 更新节点完成状态
completion_result = await db.execute(
select(UserNodeCompletion).where(
and_(
UserNodeCompletion.user_id == user_id,
UserNodeCompletion.node_id == node_id
)
)
)
completion = completion_result.scalar_one_or_none()
if not completion:
completion = UserNodeCompletion(
user_id=user_id,
growth_path_id=node.growth_path_id,
node_id=node_id,
)
db.add(completion)
completion.status = NodeStatus.COMPLETED.value
completion.course_progress = Decimal("100.00")
completion.completed_at = datetime.now()
# 更新用户进度
completed_node_ids.add(node_id)
progress.completed_node_ids = list(completed_node_ids)
progress.last_activity_at = datetime.now()
# 计算总进度
total_nodes_result = await db.execute(
select(func.count(GrowthPathNode.id)).where(
and_(
GrowthPathNode.growth_path_id == node.growth_path_id,
GrowthPathNode.is_deleted == False
)
)
)
total_nodes = total_nodes_result.scalar() or 0
progress.total_progress = Decimal(str(len(completed_node_ids) / total_nodes * 100)) if total_nodes > 0 else Decimal("0")
# 检查是否全部完成
if len(completed_node_ids) >= total_nodes:
progress.status = GrowthPathStatus.COMPLETED.value
progress.completed_at = datetime.now()
# 解锁下一个节点
next_nodes = await self._get_unlockable_nodes(
db, node.growth_path_id, completed_node_ids
)
await db.commit()
logger.info(f"用户 {user_id} 完成节点 {node_id}")
return {
"completed": True,
"total_progress": float(progress.total_progress),
"is_path_completed": progress.status == GrowthPathStatus.COMPLETED.value,
"unlocked_nodes": [n.id for n in next_nodes],
}
async def _get_unlockable_nodes(
self,
db: AsyncSession,
growth_path_id: int,
completed_node_ids: set
) -> List[GrowthPathNode]:
"""获取可解锁的节点"""
nodes_result = await db.execute(
select(GrowthPathNode).where(
and_(
GrowthPathNode.growth_path_id == growth_path_id,
GrowthPathNode.is_deleted == False,
GrowthPathNode.id.notin_(completed_node_ids) if completed_node_ids else True
)
)
)
nodes = nodes_result.scalars().all()
unlockable = []
for node in nodes:
prerequisites = node.prerequisites or []
if all(p in completed_node_ids for p in prerequisites):
unlockable.append(node)
return unlockable
# =====================================================
# 同步课程进度到节点
# =====================================================
async def sync_course_progress(
self,
db: AsyncSession,
user_id: int,
course_id: int,
progress: float
):
"""
同步课程学习进度到成长路径节点
当用户完成课程学习时调用
"""
# 查找包含该课程的节点
nodes_result = await db.execute(
select(GrowthPathNode).where(
and_(
GrowthPathNode.course_id == course_id,
GrowthPathNode.is_deleted == False
)
)
)
nodes = nodes_result.scalars().all()
for node in nodes:
# 获取用户在该成长路径的进度
progress_result = await db.execute(
select(UserGrowthPathProgress).where(
and_(
UserGrowthPathProgress.user_id == user_id,
UserGrowthPathProgress.growth_path_id == node.growth_path_id
)
)
)
user_progress = progress_result.scalar_one_or_none()
if not user_progress:
continue
# 更新节点完成记录
completion_result = await db.execute(
select(UserNodeCompletion).where(
and_(
UserNodeCompletion.user_id == user_id,
UserNodeCompletion.node_id == node.id
)
)
)
completion = completion_result.scalar_one_or_none()
if not completion:
completion = UserNodeCompletion(
user_id=user_id,
growth_path_id=node.growth_path_id,
node_id=node.id,
status=NodeStatus.IN_PROGRESS.value,
started_at=datetime.now(),
)
db.add(completion)
completion.course_progress = Decimal(str(progress))
# 如果进度达到100%,自动完成节点
if progress >= 100:
await self.complete_node(db, user_id, node.id)
await db.commit()
# 全局实例
growth_path_service = GrowthPathService()