#!/usr/bin/env python3 """ 创建团队管理页面测试数据 """ import asyncio import sys from datetime import datetime, timedelta from pathlib import Path import random # 添加项目根目录到 Python 路径 sys.path.insert(0, str(Path(__file__).parent)) from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import engine, AsyncSessionLocal from app.models.user import User, Team, UserTeam from app.models.position import Position from app.models.position_member import PositionMember from app.models.position_course import PositionCourse from app.models.course import Course from app.models.exam import Exam from app.models.practice import PracticeSession, PracticeReport from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") async def create_team_members(): """创建团队成员数据""" async with AsyncSessionLocal() as db: try: # 1. 检查或创建团队 result = await db.execute(select(Team).where(Team.name == "轻医美销售团队")) team = result.scalar_one_or_none() if not team: team = Team( name="轻医美销售团队", code="SALES_TEAM_001", description="负责轻医美产品销售的核心团队", leader_id=1, # admin created_at=datetime.now(), updated_at=datetime.now() ) db.add(team) await db.flush() print(f"✅ 创建团队: {team.name}") else: print(f"ℹ️ 团队已存在: {team.name}") # 2. 检查或创建岗位 positions_data = [ {"name": "销售专员", "code": "POS_SALES_001", "description": "负责客户接待和产品销售"}, {"name": "销售主管", "code": "POS_MANAGER_001", "description": "负责团队管理和业绩指导"}, {"name": "高级顾问", "code": "POS_CONSULTANT_001", "description": "负责高端客户维护和方案设计"}, ] positions = {} for pos_data in positions_data: result = await db.execute( select(Position).where(Position.name == pos_data["name"]) ) position = result.scalar_one_or_none() if not position: position = Position( name=pos_data["name"], code=pos_data["code"], description=pos_data["description"], created_at=datetime.now(), updated_at=datetime.now() ) db.add(position) await db.flush() print(f"✅ 创建岗位: {position.name}") else: print(f"ℹ️ 岗位已存在: {position.name}") positions[pos_data["name"]] = position # 3. 检查现有课程 result = await db.execute(select(Course).limit(5)) courses = result.scalars().all() if not courses: print("⚠️ 警告:没有课程数据,跳过岗位课程分配") course_ids = [] else: course_ids = [c.id for c in courses] print(f"ℹ️ 找到 {len(courses)} 门课程") # 4. 为岗位分配课程 if course_ids: for position in positions.values(): # 每个岗位分配2-4门课程 assigned_courses = random.sample(course_ids, min(random.randint(2, 4), len(course_ids))) for course_id in assigned_courses: result = await db.execute( select(PositionCourse).where( PositionCourse.position_id == position.id, PositionCourse.course_id == course_id ) ) if not result.scalar_one_or_none(): pc = PositionCourse( position_id=position.id, course_id=course_id, course_type="required" ) db.add(pc) print(f"✅ 为岗位分配了课程") # 5. 创建团队成员 members_data = [ {"username": "zhangsan", "real_name": "张三", "email": "zhangsan@example.com", "position": "销售专员", "days_ago": 15}, {"username": "lisi", "real_name": "李四", "email": "lisi@example.com", "position": "销售专员", "days_ago": 25}, {"username": "wangwu", "real_name": "王五", "email": "wangwu@example.com", "position": "销售主管", "days_ago": 5}, {"username": "zhaoliu", "real_name": "赵六", "email": "zhaoliu@example.com", "position": "高级顾问", "days_ago": 8}, {"username": "sunqi", "real_name": "孙七", "email": "sunqi@example.com", "position": "销售专员", "days_ago": 60}, {"username": "zhouba", "real_name": "周八", "email": "zhouba@example.com", "position": "销售主管", "days_ago": 3}, {"username": "wujiu", "real_name": "吴九", "email": "wujiu@example.com", "position": "高级顾问", "days_ago": 12}, {"username": "zhengshi", "real_name": "郑十", "email": "zhengshi@example.com", "position": "销售专员", "days_ago": 45}, ] created_users = [] for member_data in members_data: # 检查用户是否存在 result = await db.execute( select(User).where(User.username == member_data["username"]) ) user = result.scalar_one_or_none() if not user: user = User( username=member_data["username"], hashed_password=pwd_context.hash("Pass123!"), email=member_data["email"], full_name=member_data["real_name"], role="trainee", phone=f"138{random.randint(10000000, 99999999)}", is_active=True, last_login_at=datetime.now() - timedelta(days=member_data["days_ago"]), created_at=datetime.now() - timedelta(days=member_data["days_ago"] + 30), updated_at=datetime.now() ) db.add(user) await db.flush() print(f"✅ 创建用户: {user.full_name} ({user.username})") else: print(f"ℹ️ 用户已存在: {user.full_name} ({user.username})") created_users.append((user, member_data)) # 添加到团队 result = await db.execute( select(UserTeam).where( UserTeam.user_id == user.id, UserTeam.team_id == team.id ) ) if not result.scalar_one_or_none(): user_team = UserTeam( user_id=user.id, team_id=team.id, joined_at=datetime.now() - timedelta(days=member_data["days_ago"]) ) db.add(user_team) # 分配岗位 position = positions[member_data["position"]] result = await db.execute( select(PositionMember).where( PositionMember.user_id == user.id, PositionMember.position_id == position.id ) ) if not result.scalar_one_or_none(): position_member = PositionMember( position_id=position.id, user_id=user.id, joined_at=datetime.now() - timedelta(days=member_data["days_ago"]) ) db.add(position_member) print(f"✅ 创建了 {len(created_users)} 个团队成员") # 6. 为成员创建考试记录 if course_ids: for user, member_data in created_users: # 每个用户完成1-3门课程的考试 exam_count = random.randint(1, min(3, len(course_ids))) exam_courses = random.sample(course_ids, exam_count) for course_id in exam_courses: # 创建1-2次考试记录 for _ in range(random.randint(1, 2)): days_ago = random.randint(1, member_data["days_ago"]) score = random.randint(60, 98) exam = Exam( user_id=user.id, course_id=course_id, status="completed", round1_score=score, round2_score=score + random.randint(0, 5) if score < 90 else None, round3_score=None, duration_minutes=random.randint(10, 30), start_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8)), end_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8)) ) db.add(exam) print(f"✅ 为成员创建了考试记录") # 7. 为成员创建陪练记录 for user, member_data in created_users: # 活跃用户(最近30天登录)创建陪练记录 if member_data["days_ago"] <= 30: practice_count = random.randint(2, 5) for _ in range(practice_count): days_ago = random.randint(1, min(member_data["days_ago"], 25)) duration = random.randint(300, 1800) # 5-30分钟 session = PracticeSession( session_id=f"PS{user.id}{random.randint(1000, 9999)}", user_id=user.id, scene_id=random.randint(1, 5), # 假设有5个场景 status="completed", duration_seconds=duration, turns=random.randint(8, 20), start_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8)), end_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8)) ) db.add(session) await db.flush() # 创建陪练报告 total_score = random.randint(75, 95) report = PracticeReport( session_id=session.session_id, total_score=total_score, score_breakdown={ "communication": random.randint(70, 95), "product_knowledge": random.randint(70, 95), "sales_skill": random.randint(70, 95), "service_attitude": random.randint(75, 98), "problem_handling": random.randint(70, 92) }, ability_dimensions={ "沟通表达": random.randint(75, 95), "产品知识": random.randint(70, 92), "销售技巧": random.randint(72, 90), "服务态度": random.randint(80, 98), "应变能力": random.randint(70, 88), "专业素养": random.randint(75, 92) }, suggestions=["态度积极", "表达清晰", "建议加强产品知识学习"] ) db.add(report) print(f"✅ 为活跃成员创建了陪练记录") await db.commit() print("\n🎉 所有测试数据创建完成!") print(f"\n团队:{team.name}") print(f"成员数:{len(created_users)}") print(f"岗位数:{len(positions)}") print("\n测试账号信息:") print("=" * 50) for user, _ in created_users[:3]: # 显示前3个 print(f"用户名: {user.username}") print(f"密码: Pass123!") print(f"姓名: {user.full_name}") print("-" * 50) except Exception as e: await db.rollback() print(f"\n❌ 错误: {e}") import traceback traceback.print_exc() raise if __name__ == "__main__": print("开始创建团队管理测试数据...\n") asyncio.run(create_team_members())