""" 智能学习推荐服务 基于用户能力评估、错题记录和学习历史推荐学习内容 """ import logging from datetime import datetime, timedelta from typing import List, Dict, Any, Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, func, desc from sqlalchemy.orm import selectinload from app.models.user import User from app.models.course import Course, CourseStatus, CourseMaterial, KnowledgePoint from app.models.exam import ExamResult from app.models.exam_mistake import ExamMistake from app.models.user_course_progress import UserCourseProgress, ProgressStatus from app.models.ability import AbilityAssessment logger = logging.getLogger(__name__) class RecommendationService: """ 智能学习推荐服务 推荐策略: 1. 基于错题分析:推荐与错题相关的知识点和课程 2. 基于能力评估:推荐弱项能力相关的课程 3. 基于学习进度:推荐未完成的课程继续学习 4. 基于热门课程:推荐学习人数多的课程 5. 基于岗位要求:推荐岗位必修课程 """ def __init__(self, db: AsyncSession): self.db = db async def get_recommendations( self, user_id: int, limit: int = 10, include_reasons: bool = True, ) -> List[Dict[str, Any]]: """ 获取个性化学习推荐 Args: user_id: 用户ID limit: 推荐数量上限 include_reasons: 是否包含推荐理由 Returns: 推荐课程列表,包含课程信息和推荐理由 """ recommendations = [] # 1. 基于错题推荐 mistake_recs = await self._get_mistake_based_recommendations(user_id) recommendations.extend(mistake_recs) # 2. 基于能力评估推荐 ability_recs = await self._get_ability_based_recommendations(user_id) recommendations.extend(ability_recs) # 3. 基于未完成课程推荐 progress_recs = await self._get_progress_based_recommendations(user_id) recommendations.extend(progress_recs) # 4. 基于热门课程推荐 popular_recs = await self._get_popular_recommendations(user_id) recommendations.extend(popular_recs) # 去重并排序 seen_course_ids = set() unique_recs = [] for rec in recommendations: if rec["course_id"] not in seen_course_ids: seen_course_ids.add(rec["course_id"]) unique_recs.append(rec) # 按优先级排序 priority_map = { "mistake": 1, "ability": 2, "progress": 3, "popular": 4, } unique_recs.sort(key=lambda x: priority_map.get(x.get("source", ""), 5)) # 限制数量 result = unique_recs[:limit] # 移除 source 字段如果不需要理由 if not include_reasons: for rec in result: rec.pop("source", None) rec.pop("reason", None) return result async def _get_mistake_based_recommendations( self, user_id: int, limit: int = 3, ) -> List[Dict[str, Any]]: """基于错题推荐""" recommendations = [] try: # 获取用户最近的错题 result = await self.db.execute( select(ExamMistake).where( ExamMistake.user_id == user_id ).order_by( desc(ExamMistake.created_at) ).limit(50) ) mistakes = result.scalars().all() if not mistakes: return recommendations # 统计错题涉及的知识点 knowledge_point_counts = {} for mistake in mistakes: if hasattr(mistake, 'knowledge_point_id') and mistake.knowledge_point_id: kp_id = mistake.knowledge_point_id knowledge_point_counts[kp_id] = knowledge_point_counts.get(kp_id, 0) + 1 if not knowledge_point_counts: return recommendations # 找出错误最多的知识点对应的课程 top_kp_ids = sorted( knowledge_point_counts.keys(), key=lambda x: knowledge_point_counts[x], reverse=True )[:5] course_result = await self.db.execute( select(Course, KnowledgePoint).join( KnowledgePoint, Course.id == KnowledgePoint.course_id ).where( and_( KnowledgePoint.id.in_(top_kp_ids), Course.status == CourseStatus.PUBLISHED, Course.is_deleted == False, ) ).distinct() ) for course, kp in course_result.all()[:limit]: recommendations.append({ "course_id": course.id, "course_name": course.name, "category": course.category.value if course.category else None, "cover_image": course.cover_image, "description": course.description, "source": "mistake", "reason": f"您在「{kp.name}」知识点上有错题,建议复习相关内容", }) except Exception as e: logger.error(f"基于错题推荐失败: {str(e)}") return recommendations async def _get_ability_based_recommendations( self, user_id: int, limit: int = 3, ) -> List[Dict[str, Any]]: """基于能力评估推荐""" recommendations = [] try: # 获取用户最近的能力评估 result = await self.db.execute( select(AbilityAssessment).where( AbilityAssessment.user_id == user_id ).order_by( desc(AbilityAssessment.created_at) ).limit(1) ) assessment = result.scalar_one_or_none() if not assessment: return recommendations # 解析能力评估结果,找出弱项 scores = {} if hasattr(assessment, 'dimension_scores') and assessment.dimension_scores: scores = assessment.dimension_scores elif hasattr(assessment, 'scores') and assessment.scores: scores = assessment.scores if not scores: return recommendations # 找出分数最低的维度 weak_dimensions = sorted( scores.items(), key=lambda x: x[1] if isinstance(x[1], (int, float)) else 0 )[:3] # 根据弱项维度推荐课程(按分类匹配) category_map = { "专业知识": "technology", "沟通能力": "business", "管理能力": "management", } for dim_name, score in weak_dimensions: if isinstance(score, (int, float)) and score < 70: category = category_map.get(dim_name) if category: course_result = await self.db.execute( select(Course).where( and_( Course.category == category, Course.status == CourseStatus.PUBLISHED, Course.is_deleted == False, ) ).order_by( desc(Course.student_count) ).limit(1) ) course = course_result.scalar_one_or_none() if course: recommendations.append({ "course_id": course.id, "course_name": course.name, "category": course.category.value if course.category else None, "cover_image": course.cover_image, "description": course.description, "source": "ability", "reason": f"您的「{dim_name}」能力评分较低({score}分),推荐学习此课程提升", }) except Exception as e: logger.error(f"基于能力评估推荐失败: {str(e)}") return recommendations[:limit] async def _get_progress_based_recommendations( self, user_id: int, limit: int = 3, ) -> List[Dict[str, Any]]: """基于学习进度推荐""" recommendations = [] try: # 获取未完成的课程 result = await self.db.execute( select(UserCourseProgress, Course).join( Course, UserCourseProgress.course_id == Course.id ).where( and_( UserCourseProgress.user_id == user_id, UserCourseProgress.status == ProgressStatus.IN_PROGRESS.value, Course.is_deleted == False, ) ).order_by( desc(UserCourseProgress.last_accessed_at) ).limit(limit) ) for progress, course in result.all(): recommendations.append({ "course_id": course.id, "course_name": course.name, "category": course.category.value if course.category else None, "cover_image": course.cover_image, "description": course.description, "progress_percent": progress.progress_percent, "source": "progress", "reason": f"继续学习,已完成 {progress.progress_percent:.0f}%", }) except Exception as e: logger.error(f"基于进度推荐失败: {str(e)}") return recommendations async def _get_popular_recommendations( self, user_id: int, limit: int = 3, ) -> List[Dict[str, Any]]: """基于热门课程推荐""" recommendations = [] try: # 获取用户已学习的课程ID learned_result = await self.db.execute( select(UserCourseProgress.course_id).where( UserCourseProgress.user_id == user_id ) ) learned_course_ids = [row[0] for row in learned_result.all()] # 获取热门课程(排除已学习的) query = select(Course).where( and_( Course.status == CourseStatus.PUBLISHED, Course.is_deleted == False, ) ).order_by( desc(Course.student_count) ).limit(limit + len(learned_course_ids)) result = await self.db.execute(query) courses = result.scalars().all() for course in courses: if course.id not in learned_course_ids: recommendations.append({ "course_id": course.id, "course_name": course.name, "category": course.category.value if course.category else None, "cover_image": course.cover_image, "description": course.description, "student_count": course.student_count, "source": "popular", "reason": f"热门课程,已有 {course.student_count} 人学习", }) if len(recommendations) >= limit: break except Exception as e: logger.error(f"基于热门推荐失败: {str(e)}") return recommendations async def get_knowledge_point_recommendations( self, user_id: int, limit: int = 5, ) -> List[Dict[str, Any]]: """ 获取知识点级别的推荐 基于错题和能力评估推荐具体的知识点 """ recommendations = [] try: # 获取错题涉及的知识点 mistake_result = await self.db.execute( select( KnowledgePoint, func.count(ExamMistake.id).label('mistake_count') ).join( ExamMistake, ExamMistake.knowledge_point_id == KnowledgePoint.id ).where( ExamMistake.user_id == user_id ).group_by( KnowledgePoint.id ).order_by( desc('mistake_count') ).limit(limit) ) for kp, count in mistake_result.all(): recommendations.append({ "knowledge_point_id": kp.id, "name": kp.name, "description": kp.description, "type": kp.type, "course_id": kp.course_id, "mistake_count": count, "reason": f"您在此知识点有 {count} 道错题,建议重点复习", }) except Exception as e: logger.error(f"知识点推荐失败: {str(e)}") return recommendations # 便捷函数 def get_recommendation_service(db: AsyncSession) -> RecommendationService: """获取推荐服务实例""" return RecommendationService(db)