Files
012-kaopeilian/backend/app/api/v1/ability.py
yuliang_guo b2815ef3cb
Some checks are pending
continuous-integration/drone/push Build is running
feat: AI智能学习助手分析结果缓存到Redis
- 后端API添加Redis缓存,有效期4小时
- 支持force_refresh参数强制刷新缓存
- 前端自动加载使用缓存,手动刷新强制更新
- 返回from_cache字段标识数据来源

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-05 12:06:12 +08:00

243 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
能力评估API接口
用于智能工牌数据分析、能力评估报告生成等
"""
import json
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from app.core.deps import get_current_user, get_db
from app.core.redis import get_redis_client
from app.models.user import User
from app.schemas.base import ResponseModel
from app.schemas.ability import AbilityAssessmentResponse, AbilityAssessmentHistory
from app.services.yanji_service import YanjiService
from app.services.ability_assessment_service import get_ability_assessment_service
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
# Redis 缓存配置
ABILITY_CACHE_KEY_PREFIX = "ability:assessment:"
ABILITY_CACHE_TTL = 4 * 60 * 60 # 4小时缓存
@router.post("/analyze-yanji", response_model=ResponseModel)
async def analyze_yanji_badge_data(
force_refresh: bool = Query(False, description="强制刷新缓存"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
分析智能工牌数据生成能力评估和课程推荐
使用 Python 原生 AI 服务实现结果缓存4小时。
功能说明:
1. 从言迹智能工牌获取员工的最近10条录音记录
2. 分析对话数据进行能力评估6个维度
3. 基于能力短板生成课程推荐3-5门
4. 保存评估记录到数据库
5. 缓存结果到Redis4小时有效期
要求:
- 用户必须已绑定手机号(用于匹配言迹数据)
参数:
- force_refresh: 是否强制刷新缓存默认False
返回:
- assessment_id: 评估记录ID
- total_score: 综合评分0-100
- dimensions: 能力维度列表6个维度
- recommended_courses: 推荐课程列表3-5门
- conversation_count: 分析的对话数量
- from_cache: 是否来自缓存
"""
# 检查用户是否绑定手机号
if not current_user.phone:
logger.warning(f"用户未绑定手机号: user_id={current_user.id}")
raise HTTPException(
status_code=400,
detail="用户未绑定手机号,无法匹配言迹数据"
)
# Redis 缓存键
cache_key = f"{ABILITY_CACHE_KEY_PREFIX}{current_user.id}"
# 尝试从缓存获取(除非强制刷新)
if not force_refresh:
try:
redis = get_redis_client()
cached_data = await redis.get(cache_key)
if cached_data:
result = json.loads(cached_data)
result['from_cache'] = True
logger.info(
f"从缓存返回能力评估结果: user_id={current_user.id}, "
f"assessment_id={result.get('assessment_id')}"
)
return ResponseModel(
code=200,
message="智能工牌数据分析完成(缓存)",
data=result
)
except Exception as e:
logger.warning(f"读取缓存失败,将重新分析: {e}")
# 获取服务实例
yanji_service = YanjiService()
assessment_service = get_ability_assessment_service()
try:
logger.info(
f"开始分析智能工牌数据: user_id={current_user.id}, "
f"phone={current_user.phone}, force_refresh={force_refresh}"
)
# 调用能力评估服务(使用 Python 原生实现)
result = await assessment_service.analyze_yanji_conversations(
user_id=current_user.id,
phone=current_user.phone,
db=db,
yanji_service=yanji_service,
engine="v2" # 固定使用 V2
)
# 缓存结果到 Redis
try:
redis = get_redis_client()
# 序列化时处理 datetime 对象
cache_data = {
"assessment_id": result["assessment_id"],
"total_score": result["total_score"],
"dimensions": result["dimensions"],
"recommended_courses": result["recommended_courses"],
"conversation_count": result["conversation_count"],
"analyzed_at": result["analyzed_at"].isoformat() if result.get("analyzed_at") else None,
"engine": result.get("engine"),
}
await redis.setex(cache_key, ABILITY_CACHE_TTL, json.dumps(cache_data, ensure_ascii=False))
logger.info(f"能力评估结果已缓存: user_id={current_user.id}, ttl={ABILITY_CACHE_TTL}s")
except Exception as e:
logger.warning(f"缓存结果失败: {e}")
result['from_cache'] = False
logger.info(
f"智能工牌数据分析完成: user_id={current_user.id}, "
f"assessment_id={result['assessment_id']}, "
f"total_score={result['total_score']}"
)
return ResponseModel(
code=200,
message="智能工牌数据分析完成",
data=result
)
except ValueError as e:
# 业务逻辑错误(如未找到录音记录)
logger.warning(f"智能工牌数据分析失败: {e}")
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
# 系统错误
logger.error(f"分析智能工牌数据失败: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"分析失败: {str(e)}"
)
@router.get("/history", response_model=ResponseModel)
async def get_assessment_history(
limit: int = Query(default=10, ge=1, le=50, description="返回记录数量"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
获取用户的能力评估历史记录
参数:
- limit: 返回记录数量默认10最大50
返回:
- 评估历史记录列表
"""
assessment_service = get_ability_assessment_service()
try:
history = await assessment_service.get_user_assessment_history(
user_id=current_user.id,
db=db,
limit=limit
)
return ResponseModel(
code=200,
message=f"获取评估历史成功,共{len(history)}",
data={"history": history, "total": len(history)}
)
except Exception as e:
logger.error(f"获取评估历史失败: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"获取评估历史失败: {str(e)}"
)
@router.get("/{assessment_id}", response_model=ResponseModel)
async def get_assessment_detail(
assessment_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
获取单个评估记录的详细信息
参数:
- assessment_id: 评估记录ID
返回:
- 评估详细信息
"""
assessment_service = get_ability_assessment_service()
try:
detail = await assessment_service.get_assessment_detail(
assessment_id=assessment_id,
db=db
)
# 权限检查:只能查看自己的评估记录
if detail['user_id'] != current_user.id:
raise HTTPException(
status_code=403,
detail="无权访问该评估记录"
)
return ResponseModel(
code=200,
message="获取评估详情成功",
data=detail
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except HTTPException:
raise
except Exception as e:
logger.error(f"获取评估详情失败: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"获取评估详情失败: {str(e)}"
)