Files
012-kaopeilian/backend/scripts/seed_statistics_demo_data.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

409 lines
14 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
"""
为统计分析页面生成轻医美场景的模拟数据
生成的数据包括:
1. 考试记录exams- 不同时间段、不同课程、不同分数
2. 错题记录exam_mistakes- 不同难度、不同知识点
3. 陪练会话practice_sessions- 不同时间的陪练记录
4. 知识点数据knowledge_points- 轻医美相关知识点
目标:确保统计分析页面的每个模块都能显示数据
"""
import sys
import asyncio
from datetime import datetime, timedelta
import random
from pathlib import Path
# 添加项目路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func
from app.models.exam import Exam, Question
from app.models.exam_mistake import ExamMistake
from app.models.course import Course, KnowledgePoint
from app.models.practice import PracticeSession
from app.models.user import User
from app.core.logger import get_logger
logger = get_logger(__name__)
# 数据库连接(开发测试环境)
DATABASE_URL = "mysql+aiomysql://root:nj861021@localhost:3306/kaopeilian"
# 轻医美知识点数据
BEAUTY_KNOWLEDGE_POINTS = {
"皮肤生理学基础": [
"皮肤结构与层次",
"皮肤类型与特征",
"皮肤屏障功能",
"皮肤老化机制",
"皮肤色素形成",
"皮肤水分平衡"
],
"医美产品知识与应用": [
"透明质酸的应用",
"肉毒素的作用机理",
"光子嫩肤原理",
"果酸焕肤技术",
"维生素C美白",
"胶原蛋白补充"
],
"美容仪器操作与维护": [
"超声刀操作流程",
"热玛吉治疗参数",
"皮秒激光使用",
"射频美容仪器",
"冷光美肤仪",
"水光注射仪"
],
"轻医美销售技巧": [
"客户需求分析",
"项目推荐话术",
"价格异议处理",
"成交技巧",
"客户关系维护",
"套餐设计方法"
],
"客户服务与投诉处理": [
"服务标准流程",
"投诉应对技巧",
"客户期望管理",
"售后跟踪服务",
"客户满意度提升",
"危机公关处理"
]
}
# 难度级别
DIFFICULTY_LEVELS = ["easy", "medium", "hard"]
async def clear_old_demo_data(db: AsyncSession, user_id: int):
"""清理旧的演示数据"""
logger.info(f"清理用户 {user_id} 的旧演示数据...")
# 清理考试记录(会级联删除错题记录)
from sqlalchemy import delete
await db.execute(delete(Exam).where(Exam.user_id == user_id))
# 清理陪练会话
await db.execute(delete(PracticeSession).where(PracticeSession.user_id == user_id))
await db.commit()
logger.info("旧数据清理完成")
async def get_or_create_knowledge_points(db: AsyncSession, course_id: int, course_name: str) -> list:
"""获取或创建知识点"""
# 检查是否已有知识点
result = await db.execute(
select(KnowledgePoint).where(
KnowledgePoint.course_id == course_id,
KnowledgePoint.is_deleted == False
)
)
existing_kps = result.scalars().all()
if existing_kps:
logger.info(f"课程 {course_name} 已有 {len(existing_kps)} 个知识点")
return existing_kps
# 创建新知识点
knowledge_points = []
if course_name in BEAUTY_KNOWLEDGE_POINTS:
for kp_name in BEAUTY_KNOWLEDGE_POINTS[course_name]:
kp = KnowledgePoint(
course_id=course_id,
name=kp_name,
description=f"{course_name}中的重要知识点:{kp_name}",
type="核心概念",
source=0, # 手动创建
is_deleted=False
)
db.add(kp)
knowledge_points.append(kp)
await db.commit()
# 刷新以获取ID
for kp in knowledge_points:
await db.refresh(kp)
logger.info(f"为课程 {course_name} 创建了 {len(knowledge_points)} 个知识点")
return knowledge_points
async def create_exam_records(db: AsyncSession, user_id: int, courses: list):
"""创建考试记录"""
logger.info("创建考试记录...")
end_date = datetime.now()
exams_created = 0
# 在过去60天内创建考试记录
for days_ago in range(60, 0, -1):
exam_date = end_date - timedelta(days=days_ago)
# 跳过一些日期(不是每天都考试)
if random.random() > 0.3: # 30%的日子有考试
continue
# 每天可能考1-2次
num_exams = random.choice([1, 1, 1, 2])
for _ in range(num_exams):
# 随机选择课程
course = random.choice(courses)
# 生成考试分数(呈现进步趋势)
# 早期分数较低,后期分数较高
progress_factor = (60 - days_ago) / 60 # 0 到 1
base_score = 60 + (progress_factor * 20) # 60-80分基础
score_variance = random.uniform(-10, 15)
round1_score = max(50, min(100, base_score + score_variance))
# 创建考试记录
exam = Exam(
user_id=user_id,
course_id=course.id,
exam_name=f"{course.name}测试",
question_count=10,
total_score=100.0,
pass_score=60.0,
start_time=exam_date,
end_time=exam_date + timedelta(minutes=random.randint(15, 45)),
duration_minutes=random.randint(15, 45),
round1_score=round(round1_score, 1),
round2_score=None,
round3_score=None,
score=round(round1_score, 1),
is_passed=round1_score >= 60,
status="submitted"
)
db.add(exam)
exams_created += 1
await db.commit()
logger.info(f"创建了 {exams_created} 条考试记录")
return exams_created
async def create_exam_mistakes(db: AsyncSession, user_id: int, courses: list):
"""创建错题记录"""
logger.info("创建错题记录...")
# 获取用户的所有考试
result = await db.execute(
select(Exam).where(Exam.user_id == user_id).order_by(Exam.start_time)
)
exams = result.scalars().all()
mistakes_created = 0
for exam in exams:
# 找到对应的课程
course = next((c for c in courses if c.id == exam.course_id), None)
if not course:
continue
# 获取该课程的知识点
result = await db.execute(
select(KnowledgePoint).where(
KnowledgePoint.course_id == course.id,
KnowledgePoint.is_deleted == False
)
)
knowledge_points = result.scalars().all()
if not knowledge_points:
continue
# 根据分数决定错题数(分数越低,错题越多)
score = exam.round1_score or 70
mistake_rate = (100 - score) / 100 # 0.0 到 0.5
num_mistakes = int(exam.question_count * mistake_rate)
num_mistakes = max(1, min(num_mistakes, exam.question_count - 1))
# 创建错题
for i in range(num_mistakes):
# 随机选择知识点
kp = random.choice(knowledge_points)
# 随机选择题型
question_types = ["single_choice", "multiple_choice", "true_false", "fill_blank", "essay"]
question_type = random.choice(question_types)
mistake = ExamMistake(
user_id=user_id,
exam_id=exam.id,
question_id=None, # AI生成的题目
knowledge_point_id=kp.id,
question_content=f"关于{kp.name}的问题{i+1}",
correct_answer="正确答案",
user_answer="用户错误答案",
question_type=question_type
)
db.add(mistake)
mistakes_created += 1
await db.commit()
logger.info(f"创建了 {mistakes_created} 条错题记录")
return mistakes_created
async def create_practice_sessions(db: AsyncSession, user_id: int):
"""创建陪练会话记录"""
logger.info("创建陪练会话记录...")
end_date = datetime.now()
sessions_created = 0
# 在过去60天内创建陪练记录
for days_ago in range(60, 0, -1):
session_date = end_date - timedelta(days=days_ago)
# 跳过一些日期
if random.random() > 0.25: # 25%的日子有陪练
continue
# 每次陪练的时长(秒)
duration_seconds = random.randint(600, 1800) # 10-30分钟
# 场景类型
scene_types = ["电话销售", "面对面咨询", "客户投诉处理", "售后服务", "产品介绍"]
scene_name = random.choice(scene_types)
session = PracticeSession(
session_id=f"session_{user_id}_{int(session_date.timestamp())}",
user_id=user_id,
scene_id=random.randint(1, 5),
scene_name=scene_name,
scene_type=scene_name,
start_time=session_date,
end_time=session_date + timedelta(seconds=duration_seconds),
duration_seconds=duration_seconds,
turns=random.randint(10, 30),
status="completed",
is_deleted=False
)
db.add(session)
sessions_created += 1
await db.commit()
logger.info(f"创建了 {sessions_created} 条陪练会话记录")
return sessions_created
async def main():
"""主函数"""
logger.info("=" * 60)
logger.info("开始为统计分析页面生成轻医美场景的模拟数据")
logger.info("=" * 60)
# 创建数据库连接
engine = create_async_engine(DATABASE_URL, echo=False)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with async_session() as db:
# 1. 获取测试用户admin或第一个用户
result = await db.execute(
select(User).where(User.username == "admin")
)
user = result.scalar_one_or_none()
if not user:
# 如果没有admin使用第一个用户
result = await db.execute(select(User).limit(1))
user = result.scalar_one_or_none()
if not user:
logger.error("❌ 未找到用户,请先创建用户")
return
logger.info(f"📝 使用用户: {user.username} (ID: {user.id})")
# 2. 获取轻医美相关课程
result = await db.execute(
select(Course).where(
Course.is_deleted == False,
Course.status == "published"
)
)
courses = result.scalars().all()
if not courses:
logger.error("❌ 未找到已发布的课程")
return
logger.info(f"📚 找到 {len(courses)} 门课程")
# 3. 清理旧数据(可选)
clear_old = input("\n是否清理该用户的旧数据?(y/n): ").lower()
if clear_old == 'y':
await clear_old_demo_data(db, user.id)
# 4. 为每门课程创建知识点
logger.info("\n" + "=" * 60)
logger.info("步骤 1/3: 创建知识点")
logger.info("=" * 60)
for course in courses:
await get_or_create_knowledge_points(db, course.id, course.name)
# 5. 创建考试记录
logger.info("\n" + "=" * 60)
logger.info("步骤 2/3: 创建考试记录")
logger.info("=" * 60)
exams_count = await create_exam_records(db, user.id, courses)
# 6. 创建错题记录
logger.info("\n" + "=" * 60)
logger.info("步骤 3/3: 创建错题记录")
logger.info("=" * 60)
mistakes_count = await create_exam_mistakes(db, user.id, courses)
# 7. 创建陪练会话记录
logger.info("\n" + "=" * 60)
logger.info("步骤 4/4: 创建陪练会话记录")
logger.info("=" * 60)
sessions_count = await create_practice_sessions(db, user.id)
# 8. 统计信息
logger.info("\n" + "=" * 60)
logger.info("✅ 数据生成完成!")
logger.info("=" * 60)
logger.info(f"用户: {user.username} (ID: {user.id})")
logger.info(f"考试记录: {exams_count}")
logger.info(f"错题记录: {mistakes_count}")
logger.info(f"陪练记录: {sessions_count}")
logger.info("=" * 60)
logger.info("\n现在可以访问统计分析页面查看数据:")
logger.info("http://localhost:5173/analysis/statistics")
logger.info("=" * 60)
except Exception as e:
logger.error(f"❌ 生成数据失败: {e}")
import traceback
traceback.print_exc()
finally:
await engine.dispose()
if __name__ == "__main__":
asyncio.run(main())