Files
012-kaopeilian/backend/create_team_data.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

288 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
创建团队管理页面测试数据
"""
import asyncio
import sys
from datetime import datetime, timedelta
from pathlib import Path
import random
# 添加项目根目录到 Python 路径
sys.path.insert(0, str(Path(__file__).parent))
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import engine, AsyncSessionLocal
from app.models.user import User, Team, UserTeam
from app.models.position import Position
from app.models.position_member import PositionMember
from app.models.position_course import PositionCourse
from app.models.course import Course
from app.models.exam import Exam
from app.models.practice import PracticeSession, PracticeReport
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
async def create_team_members():
"""创建团队成员数据"""
async with AsyncSessionLocal() as db:
try:
# 1. 检查或创建团队
result = await db.execute(select(Team).where(Team.name == "轻医美销售团队"))
team = result.scalar_one_or_none()
if not team:
team = Team(
name="轻医美销售团队",
code="SALES_TEAM_001",
description="负责轻医美产品销售的核心团队",
leader_id=1, # admin
created_at=datetime.now(),
updated_at=datetime.now()
)
db.add(team)
await db.flush()
print(f"✅ 创建团队: {team.name}")
else:
print(f" 团队已存在: {team.name}")
# 2. 检查或创建岗位
positions_data = [
{"name": "销售专员", "code": "POS_SALES_001", "description": "负责客户接待和产品销售"},
{"name": "销售主管", "code": "POS_MANAGER_001", "description": "负责团队管理和业绩指导"},
{"name": "高级顾问", "code": "POS_CONSULTANT_001", "description": "负责高端客户维护和方案设计"},
]
positions = {}
for pos_data in positions_data:
result = await db.execute(
select(Position).where(Position.name == pos_data["name"])
)
position = result.scalar_one_or_none()
if not position:
position = Position(
name=pos_data["name"],
code=pos_data["code"],
description=pos_data["description"],
created_at=datetime.now(),
updated_at=datetime.now()
)
db.add(position)
await db.flush()
print(f"✅ 创建岗位: {position.name}")
else:
print(f" 岗位已存在: {position.name}")
positions[pos_data["name"]] = position
# 3. 检查现有课程
result = await db.execute(select(Course).limit(5))
courses = result.scalars().all()
if not courses:
print("⚠️ 警告:没有课程数据,跳过岗位课程分配")
course_ids = []
else:
course_ids = [c.id for c in courses]
print(f" 找到 {len(courses)} 门课程")
# 4. 为岗位分配课程
if course_ids:
for position in positions.values():
# 每个岗位分配2-4门课程
assigned_courses = random.sample(course_ids, min(random.randint(2, 4), len(course_ids)))
for course_id in assigned_courses:
result = await db.execute(
select(PositionCourse).where(
PositionCourse.position_id == position.id,
PositionCourse.course_id == course_id
)
)
if not result.scalar_one_or_none():
pc = PositionCourse(
position_id=position.id,
course_id=course_id,
course_type="required"
)
db.add(pc)
print(f"✅ 为岗位分配了课程")
# 5. 创建团队成员
members_data = [
{"username": "zhangsan", "real_name": "张三", "email": "zhangsan@example.com", "position": "销售专员", "days_ago": 15},
{"username": "lisi", "real_name": "李四", "email": "lisi@example.com", "position": "销售专员", "days_ago": 25},
{"username": "wangwu", "real_name": "王五", "email": "wangwu@example.com", "position": "销售主管", "days_ago": 5},
{"username": "zhaoliu", "real_name": "赵六", "email": "zhaoliu@example.com", "position": "高级顾问", "days_ago": 8},
{"username": "sunqi", "real_name": "孙七", "email": "sunqi@example.com", "position": "销售专员", "days_ago": 60},
{"username": "zhouba", "real_name": "周八", "email": "zhouba@example.com", "position": "销售主管", "days_ago": 3},
{"username": "wujiu", "real_name": "吴九", "email": "wujiu@example.com", "position": "高级顾问", "days_ago": 12},
{"username": "zhengshi", "real_name": "郑十", "email": "zhengshi@example.com", "position": "销售专员", "days_ago": 45},
]
created_users = []
for member_data in members_data:
# 检查用户是否存在
result = await db.execute(
select(User).where(User.username == member_data["username"])
)
user = result.scalar_one_or_none()
if not user:
user = User(
username=member_data["username"],
hashed_password=pwd_context.hash("Pass123!"),
email=member_data["email"],
full_name=member_data["real_name"],
role="trainee",
phone=f"138{random.randint(10000000, 99999999)}",
is_active=True,
last_login_at=datetime.now() - timedelta(days=member_data["days_ago"]),
created_at=datetime.now() - timedelta(days=member_data["days_ago"] + 30),
updated_at=datetime.now()
)
db.add(user)
await db.flush()
print(f"✅ 创建用户: {user.full_name} ({user.username})")
else:
print(f" 用户已存在: {user.full_name} ({user.username})")
created_users.append((user, member_data))
# 添加到团队
result = await db.execute(
select(UserTeam).where(
UserTeam.user_id == user.id,
UserTeam.team_id == team.id
)
)
if not result.scalar_one_or_none():
user_team = UserTeam(
user_id=user.id,
team_id=team.id,
joined_at=datetime.now() - timedelta(days=member_data["days_ago"])
)
db.add(user_team)
# 分配岗位
position = positions[member_data["position"]]
result = await db.execute(
select(PositionMember).where(
PositionMember.user_id == user.id,
PositionMember.position_id == position.id
)
)
if not result.scalar_one_or_none():
position_member = PositionMember(
position_id=position.id,
user_id=user.id,
joined_at=datetime.now() - timedelta(days=member_data["days_ago"])
)
db.add(position_member)
print(f"✅ 创建了 {len(created_users)} 个团队成员")
# 6. 为成员创建考试记录
if course_ids:
for user, member_data in created_users:
# 每个用户完成1-3门课程的考试
exam_count = random.randint(1, min(3, len(course_ids)))
exam_courses = random.sample(course_ids, exam_count)
for course_id in exam_courses:
# 创建1-2次考试记录
for _ in range(random.randint(1, 2)):
days_ago = random.randint(1, member_data["days_ago"])
score = random.randint(60, 98)
exam = Exam(
user_id=user.id,
course_id=course_id,
status="completed",
round1_score=score,
round2_score=score + random.randint(0, 5) if score < 90 else None,
round3_score=None,
duration_minutes=random.randint(10, 30),
start_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8)),
end_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8))
)
db.add(exam)
print(f"✅ 为成员创建了考试记录")
# 7. 为成员创建陪练记录
for user, member_data in created_users:
# 活跃用户最近30天登录创建陪练记录
if member_data["days_ago"] <= 30:
practice_count = random.randint(2, 5)
for _ in range(practice_count):
days_ago = random.randint(1, min(member_data["days_ago"], 25))
duration = random.randint(300, 1800) # 5-30分钟
session = PracticeSession(
session_id=f"PS{user.id}{random.randint(1000, 9999)}",
user_id=user.id,
scene_id=random.randint(1, 5), # 假设有5个场景
status="completed",
duration_seconds=duration,
turns=random.randint(8, 20),
start_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8)),
end_time=datetime.now() - timedelta(days=days_ago, hours=random.randint(0, 8))
)
db.add(session)
await db.flush()
# 创建陪练报告
total_score = random.randint(75, 95)
report = PracticeReport(
session_id=session.session_id,
total_score=total_score,
score_breakdown={
"communication": random.randint(70, 95),
"product_knowledge": random.randint(70, 95),
"sales_skill": random.randint(70, 95),
"service_attitude": random.randint(75, 98),
"problem_handling": random.randint(70, 92)
},
ability_dimensions={
"沟通表达": random.randint(75, 95),
"产品知识": random.randint(70, 92),
"销售技巧": random.randint(72, 90),
"服务态度": random.randint(80, 98),
"应变能力": random.randint(70, 88),
"专业素养": random.randint(75, 92)
},
suggestions=["态度积极", "表达清晰", "建议加强产品知识学习"]
)
db.add(report)
print(f"✅ 为活跃成员创建了陪练记录")
await db.commit()
print("\n🎉 所有测试数据创建完成!")
print(f"\n团队:{team.name}")
print(f"成员数:{len(created_users)}")
print(f"岗位数:{len(positions)}")
print("\n测试账号信息:")
print("=" * 50)
for user, _ in created_users[:3]: # 显示前3个
print(f"用户名: {user.username}")
print(f"密码: Pass123!")
print(f"姓名: {user.full_name}")
print("-" * 50)
except Exception as e:
await db.rollback()
print(f"\n❌ 错误: {e}")
import traceback
traceback.print_exc()
raise
if __name__ == "__main__":
print("开始创建团队管理测试数据...\n")
asyncio.run(create_team_members())