""" 岗位管理 API(真实数据库) """ from typing import Optional, List from fastapi import APIRouter, Depends, Query, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, func from sqlalchemy.orm import selectinload import sqlalchemy as sa from app.core.deps import get_current_active_user as get_current_user, get_db, require_admin, require_admin_or_manager from app.schemas.base import ResponseModel, PaginationParams, PaginatedResponse from app.models.position import Position from app.models.position_member import PositionMember from app.models.position_course import PositionCourse from app.models.user import User from app.models.course import Course router = APIRouter(prefix="/admin/positions") @router.get("") async def list_positions( pagination: PaginationParams = Depends(), keyword: Optional[str] = Query(None, description="关键词"), current_user=Depends(require_admin_or_manager), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """分页获取岗位列表(管理员或经理)。""" stmt = select(Position).where(Position.is_deleted == False) if keyword: like = f"%{keyword}%" stmt = stmt.where((Position.name.ilike(like)) | (Position.description.ilike(like))) rows = (await db.execute(stmt)).scalars().all() total = len(rows) sliced = rows[pagination.offset : pagination.offset + pagination.limit] async def to_dict(p: Position) -> dict: """将Position对象转换为字典,并添加统计数据""" d = p.__dict__.copy() d.pop("_sa_instance_state", None) # 统计岗位成员数量 member_count_result = await db.execute( select(func.count(PositionMember.id)).where( and_( PositionMember.position_id == p.id, PositionMember.is_deleted == False ) ) ) d["memberCount"] = member_count_result.scalar() or 0 # 统计必修课程数量 required_count_result = await db.execute( select(func.count(PositionCourse.id)).where( and_( PositionCourse.position_id == p.id, PositionCourse.course_type == "required", PositionCourse.is_deleted == False ) ) ) d["requiredCourses"] = required_count_result.scalar() or 0 # 统计选修课程数量 optional_count_result = await db.execute( select(func.count(PositionCourse.id)).where( and_( PositionCourse.position_id == p.id, PositionCourse.course_type == "optional", PositionCourse.is_deleted == False ) ) ) d["optionalCourses"] = optional_count_result.scalar() or 0 return d # 为每个岗位添加统计数据(使用异步) items = [] for p in sliced: item = await to_dict(p) items.append(item) paged = { "items": items, "total": total, "page": pagination.page, "page_size": pagination.page_size, "pages": (total + pagination.page_size - 1) // pagination.page_size if pagination.page_size else 1, } return ResponseModel(message="获取岗位列表成功", data=paged) @router.get("/tree") async def get_position_tree( current_user=Depends(require_admin_or_manager), db: AsyncSession = Depends(get_db) ) -> ResponseModel: """获取岗位树(管理员或经理)。""" rows = (await db.execute(select(Position).where(Position.is_deleted == False))).scalars().all() id_to_node = {p.id: {**p.__dict__, "children": []} for p in rows} roots: List[dict] = [] for p in rows: node = id_to_node[p.id] parent_id = p.parent_id if parent_id and parent_id in id_to_node: id_to_node[parent_id]["children"].append(node) else: roots.append(node) # 清理 _sa_instance_state def clean(d: dict): d.pop("_sa_instance_state", None) for c in d.get("children", []): clean(c) for r in roots: clean(r) return ResponseModel(message="获取岗位树成功", data=roots) @router.post("") async def create_position( payload: dict, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db) ) -> ResponseModel: obj = Position( name=payload.get("name"), code=payload.get("code"), description=payload.get("description"), parent_id=payload.get("parentId"), status=payload.get("status", "active"), skills=payload.get("skills"), level=payload.get("level"), sort_order=payload.get("sort_order", 0), created_by=current_user.id, ) db.add(obj) await db.commit() await db.refresh(obj) return ResponseModel(message="创建岗位成功", data={"id": obj.id}) @router.put("/{position_id}") async def update_position( position_id: int, payload: dict, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db) ) -> ResponseModel: obj = await db.get(Position, position_id) if not obj or obj.is_deleted: return ResponseModel(code=404, message="岗位不存在") obj.name = payload.get("name", obj.name) obj.code = payload.get("code", obj.code) obj.description = payload.get("description", obj.description) obj.parent_id = payload.get("parentId", obj.parent_id) obj.status = payload.get("status", obj.status) obj.skills = payload.get("skills", obj.skills) obj.level = payload.get("level", obj.level) obj.sort_order = payload.get("sort_order", obj.sort_order) obj.updated_by = current_user.id await db.commit() await db.refresh(obj) # 返回更新后的完整数据 data = obj.__dict__.copy() data.pop("_sa_instance_state", None) return ResponseModel(message="更新岗位成功", data=data) @router.get("/{position_id}") async def get_position_detail( position_id: int, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db) ) -> ResponseModel: obj = await db.get(Position, position_id) if not obj or obj.is_deleted: return ResponseModel(code=404, message="岗位不存在") data = obj.__dict__.copy() data.pop("_sa_instance_state", None) return ResponseModel(data=data) @router.get("/{position_id}/check-delete") async def check_position_delete( position_id: int, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db) ) -> ResponseModel: obj = await db.get(Position, position_id) if not obj or obj.is_deleted: return ResponseModel(code=404, message="岗位不存在") # 检查是否有子岗位 child_count_result = await db.execute( select(func.count(Position.id)).where( and_( Position.parent_id == position_id, Position.is_deleted == False ) ) ) child_count = child_count_result.scalar() or 0 if child_count > 0: return ResponseModel(data={ "deletable": False, "reason": f"该岗位下有 {child_count} 个子岗位,请先删除或移动子岗位" }) # 检查是否有成员(仅作为提醒,不阻止删除) member_count_result = await db.execute( select(func.count(PositionMember.id)).where( and_( PositionMember.position_id == position_id, PositionMember.is_deleted == False ) ) ) member_count = member_count_result.scalar() or 0 warning = "" if member_count > 0: warning = f"注意:该岗位当前有 {member_count} 名成员,删除后这些成员将不再属于此岗位" return ResponseModel(data={"deletable": True, "reason": "", "warning": warning, "member_count": member_count}) @router.delete("/{position_id}") async def delete_position( position_id: int, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db) ) -> ResponseModel: obj = await db.get(Position, position_id) if not obj or obj.is_deleted: return ResponseModel(code=404, message="岗位不存在") # 检查是否有子岗位 child_count_result = await db.execute( select(func.count(Position.id)).where( and_( Position.parent_id == position_id, Position.is_deleted == False ) ) ) child_count = child_count_result.scalar() or 0 if child_count > 0: return ResponseModel( code=400, message=f"该岗位下有 {child_count} 个子岗位,请先删除或移动子岗位" ) # 软删除岗位成员关联 await db.execute( sa.update(PositionMember) .where(PositionMember.position_id == position_id) .values(is_deleted=True) ) # 软删除岗位课程关联 await db.execute( sa.update(PositionCourse) .where(PositionCourse.position_id == position_id) .values(is_deleted=True) ) # 软删除岗位 obj.is_deleted = True await db.commit() return ResponseModel(message="岗位已删除") # ========== 岗位成员管理 API ========== @router.get("/{position_id}/members") async def get_position_members( position_id: int, pagination: PaginationParams = Depends(), keyword: Optional[str] = Query(None, description="搜索关键词"), current_user=Depends(require_admin), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """获取岗位成员列表""" # 验证岗位存在 position = await db.get(Position, position_id) if not position or position.is_deleted: return ResponseModel(code=404, message="岗位不存在") # 构建查询 stmt = ( select(PositionMember, User) .join(User, PositionMember.user_id == User.id) .where( and_( PositionMember.position_id == position_id, PositionMember.is_deleted == False, User.is_deleted == False ) ) ) # 关键词搜索 if keyword: like = f"%{keyword}%" stmt = stmt.where( (User.username.ilike(like)) | (User.full_name.ilike(like)) | (User.email.ilike(like)) ) # 执行查询 result = await db.execute(stmt) rows = result.all() total = len(rows) sliced = rows[pagination.offset : pagination.offset + pagination.limit] # 格式化数据 items = [] for pm, user in sliced: items.append({ "id": pm.id, "user_id": user.id, "username": user.username, "full_name": user.full_name, "email": user.email, "phone": user.phone, "role": pm.role, "joined_at": pm.joined_at.isoformat() if pm.joined_at else None, "user_role": user.role, # 系统角色 "is_active": user.is_active, }) return ResponseModel( message="获取成员列表成功", data={ "items": items, "total": total, "page": pagination.page, "page_size": pagination.page_size, "pages": (total + pagination.page_size - 1) // pagination.page_size if pagination.page_size else 1, } ) @router.post("/{position_id}/members") async def add_position_members( position_id: int, payload: dict, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """批量添加岗位成员""" # 验证岗位存在 position = await db.get(Position, position_id) if not position or position.is_deleted: return ResponseModel(code=404, message="岗位不存在") user_ids = payload.get("user_ids", []) if not user_ids: return ResponseModel(code=400, message="请选择要添加的用户") # 验证用户存在 users = await db.execute( select(User).where( and_( User.id.in_(user_ids), User.is_deleted == False ) ) ) valid_users = {u.id: u for u in users.scalars().all()} if len(valid_users) != len(user_ids): invalid_ids = set(user_ids) - set(valid_users.keys()) return ResponseModel(code=400, message=f"部分用户不存在: {invalid_ids}") # 检查是否已存在 existing = await db.execute( select(PositionMember).where( and_( PositionMember.position_id == position_id, PositionMember.user_id.in_(user_ids), PositionMember.is_deleted == False ) ) ) existing_user_ids = {pm.user_id for pm in existing.scalars().all()} # 添加新成员 added_count = 0 for user_id in user_ids: if user_id not in existing_user_ids: member = PositionMember( position_id=position_id, user_id=user_id, role=payload.get("role") ) db.add(member) added_count += 1 await db.commit() return ResponseModel( message=f"成功添加 {added_count} 个成员", data={"added_count": added_count} ) @router.delete("/{position_id}/members/{user_id}") async def remove_position_member( position_id: int, user_id: int, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """移除岗位成员""" # 查找成员关系 member = await db.execute( select(PositionMember).where( and_( PositionMember.position_id == position_id, PositionMember.user_id == user_id, PositionMember.is_deleted == False ) ) ) member = member.scalar_one_or_none() if not member: return ResponseModel(code=404, message="成员关系不存在") # 软删除 member.is_deleted = True await db.commit() return ResponseModel(message="成员已移除") # ========== 岗位课程管理 API ========== @router.get("/{position_id}/courses") async def get_position_courses( position_id: int, course_type: Optional[str] = Query(None, description="课程类型:required/optional"), current_user=Depends(require_admin), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """获取岗位课程列表""" # 验证岗位存在 position = await db.get(Position, position_id) if not position or position.is_deleted: return ResponseModel(code=404, message="岗位不存在") # 构建查询 stmt = ( select(PositionCourse, Course) .join(Course, PositionCourse.course_id == Course.id) .where( and_( PositionCourse.position_id == position_id, PositionCourse.is_deleted == False, Course.is_deleted == False ) ) ) # 课程类型筛选 if course_type: stmt = stmt.where(PositionCourse.course_type == course_type) # 按优先级排序 stmt = stmt.order_by(PositionCourse.priority, PositionCourse.id) # 执行查询 result = await db.execute(stmt) rows = result.all() # 格式化数据 items = [] for pc, course in rows: items.append({ "id": pc.id, "course_id": course.id, "course_name": course.name, "course_description": course.description, "course_category": course.category, "course_status": course.status, "course_duration_hours": course.duration_hours, "course_difficulty_level": course.difficulty_level, "course_type": pc.course_type, "priority": pc.priority, "created_at": pc.created_at.isoformat() if pc.created_at else None, }) # 统计 stats = { "total": len(items), "required_count": sum(1 for item in items if item["course_type"] == "required"), "optional_count": sum(1 for item in items if item["course_type"] == "optional"), } return ResponseModel( message="获取课程列表成功", data={ "items": items, "stats": stats } ) @router.post("/{position_id}/courses") async def add_position_courses( position_id: int, payload: dict, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """批量添加岗位课程""" # 验证岗位存在 position = await db.get(Position, position_id) if not position or position.is_deleted: return ResponseModel(code=404, message="岗位不存在") course_ids = payload.get("course_ids", []) if not course_ids: return ResponseModel(code=400, message="请选择要添加的课程") course_type = payload.get("course_type", "required") if course_type not in ["required", "optional"]: return ResponseModel(code=400, message="课程类型无效") # 验证课程存在 courses = await db.execute( select(Course).where( and_( Course.id.in_(course_ids), Course.is_deleted == False ) ) ) valid_courses = {c.id: c for c in courses.scalars().all()} if len(valid_courses) != len(course_ids): invalid_ids = set(course_ids) - set(valid_courses.keys()) return ResponseModel(code=400, message=f"部分课程不存在: {invalid_ids}") # 检查是否已存在 existing = await db.execute( select(PositionCourse).where( and_( PositionCourse.position_id == position_id, PositionCourse.course_id.in_(course_ids), PositionCourse.is_deleted == False ) ) ) existing_course_ids = {pc.course_id for pc in existing.scalars().all()} # 获取当前最大优先级 max_priority_result = await db.execute( select(sa.func.max(PositionCourse.priority)).where( and_( PositionCourse.position_id == position_id, PositionCourse.is_deleted == False ) ) ) max_priority = max_priority_result.scalar() or 0 # 添加新课程 added_count = 0 for idx, course_id in enumerate(course_ids): if course_id not in existing_course_ids: pc = PositionCourse( position_id=position_id, course_id=course_id, course_type=course_type, priority=max_priority + idx + 1, ) db.add(pc) added_count += 1 await db.commit() return ResponseModel( message=f"成功添加 {added_count} 门课程", data={"added_count": added_count} ) @router.put("/{position_id}/courses/{pc_id}") async def update_position_course( position_id: int, pc_id: int, payload: dict, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """更新岗位课程设置""" # 查找课程关系 pc = await db.execute( select(PositionCourse).where( and_( PositionCourse.id == pc_id, PositionCourse.position_id == position_id, PositionCourse.is_deleted == False ) ) ) pc = pc.scalar_one_or_none() if not pc: return ResponseModel(code=404, message="课程关系不存在") # 更新课程类型 if "course_type" in payload: course_type = payload["course_type"] if course_type not in ["required", "optional"]: return ResponseModel(code=400, message="课程类型无效") pc.course_type = course_type # 更新优先级 if "priority" in payload: pc.priority = payload["priority"] # PositionCourse 未继承审计字段,避免写入不存在字段 await db.commit() return ResponseModel(message="更新成功") @router.delete("/{position_id}/courses/{course_id}") async def remove_position_course( position_id: int, course_id: int, current_user=Depends(require_admin), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """移除岗位课程""" # 查找课程关系 pc = await db.execute( select(PositionCourse).where( and_( PositionCourse.position_id == position_id, PositionCourse.course_id == course_id, PositionCourse.is_deleted == False ) ) ) pc = pc.scalar_one_or_none() if not pc: return ResponseModel(code=404, message="课程关系不存在") # 软删除 pc.is_deleted = True # PositionCourse 未继承审计字段,避免写入不存在字段 await db.commit() return ResponseModel(message="课程已移除")