#!/usr/bin/env python3 """ 测试用户岗位同步功能 验证用户编辑页面的岗位变更是否真正落库 """ import os import sys import asyncio from sqlalchemy import create_engine, text from sqlalchemy.ext.asyncio import create_async_engine # 添加项目根目录到 Python 路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 导入 local_config 以设置环境变量 import local_config # 获取数据库配置 DATABASE_URL = os.environ.get("DATABASE_URL") # 将同步 URL 转换为异步 URL if DATABASE_URL.startswith("mysql+pymysql://"): ASYNC_DATABASE_URL = DATABASE_URL.replace("mysql+pymysql://", "mysql+aiomysql://") else: ASYNC_DATABASE_URL = DATABASE_URL async def test_user_position_relationships(): """测试用户-岗位关系""" print(f"\n=== 用户岗位关系测试 ===") print(f"数据库URL: {ASYNC_DATABASE_URL}") # 创建异步引擎 engine = create_async_engine(ASYNC_DATABASE_URL, echo=False) try: async with engine.connect() as conn: # 1. 查看所有岗位 print("\n1. 当前系统中的所有岗位:") result = await conn.execute(text(""" SELECT id, name, code, status FROM positions ORDER BY id """)) positions = result.fetchall() if not positions: print(" [警告] 系统中没有岗位数据!") else: for p in positions: status = "启用" if p.status == "active" else "停用" print(f" - ID: {p.id}, 名称: {p.name}, 编码: {p.code}, 状态: {status}") # 2. 查看用户-岗位关系 print("\n2. 用户-岗位关联关系:") result = await conn.execute(text(""" SELECT pm.position_id, pm.user_id, p.name as position_name, u.username, u.full_name, pm.created_at FROM position_members pm JOIN positions p ON pm.position_id = p.id JOIN users u ON pm.user_id = u.id ORDER BY pm.position_id, pm.user_id """)) members = result.fetchall() if not members: print(" [提示] 暂无用户-岗位关联") else: current_position_id = None for m in members: if m.position_id != current_position_id: current_position_id = m.position_id print(f"\n 岗位: {m.position_name} (ID: {m.position_id})") print(f" - 用户: {m.username} ({m.full_name}), ID: {m.user_id}, 加入时间: {m.created_at}") # 3. 检查特定用户的岗位 print("\n3. 查看特定用户的岗位信息:") # 查询几个示例用户 result = await conn.execute(text(""" SELECT id, username, full_name FROM users WHERE role != 'admin' LIMIT 5 """)) sample_users = result.fetchall() for user in sample_users: result = await conn.execute(text(""" SELECT p.id, p.name, p.code FROM positions p JOIN position_members pm ON p.id = pm.position_id WHERE pm.user_id = :user_id """), {"user_id": user.id}) user_positions = result.fetchall() if user_positions: positions_str = ", ".join([f"{p.name}(ID:{p.id})" for p in user_positions]) print(f" - 用户 {user.username} ({user.full_name}): {positions_str}") else: print(f" - 用户 {user.username} ({user.full_name}): 无岗位") # 4. 统计每个岗位的成员数量 print("\n4. 统计每个岗位的成员数量:") result = await conn.execute(text(""" SELECT p.id, p.name, COUNT(pm.user_id) as member_count FROM positions p LEFT JOIN position_members pm ON p.id = pm.position_id GROUP BY p.id, p.name ORDER BY p.id """)) counts = result.fetchall() for c in counts: print(f" - 岗位 {c.name} (ID:{c.id}) 成员数: {c.member_count}") except Exception as e: print(f"\n[错误] 数据库操作失败: {e}") import traceback traceback.print_exc() finally: await engine.dispose() async def verify_position_sync(user_id: int): """验证特定用户的岗位同步情况""" print(f"\n=== 验证用户 ID:{user_id} 的岗位同步 ===") engine = create_async_engine(ASYNC_DATABASE_URL, echo=False) try: async with engine.connect() as conn: # 获取用户信息 result = await conn.execute(text(""" SELECT username, full_name FROM users WHERE id = :user_id """), {"user_id": user_id}) user = result.fetchone() if not user: print(f"[错误] 用户 ID:{user_id} 不存在") return print(f"用户: {user.username} ({user.full_name})") # 获取用户的岗位 result = await conn.execute(text(""" SELECT p.id, p.name, p.code, pm.created_at FROM positions p JOIN position_members pm ON p.id = pm.position_id WHERE pm.user_id = :user_id ORDER BY pm.created_at DESC """), {"user_id": user_id}) positions = result.fetchall() if positions: print(f"当前岗位:") for p in positions: print(f" - {p.name} (ID:{p.id}, 编码:{p.code}) - 加入时间: {p.created_at}") else: print("当前岗位: 无") except Exception as e: print(f"\n[错误] 验证失败: {e}") import traceback traceback.print_exc() finally: await engine.dispose() if __name__ == "__main__": # 运行测试 asyncio.run(test_user_position_relationships()) # 如果命令行提供了用户ID,验证该用户 if len(sys.argv) > 1: try: user_id = int(sys.argv[1]) asyncio.run(verify_position_sync(user_id)) except ValueError: print(f"\n[错误] 无效的用户ID: {sys.argv[1]}")