""" API 测试脚本 """ import asyncio import json from typing import Optional import httpx # API基础URL BASE_URL = "http://localhost:8000/api/v1" class APITester: def __init__(self): self.client = httpx.AsyncClient(base_url=BASE_URL) self.token: Optional[str] = None async def login(self, username: str, password: str): """测试登录""" print(f"\n1. 测试登录 - 用户: {username}") response = await self.client.post("/auth/login", json={ "username": username, "password": password }) if response.status_code == 200: data = response.json() self.token = data["data"]["token"]["access_token"] print(f"✓ 登录成功!") print(f" 用户信息: {data['data']['user']['username']} ({data['data']['user']['role']})") print(f" 令牌: {self.token[:20]}...") return True else: print(f"✗ 登录失败: {response.status_code}") print(f" 错误信息: {response.json()}") return False async def get_current_user(self): """测试获取当前用户""" print("\n2. 测试获取当前用户信息") headers = {"Authorization": f"Bearer {self.token}"} response = await self.client.get("/users/me", headers=headers) if response.status_code == 200: data = response.json() print(f"✓ 获取成功!") print(f" 用户: {data['data']['username']}") print(f" 邮箱: {data['data']['email']}") print(f" 角色: {data['data']['role']}") else: print(f"✗ 获取失败: {response.status_code}") print(f" 错误信息: {response.json()}") async def list_users(self): """测试获取用户列表""" print("\n3. 测试获取用户列表") headers = {"Authorization": f"Bearer {self.token}"} response = await self.client.get("/users?page=1&page_size=10", headers=headers) if response.status_code == 200: data = response.json() print(f"✓ 获取成功!") print(f" 总数: {data['data']['total']}") print(f" 用户列表:") for user in data['data']['items']: print(f" - {user['username']} ({user['role']}) - {user['email']}") else: print(f"✗ 获取失败: {response.status_code}") print(f" 错误信息: {response.json()}") async def create_user(self): """测试创建用户(需要管理员权限)""" print("\n4. 测试创建用户(需要管理员权限)") headers = {"Authorization": f"Bearer {self.token}"} new_user = { "username": "testuser", "email": "testuser@example.com", "password": "test123456", "full_name": "测试用户", "role": "trainee" } response = await self.client.post("/users", json=new_user, headers=headers) if response.status_code == 201: data = response.json() print(f"✓ 创建成功!") print(f" 新用户: {data['data']['username']} - {data['data']['email']}") else: print(f"✗ 创建失败: {response.status_code}") print(f" 错误信息: {response.json()}") async def update_password(self): """测试修改密码""" print("\n5. 测试修改密码") headers = {"Authorization": f"Bearer {self.token}"} # 先尝试错误的旧密码 response = await self.client.put("/users/me/password", json={ "old_password": "wrongpassword", "new_password": "newpass123" }, headers=headers) if response.status_code != 200: print(f"✓ 正确拒绝了错误的旧密码") # 使用正确的旧密码 response = await self.client.put("/users/me/password", json={ "old_password": "admin123", # 假设是admin用户 "new_password": "admin123" # 改回原密码 }, headers=headers) if response.status_code == 200: print(f"✓ 密码修改成功!") else: print(f" 注意: 密码修改测试可能因为旧密码不匹配而失败") async def test_unauthorized(self): """测试未授权访问""" print("\n6. 测试未授权访问") # 不带token访问需要认证的接口 response = await self.client.get("/users/me") if response.status_code == 403: print(f"✓ 正确拒绝了未授权访问") else: print(f"✗ 未授权访问测试失败: {response.status_code}") async def close(self): """关闭客户端""" await self.client.aclose() async def main(): """主测试函数""" print("=================================") print("考培练系统 API 测试") print("=================================") tester = APITester() try: # 测试未授权访问 await tester.test_unauthorized() # 测试管理员登录 print("\n--- 管理员测试 ---") if await tester.login("admin", "admin123"): await tester.get_current_user() await tester.list_users() await tester.create_user() await tester.update_password() # 测试普通用户登录 print("\n\n--- 普通用户测试 ---") if await tester.login("trainee1", "trainee123"): await tester.get_current_user() await tester.list_users() await tester.create_user() # 应该失败(权限不足) print("\n\n测试完成!") except Exception as e: print(f"\n错误: {str(e)}") print("提示: 请确保服务器正在运行 (http://localhost:8000)") finally: await tester.close() if __name__ == "__main__": asyncio.run(main())