Files
012-kaopeilian/backend/app/services/scrm_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

357 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
SCRM system integration service.

Provides the data query service that is called by the SCRM system.
"""
import logging
from typing import List, Optional, Dict, Any
from sqlalchemy import select, func, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.user import User
from app.models.position import Position
from app.models.position_member import PositionMember
from app.models.position_course import PositionCourse
from app.models.course import Course, KnowledgePoint, CourseMaterial
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class SCRMService:
    """Data query service for the SCRM system integration.

    Every public method runs its queries through the ``AsyncSession`` given at
    construction time and returns plain dictionaries (or ``None`` when the
    requested record is not found).
    """

    # Escape character used for user-supplied text embedded in LIKE patterns.
    # "/" is used instead of a backslash so the pattern does not depend on how
    # the database treats backslashes inside string literals.
    _LIKE_ESCAPE = "/"

    def __init__(self, db: "AsyncSession"):
        """Store the database session used by all queries of this service."""
        self.db = db

    @classmethod
    def _contains_pattern(cls, term: str) -> str:
        """Return a ``%term%`` LIKE pattern with the wildcards in *term* escaped.

        ``%`` and ``_`` (and the escape character itself) are prefixed with
        ``_LIKE_ESCAPE`` so they are matched literally; the pattern must be
        used together with ``escape=_LIKE_ESCAPE``.
        """
        esc = cls._LIKE_ESCAPE
        escaped = (
            term.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )
        return f"%{escaped}%"

    @staticmethod
    def _relevance_score(text: str, keywords: List[str]) -> float:
        """Return the fraction of *keywords* found in *text*, case-insensitively.

        The result is rounded to two decimals; an empty keyword list scores 1.0.
        """
        if not keywords:
            return 1.0
        haystack = text.lower()
        hits = sum(1 for kw in keywords if kw.lower() in haystack)
        return round(hits / len(keywords), 2)

    @staticmethod
    def _active_user_query():
        """Build the base query: non-deleted users with positions eagerly loaded."""
        return (
            select(User)
            .options(
                selectinload(User.position_memberships)
                .selectinload(PositionMember.position)
            )
            .where(User.is_deleted.is_(False))
        )

    async def get_employee_position(
        self,
        userid: Optional[str] = None,
        name: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Look up an employee's positions by WeCom userid or by employee name.

        Args:
            userid: WeCom employee userid (optional); matched exactly against
                ``User.wework_userid`` and tried first.
            name: Employee name (optional); matched exactly against
                ``User.full_name`` first, then as a case-insensitive substring.

        Returns:
            - the employee position dict (employee_id, userid, name, positions)
              when exactly one employee matches;
            - a dict with ``multiple_matches``, ``count`` and ``employees``
              when the name matches several employees;
            - ``None`` when nothing matches.
        """
        base_query = self._active_user_query()

        # Exact match on wework_userid has priority. The filter is applied to
        # a derived query so that the name fallback below is NOT restricted
        # by the userid as well.
        if userid:
            result = await self.db.execute(
                base_query.where(User.wework_userid == userid)
            )
            user = result.scalar_one_or_none()
            if user:
                return self._build_employee_position_data(user)

        if not name:
            return None

        # Name lookup: exact match first ...
        result = await self.db.execute(base_query.where(User.full_name == name))
        users = result.scalars().all()

        # ... and only when that finds nothing, a substring match with the
        # LIKE wildcards in the user-supplied name escaped.
        if not users:
            result = await self.db.execute(
                base_query.where(
                    User.full_name.ilike(
                        self._contains_pattern(name), escape=self._LIKE_ESCAPE
                    )
                )
            )
            users = result.scalars().all()

        if not users:
            return None
        if len(users) == 1:
            return self._build_employee_position_data(users[0])

        # Several candidates: return a short list for the caller to choose from.
        return {
            "multiple_matches": True,
            "count": len(users),
            "employees": [
                {
                    "employee_id": u.id,
                    "userid": u.wework_userid,
                    "name": u.full_name or u.username,
                    # Only the last 4 digits of the phone number are exposed.
                    "phone": u.phone[-4:] if u.phone else None
                }
                for u in users
            ]
        }

    def _build_employee_position_data(self, user: "User") -> Dict[str, Any]:
        """Build the employee position dict for *user*.

        Memberships that are deleted, have no position, or whose position is
        deleted are skipped. The first membership that is kept is flagged as
        the primary position.
        """
        positions: List[Dict[str, Any]] = []
        for membership in user.position_memberships:
            position = membership.position
            if membership.is_deleted or position is None or position.is_deleted:
                continue
            joined_at = membership.joined_at
            positions.append({
                "position_id": position.id,
                "position_name": position.name,
                # Primary = first membership that survives the filter above,
                # not the first row overall (which may have been skipped).
                "is_primary": not positions,
                "joined_at": joined_at.strftime("%Y-%m-%d") if joined_at else None
            })
        return {
            "employee_id": user.id,
            "userid": user.wework_userid,
            "name": user.full_name or user.username,
            "positions": positions
        }

    async def get_employee_position_by_id(self, employee_id: int) -> Optional[Dict[str, Any]]:
        """
        Look up an employee's positions by employee ID.

        Args:
            employee_id: Employee ID (primary key of the users table).

        Returns:
            The employee position dict, or ``None`` when no non-deleted user
            has this ID.
        """
        result = await self.db.execute(
            self._active_user_query().where(User.id == employee_id)
        )
        user = result.scalar_one_or_none()
        if not user:
            return None
        return self._build_employee_position_data(user)

    async def get_position_courses(
        self,
        position_id: int,
        course_type: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        List the published courses attached to a position.

        Args:
            position_id: Position ID.
            course_type: Course type filter (required/optional/all); ``None``
                and ``"all"`` disable the filter.

        Returns:
            A dict with position_id, position_name and courses (ordered by
            priority, highest first), or ``None`` when the position does not
            exist or is deleted.
        """
        position_result = await self.db.execute(
            select(Position).where(Position.id == position_id, Position.is_deleted.is_(False))
        )
        position = position_result.scalar_one_or_none()
        if not position:
            return None

        # Position-course links joined with their courses; only published,
        # non-deleted courses are returned.
        query = (
            select(PositionCourse, Course)
            .join(Course, PositionCourse.course_id == Course.id)
            .where(
                PositionCourse.position_id == position_id,
                PositionCourse.is_deleted.is_(False),
                Course.is_deleted.is_(False),
                Course.status == "published"
            )
        )
        if course_type and course_type != "all":
            query = query.where(PositionCourse.course_type == course_type)
        query = query.order_by(PositionCourse.priority.desc())

        rows = (await self.db.execute(query)).all()

        # Count knowledge points for all listed courses in ONE grouped query
        # instead of one COUNT query per course.
        course_ids = [course.id for _, course in rows]
        kp_counts: Dict[int, int] = {}
        if course_ids:
            count_result = await self.db.execute(
                select(KnowledgePoint.course_id, func.count(KnowledgePoint.id))
                .where(
                    KnowledgePoint.course_id.in_(course_ids),
                    KnowledgePoint.is_deleted.is_(False)
                )
                .group_by(KnowledgePoint.course_id)
            )
            kp_counts = {cid: count for cid, count in count_result.all()}

        courses = [
            {
                "course_id": course.id,
                "course_name": course.name,
                "course_type": pc.course_type,
                "priority": pc.priority,
                # Courses without any knowledge point are absent from the
                # grouped result and therefore count as 0.
                "knowledge_point_count": kp_counts.get(course.id, 0)
            }
            for pc, course in rows
        ]
        return {
            "position_id": position.id,
            "position_name": position.name,
            "courses": courses
        }

    async def search_knowledge_points(
        self,
        keywords: List[str],
        position_id: Optional[int] = None,
        course_ids: Optional[List[int]] = None,
        knowledge_type: Optional[str] = None,
        limit: int = 10
    ) -> Dict[str, Any]:
        """
        Search knowledge points of published courses.

        Args:
            keywords: Search keywords; a knowledge point matches when ANY
                keyword occurs in its name or description. Empty keywords are
                ignored; with no usable keyword, no keyword filter is applied.
            position_id: Position ID; knowledge points of this position's
                courses are ordered first.
            course_ids: Restrict the search to these courses.
            knowledge_type: Knowledge point type filter.
            limit: Maximum number of rows fetched from the database.

        Returns:
            A dict with ``total`` (number of returned items, i.e. at most
            *limit*) and ``items`` sorted by relevance score, highest first.
        """
        # Drop empty/None keywords: an empty keyword would otherwise produce
        # the pattern "%%" and turn the whole search into match-all.
        terms = [kw for kw in (keywords or []) if kw]

        query = (
            select(KnowledgePoint, Course)
            .join(Course, KnowledgePoint.course_id == Course.id)
            .where(
                KnowledgePoint.is_deleted.is_(False),
                Course.is_deleted.is_(False),
                Course.status == "published"
            )
        )

        # Keyword filter on name and description, wildcards escaped.
        if terms:
            keyword_conditions = []
            for pattern in map(self._contains_pattern, terms):
                keyword_conditions.append(
                    or_(
                        KnowledgePoint.name.ilike(pattern, escape=self._LIKE_ESCAPE),
                        KnowledgePoint.description.ilike(pattern, escape=self._LIKE_ESCAPE)
                    )
                )
            query = query.where(or_(*keyword_conditions))

        if course_ids:
            query = query.where(KnowledgePoint.course_id.in_(course_ids))
        if knowledge_type:
            query = query.where(KnowledgePoint.type == knowledge_type)

        # Default ordering: newest knowledge points (highest id) first.
        ordering = [KnowledgePoint.id.desc()]
        if position_id:
            pos_course_result = await self.db.execute(
                select(PositionCourse.course_id)
                .where(
                    PositionCourse.position_id == position_id,
                    PositionCourse.is_deleted.is_(False)
                )
            )
            pos_course_ids = [row[0] for row in pos_course_result.all()]
            if pos_course_ids:
                # Function-scope import, as in the original implementation.
                from sqlalchemy import case
                # CASE WHEN ordering: courses of the given position sort first.
                ordering.insert(
                    0,
                    case(
                        (KnowledgePoint.course_id.in_(pos_course_ids), 0),
                        else_=1
                    )
                )
        query = query.order_by(*ordering)

        result = await self.db.execute(query.limit(limit))

        # NOTE(review): the limit is applied in SQL before the relevance sort
        # below, so relevance only reorders the rows that were fetched.
        items = [
            {
                "knowledge_point_id": kp.id,
                "name": kp.name,
                "course_id": course.id,
                "course_name": course.name,
                "type": kp.type,
                "relevance_score": self._relevance_score(
                    f"{kp.name} {kp.description or ''}", terms
                )
            }
            for kp, course in result.all()
        ]
        # Stable sort: rows with equal scores keep the database ordering.
        items.sort(key=lambda item: item["relevance_score"], reverse=True)
        return {
            "total": len(items),
            "items": items
        }

    async def get_knowledge_point_detail(self, kp_id: int) -> Optional[Dict[str, Any]]:
        """
        Fetch the details of one knowledge point.

        Args:
            kp_id: Knowledge point ID.

        Returns:
            The knowledge point detail dict, or ``None`` when no non-deleted
            knowledge point has this ID.
        """
        # Knowledge point with its course and (optional) material. The outer
        # join keeps knowledge points that have no material attached.
        # NOTE(review): unlike search_knowledge_points, this query does not
        # filter on Course.is_deleted / Course.status — confirm this is intended.
        result = await self.db.execute(
            select(KnowledgePoint, Course, CourseMaterial)
            .join(Course, KnowledgePoint.course_id == Course.id)
            .outerjoin(CourseMaterial, KnowledgePoint.material_id == CourseMaterial.id)
            .where(
                KnowledgePoint.id == kp_id,
                KnowledgePoint.is_deleted.is_(False)
            )
        )
        row = result.one_or_none()
        if not row:
            return None
        kp, course, material = row
        return {
            "knowledge_point_id": kp.id,
            "name": kp.name,
            "course_id": course.id,
            "course_name": course.name,
            "type": kp.type,
            "content": kp.description or "",  # the description serves as the content
            "material_id": material.id if material else None,
            "material_type": material.file_type if material else None,
            "material_url": material.file_url if material else None,
            "topic_relation": kp.topic_relation,
            "source": kp.source,
            "created_at": kp.created_at.strftime("%Y-%m-%d %H:%M:%S") if kp.created_at else None
        }