- 从服务器拉取完整代码 - 按框架规范整理项目结构 - 配置 Drone CI 测试环境部署 - 包含后端(FastAPI)、前端(Vue3)、管理端 技术栈: Vue3 + TypeScript + FastAPI + MySQL
357 lines
12 KiB
Python
357 lines
12 KiB
Python
"""
|
||
SCRM 系统对接服务
|
||
|
||
提供给 SCRM 系统调用的数据查询服务
|
||
"""
|
||
|
||
import logging
|
||
from typing import List, Optional, Dict, Any
|
||
from sqlalchemy import select, func, or_
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy.orm import selectinload
|
||
|
||
from app.models.user import User
|
||
from app.models.position import Position
|
||
from app.models.position_member import PositionMember
|
||
from app.models.position_course import PositionCourse
|
||
from app.models.course import Course, KnowledgePoint, CourseMaterial
|
||
|
||
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class SCRMService:
    """Read-only data queries exposed to the SCRM system.

    Every method runs its queries on the ``AsyncSession`` supplied at
    construction time and returns plain dictionaries (or ``None`` when the
    requested record does not exist).
    """

    # Escape character used when caller-supplied text is embedded in a LIKE
    # pattern, so that "%" and "_" typed by the caller match literally.
    _LIKE_ESCAPE = "/"

    def __init__(self, db: AsyncSession):
        """Keep the async session that all queries of this service use."""
        self.db = db

    @classmethod
    def _like_pattern(cls, term: str) -> str:
        """Return a ``%term%`` LIKE pattern with wildcards in *term* escaped."""
        esc = cls._LIKE_ESCAPE
        # The escape character itself must be doubled first, otherwise the
        # escapes added for "%" and "_" would be escaped again.
        escaped = (
            term.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )
        return f"%{escaped}%"

    @staticmethod
    def _relevance_score(
        name: Any,
        description: Optional[str],
        lowered_terms: List[str]
    ) -> float:
        """Return the share of search terms found in the name/description.

        Args:
            name: Knowledge point name.
            description: Knowledge point description, may be ``None``.
            lowered_terms: Search terms, already lower-cased.

        Returns:
            A value in ``[0, 1]`` rounded to two decimals; ``1.0`` when there
            are no search terms.
        """
        if not lowered_terms:
            return 1.0
        text = f"{name} {description or ''}".lower()
        matched = sum(1 for term in lowered_terms if term in text)
        return round(matched / len(lowered_terms), 2)

    async def get_employee_position(
        self,
        userid: Optional[str] = None,
        name: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Look up an employee's positions by WeCom userid or by name.

        The userid, when given, is tried first as an exact match on
        ``User.wework_userid``. If that finds nothing (or no userid was
        given) the name is tried: first as an exact match on
        ``User.full_name``, then as a case-insensitive substring match.

        Args:
            userid: WeCom employee userid (optional).
            name: Employee name (optional, substring match as a fallback).

        Returns:
            * ``None`` when no non-deleted user matches.
            * The dictionary built by ``_build_employee_position_data`` when
              exactly one user matches.
            * ``{"multiple_matches": True, "count": ..., "employees": [...]}``
              when the name matches several users, so the caller can pick one.

        Raises:
            sqlalchemy.exc.MultipleResultsFound: if more than one non-deleted
                user carries the given userid.
        """
        # Keep the base query untouched: the name fallback must NOT inherit
        # the userid filter, otherwise it can never match once the userid
        # lookup has failed.
        base_query = (
            select(User)
            .options(
                selectinload(User.position_memberships)
                .selectinload(PositionMember.position)
            )
            .where(User.is_deleted.is_(False))
        )

        # 1) Exact match on the WeCom userid.
        if userid:
            result = await self.db.execute(
                base_query.where(User.wework_userid == userid)
            )
            user = result.scalar_one_or_none()
            if user:
                return self._build_employee_position_data(user)

        if not name:
            return None

        # 2) Exact match on the full name.
        result = await self.db.execute(
            base_query.where(User.full_name == name)
        )
        users = result.scalars().all()

        # 3) Substring match, only when the exact match found nothing.
        if not users:
            result = await self.db.execute(
                base_query.where(
                    User.full_name.ilike(
                        self._like_pattern(name),
                        escape=self._LIKE_ESCAPE
                    )
                )
            )
            users = result.scalars().all()

        if not users:
            return None
        if len(users) == 1:
            return self._build_employee_position_data(users[0])

        # Several candidates: return a short list for disambiguation.
        return {
            "multiple_matches": True,
            "count": len(users),
            "employees": [
                {
                    "employee_id": u.id,
                    "userid": u.wework_userid,
                    "name": u.full_name or u.username,
                    # Expose only the last 4 digits of the phone number.
                    "phone": u.phone[-4:] if u.phone else None
                }
                for u in users
            ]
        }

    def _build_employee_position_data(self, user: User) -> Dict[str, Any]:
        """Build the employee/position payload for one user.

        Memberships that are soft-deleted, whose position is soft-deleted, or
        that have no position attached are left out.

        Args:
            user: User whose ``position_memberships`` (and each membership's
                ``position``) are already loaded.

        Returns:
            Dictionary with ``employee_id``, ``userid``, ``name`` and
            ``positions``.
        """
        positions: List[Dict[str, Any]] = []
        for pm in user.position_memberships:
            position = pm.position
            if pm.is_deleted or position is None or position.is_deleted:
                continue
            positions.append({
                "position_id": position.id,
                "position_name": position.name,
                # The first membership that survives filtering is the primary
                # one; using the raw list index would leave no primary
                # position when the first membership is deleted.
                # NOTE(review): relies on the relationship's load order.
                "is_primary": not positions,
                "joined_at": pm.joined_at.strftime("%Y-%m-%d") if pm.joined_at else None
            })

        return {
            "employee_id": user.id,
            "userid": user.wework_userid,
            "name": user.full_name or user.username,
            "positions": positions
        }

    async def get_employee_position_by_id(self, employee_id: int) -> Optional[Dict[str, Any]]:
        """Look up an employee's positions by primary key.

        Args:
            employee_id: Employee ID (primary key of the users table).

        Returns:
            The dictionary built by ``_build_employee_position_data``, or
            ``None`` when no non-deleted user has that ID.
        """
        result = await self.db.execute(
            select(User)
            .options(
                selectinload(User.position_memberships)
                .selectinload(PositionMember.position)
            )
            .where(User.id == employee_id, User.is_deleted.is_(False))
        )
        user = result.scalar_one_or_none()

        if not user:
            return None

        return self._build_employee_position_data(user)

    async def get_position_courses(
        self,
        position_id: int,
        course_type: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """List the published courses attached to a position.

        Args:
            position_id: Position ID.
            course_type: Course type filter (required/optional/all). ``None``
                and ``"all"`` disable the filter.

        Returns:
            Dictionary with ``position_id``, ``position_name`` and
            ``courses`` (ordered by descending priority), or ``None`` when
            the position does not exist or is deleted.
        """
        position_result = await self.db.execute(
            select(Position).where(
                Position.id == position_id,
                Position.is_deleted.is_(False)
            )
        )
        position = position_result.scalar_one_or_none()

        if not position:
            return None

        # Position/course links joined with their courses.
        query = (
            select(PositionCourse, Course)
            .join(Course, PositionCourse.course_id == Course.id)
            .where(
                PositionCourse.position_id == position_id,
                PositionCourse.is_deleted.is_(False),
                Course.is_deleted.is_(False),
                Course.status == "published"  # published courses only
            )
            .order_by(PositionCourse.priority.desc())
        )

        if course_type and course_type != "all":
            query = query.where(PositionCourse.course_type == course_type)

        result = await self.db.execute(query)
        pc_courses = result.all()

        # Count knowledge points for all listed courses in ONE grouped query
        # instead of issuing a separate COUNT per course.
        kp_counts: Dict[int, int] = {}
        listed_course_ids = [course.id for _, course in pc_courses]
        if listed_course_ids:
            count_result = await self.db.execute(
                select(KnowledgePoint.course_id, func.count(KnowledgePoint.id))
                .where(
                    KnowledgePoint.course_id.in_(listed_course_ids),
                    KnowledgePoint.is_deleted.is_(False)
                )
                .group_by(KnowledgePoint.course_id)
            )
            kp_counts = {
                course_id: count for course_id, count in count_result.all()
            }

        courses = [
            {
                "course_id": course.id,
                "course_name": course.name,
                "course_type": pc.course_type,
                "priority": pc.priority,
                # Courses without any knowledge point are absent from the
                # grouped result, hence the default of 0.
                "knowledge_point_count": kp_counts.get(course.id, 0)
            }
            for pc, course in pc_courses
        ]

        return {
            "position_id": position.id,
            "position_name": position.name,
            "courses": courses
        }

    async def search_knowledge_points(
        self,
        keywords: List[str],
        position_id: Optional[int] = None,
        course_ids: Optional[List[int]] = None,
        knowledge_type: Optional[str] = None,
        limit: int = 10
    ) -> Dict[str, Any]:
        """Search knowledge points of published courses by keyword.

        A knowledge point matches when ANY keyword occurs in its name or
        description. Blank keywords are ignored; with no usable keyword the
        keyword filter is skipped entirely.

        Args:
            keywords: Search keywords.
            position_id: Position ID; knowledge points of that position's
                courses are ordered first by the database.
            course_ids: Restrict the search to these courses.
            knowledge_type: Knowledge point type filter.
            limit: Maximum number of rows fetched.

        Returns:
            Dictionary with ``total`` and ``items``. ``total`` is the number
            of items returned (at most ``limit``), not the number of matches
            in the database. Items are sorted by descending relevance score;
            the sort is stable, so the database order is kept among equal
            scores.
        """
        # A blank keyword would become the pattern "%%", which matches every
        # row and inflates every relevance score.
        terms = [kw.strip() for kw in (keywords or []) if kw and kw.strip()]

        query = (
            select(KnowledgePoint, Course)
            .join(Course, KnowledgePoint.course_id == Course.id)
            .where(
                KnowledgePoint.is_deleted.is_(False),
                Course.is_deleted.is_(False),
                Course.status == "published"
            )
        )

        # Keyword filter on name and description; "%" and "_" typed by the
        # caller are escaped so they match literally.
        keyword_conditions = []
        for term in terms:
            pattern = self._like_pattern(term)
            keyword_conditions.append(
                KnowledgePoint.name.ilike(pattern, escape=self._LIKE_ESCAPE)
            )
            keyword_conditions.append(
                KnowledgePoint.description.ilike(pattern, escape=self._LIKE_ESCAPE)
            )
        if keyword_conditions:
            query = query.where(or_(*keyword_conditions))

        if course_ids:
            query = query.where(KnowledgePoint.course_id.in_(course_ids))

        if knowledge_type:
            query = query.where(KnowledgePoint.type == knowledge_type)

        # Newest first by default; courses of the given position come first
        # when the position has any course attached.
        ordering = [KnowledgePoint.id.desc()]
        if position_id is not None:
            pos_course_result = await self.db.execute(
                select(PositionCourse.course_id)
                .where(
                    PositionCourse.position_id == position_id,
                    PositionCourse.is_deleted.is_(False)
                )
            )
            pos_course_ids = [row[0] for row in pos_course_result.all()]

            if pos_course_ids:
                from sqlalchemy import case
                ordering.insert(
                    0,
                    case(
                        (KnowledgePoint.course_id.in_(pos_course_ids), 0),
                        else_=1
                    )
                )
        query = query.order_by(*ordering)

        result = await self.db.execute(query.limit(limit))
        kp_courses = result.all()

        # Lower-case the terms once instead of once per row.
        lowered_terms = [term.lower() for term in terms]
        items = [
            {
                "knowledge_point_id": kp.id,
                "name": kp.name,
                "course_id": course.id,
                "course_name": course.name,
                "type": kp.type,
                "relevance_score": self._relevance_score(
                    kp.name, kp.description, lowered_terms
                )
            }
            for kp, course in kp_courses
        ]

        # NOTE(review): scoring happens after LIMIT, so a highly relevant
        # row beyond the first `limit` rows is never seen here.
        items.sort(key=lambda item: item["relevance_score"], reverse=True)

        return {
            "total": len(items),
            "items": items
        }

    async def get_knowledge_point_detail(self, kp_id: int) -> Optional[Dict[str, Any]]:
        """Return the details of one knowledge point.

        Args:
            kp_id: Knowledge point ID.

        Returns:
            Dictionary describing the knowledge point, its course and its
            optional material, or ``None`` when no non-deleted knowledge
            point has that ID.
        """
        # The material is outer-joined because a knowledge point may have
        # none; the course is inner-joined.
        # NOTE(review): unlike search_knowledge_points, this query does not
        # filter on Course.is_deleted / Course.status - confirm intended.
        result = await self.db.execute(
            select(KnowledgePoint, Course, CourseMaterial)
            .join(Course, KnowledgePoint.course_id == Course.id)
            .outerjoin(CourseMaterial, KnowledgePoint.material_id == CourseMaterial.id)
            .where(
                KnowledgePoint.id == kp_id,
                KnowledgePoint.is_deleted.is_(False)
            )
        )
        row = result.one_or_none()

        if not row:
            return None

        kp, course, material = row

        return {
            "knowledge_point_id": kp.id,
            "name": kp.name,
            "course_id": course.id,
            "course_name": course.name,
            "type": kp.type,
            "content": kp.description or "",  # description doubles as the content
            "material_id": material.id if material else None,
            "material_type": material.file_type if material else None,
            "material_url": material.file_url if material else None,
            "topic_relation": kp.topic_relation,
            "source": kp.source,
            "created_at": kp.created_at.strftime("%Y-%m-%d %H:%M:%S") if kp.created_at else None
        }
|
||
|