#!/usr/bin/env python
"""Inspect the database rows that feed the user statistics figures.

Connects to the MySQL database named by ``DATABASE_URL`` (falling back to
``settings.DATABASE_URL``), prints recent exam and training-session rows for
the ``admin`` and ``testuser`` accounts, then prints aggregate figures
(learning days, total duration, question counts, average scores) per user.
"""
import asyncio
import os
import sys
import traceback
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote, urlsplit

import aiomysql

# Add the project root to the Python path so that ``app`` can be imported.
project_root = Path(__file__).parent
sys.path.append(str(project_root))

from app.core.config import settings

# Fallbacks used when the connection URL omits the port or database name.
DEFAULT_PORT = 3306
DEFAULT_DATABASE = "kaopeilian"

# (username, user_id) pairs whose aggregate statistics are printed.
# NOTE(review): the IDs are hard-coded while the listing queries resolve the
# same usernames through the ``users`` table; confirm they match the target DB.
USERS_TO_CHECK = (("admin", 1), ("testuser", 3))

# Exam statuses reported separately for question counts and average scores.
EXAM_STATUSES = ("completed", "submitted")


def _parse_database_url(database_url: str) -> dict:
    """Split a database URL into keyword arguments for ``aiomysql.connect``.

    Args:
        database_url: A URL such as ``mysql+aiomysql://user:pass@host:3306/db``.
            A string without a scheme (``user:pass@host/db``) is accepted too.

    Returns:
        A dict with ``host``, ``port``, ``user``, ``password`` and ``db`` keys.
        The port defaults to ``DEFAULT_PORT`` and the database name to
        ``DEFAULT_DATABASE`` when the URL does not supply them.

    Raises:
        ValueError: If the URL has no host or its port is not a valid number.
    """
    # urlsplit only recognises the authority part after "//", so give
    # scheme-less strings a placeholder scheme before parsing.
    url = database_url if "://" in database_url else f"mysql://{database_url}"
    parts = urlsplit(url)
    if not parts.hostname:
        raise ValueError(f"DATABASE_URL 缺少主机名: {database_url!r}")
    return {
        "host": parts.hostname,
        "port": parts.port or DEFAULT_PORT,
        # Credentials in a URL are percent-encoded; decode before connecting.
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        # urlsplit already separates "?query", so the path is just "/dbname".
        "db": parts.path.lstrip("/") or DEFAULT_DATABASE,
    }


async def _print_recent_exams(cursor):
    """Print the ten most recently created exams of the inspected users."""
    print("\n=== 考试记录 ===")
    await cursor.execute(
        """
        SELECT id, user_id, exam_name, status, score, question_count
        FROM exams
        WHERE user_id IN (
            SELECT id FROM users WHERE username IN ('admin', 'testuser')
        )
        ORDER BY created_at DESC
        LIMIT 10
        """
    )
    for exam in await cursor.fetchall():
        print(
            f"考试ID: {exam[0]}, 用户ID: {exam[1]}, 名称: {exam[2]}, "
            f"状态: {exam[3]}, 分数: {exam[4]}, 题数: {exam[5]}"
        )


async def _print_recent_sessions(cursor):
    """Print the ten most recently created training sessions of the inspected users."""
    print("\n=== 陪练会话记录 ===")
    await cursor.execute(
        """
        SELECT id, user_id, scene_id, start_time, duration_seconds, status
        FROM training_sessions
        WHERE user_id IN (
            SELECT id FROM users WHERE username IN ('admin', 'testuser')
        )
        ORDER BY created_at DESC
        LIMIT 10
        """
    )
    for session in await cursor.fetchall():
        print(
            f"会话ID: {session[0]}, 用户ID: {session[1]}, 场景ID: {session[2]}, "
            f"开始时间: {session[3]}, 时长: {session[4]}秒, 状态: {session[5]}"
        )


async def _print_user_stats(cursor, username, user_id):
    """Print the aggregate statistics for a single user."""
    print(f"\n用户: {username} (ID: {user_id})")

    # Learning days: number of distinct calendar dates with a session.
    await cursor.execute(
        """
        SELECT COUNT(DISTINCT DATE(start_time))
        FROM training_sessions
        WHERE user_id = %s
        """,
        (user_id,),
    )
    learning_days = (await cursor.fetchone())[0] or 0
    print(f" 学习天数: {learning_days}")

    # Total duration of all sessions, shown in hours and in raw seconds.
    await cursor.execute(
        """
        SELECT COALESCE(SUM(duration_seconds), 0)
        FROM training_sessions
        WHERE user_id = %s
        """,
        (user_id,),
    )
    total_seconds = (await cursor.fetchone())[0] or 0
    total_hours = round(float(total_seconds) / 3600.0, 1) if total_seconds else 0.0
    print(f" 学习时长: {total_hours} 小时 ({total_seconds} 秒)")

    # Question counts, reported once per exam status.
    for status in EXAM_STATUSES:
        await cursor.execute(
            """
            SELECT COALESCE(SUM(question_count), 0)
            FROM exams
            WHERE user_id = %s AND status = %s
            """,
            (user_id, status),
        )
        questions = (await cursor.fetchone())[0] or 0
        print(f" 练习题数({status}): {questions}")

    # Average score, reported once per exam status.
    for status in EXAM_STATUSES:
        await cursor.execute(
            """
            SELECT AVG(score)
            FROM exams
            WHERE user_id = %s AND status = %s
            """,
            (user_id, status),
        )
        avg_score = await cursor.fetchone()
        # AVG() yields NULL when no row matches; report that as 0.0.
        avg_score_val = (
            round(float(avg_score[0]), 1) if avg_score[0] is not None else 0.0
        )
        print(f" 平均分({status}): {avg_score_val}")


async def check_user_data():
    """Print exam, session and aggregate statistics data for the inspected users.

    Any exception (bad URL, connection failure, failing query) is reported
    on stdout together with its traceback instead of being propagated.
    """
    try:
        # The environment variable takes precedence over the project settings.
        database_url = os.getenv("DATABASE_URL", settings.DATABASE_URL)
        params = _parse_database_url(database_url)

        print(f"连接数据库: {params['host']}:{params['port']}/{params['db']}")

        conn = await aiomysql.connect(charset='utf8mb4', **params)
        try:
            async with conn.cursor() as cursor:
                # 1. Exam records and their status.
                await _print_recent_exams(cursor)

                # 2. Training session records.
                await _print_recent_sessions(cursor)

                # 3. Aggregate figures for each user.
                print("\n=== 用户统计数据 ===")
                for username, user_id in USERS_TO_CHECK:
                    await _print_user_stats(cursor, username, user_id)
        finally:
            # Release the connection even when one of the queries fails.
            conn.close()

    except Exception as e:
        print(f"执行失败: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    print("检查用户统计数据...")
    asyncio.run(check_user_data())