#!/usr/bin/env python
"""
Check the statistics stored for user ID 4.

Connects to the MySQL database named by the ``DATABASE_URL`` environment
variable (falling back to ``settings.DATABASE_URL``), prints the user's
profile row and, when the user exists, the learning statistics derived from
the ``training_sessions`` and ``exams`` tables.
"""
import asyncio
import os
import sys
import traceback
from pathlib import Path
from urllib.parse import unquote, urlsplit

import aiomysql

# Add the project root to the Python path so that ``app`` can be imported
# when this script is run directly.
project_root = Path(__file__).parent
sys.path.append(str(project_root))

from app.core.config import settings

# Fallbacks used when the connection URL omits the port or database name.
DEFAULT_PORT = 3306
DEFAULT_DATABASE = "kaopeilian"


def _parse_database_url(database_url):
    """Turn a ``[scheme://]user:password@host[:port]/db[?opts]`` URL into
    keyword arguments for ``aiomysql.connect``.

    The scheme (e.g. ``mysql+aiomysql://``) is optional and ignored.
    Credentials are percent-decoded, so passwords containing characters such
    as ``@`` or ``:`` work when they are URL-escaped in the connection string.

    Raises:
        ValueError: if the URL is not a string, has no user name or host, or
            its port is not a valid integer.
    """
    if not isinstance(database_url, str) or not database_url:
        raise ValueError("DATABASE_URL 未配置或不是字符串")

    _scheme, separator, remainder = database_url.partition("://")
    if not separator:
        # No scheme present: the whole string is already "user:pass@host/db".
        remainder = database_url

    # Prefixing "//" makes urlsplit treat the text as a network location, so
    # it splits credentials on the LAST "@" and the FIRST ":" for us.
    parts = urlsplit("//" + remainder)
    if parts.username is None or not parts.hostname:
        raise ValueError("DATABASE_URL 缺少用户名或主机名")

    # Only the first path segment is the database name; the query string is
    # already separated out by urlsplit.
    database = parts.path.lstrip("/").split("/")[0] or DEFAULT_DATABASE

    return {
        "host": parts.hostname,
        "port": parts.port if parts.port is not None else DEFAULT_PORT,
        "user": unquote(parts.username),
        "password": unquote(parts.password or ""),
        "db": database,
    }


async def _print_user_statistics(cursor, user_id):
    """Query and print learning days, total hours, question count and
    average score for *user_id* using the given open cursor."""
    print(f"\n=== 用户ID {user_id} 统计数据 ===")

    # Number of distinct days with at least one training session
    await cursor.execute(
        """
        SELECT COUNT(DISTINCT DATE(start_time))
        FROM training_sessions
        WHERE user_id = %s
        """,
        (user_id,),
    )
    learning_days = (await cursor.fetchone())[0] or 0
    print(f"学习天数: {learning_days}")

    # Total training duration, reported in hours
    await cursor.execute(
        """
        SELECT COALESCE(SUM(duration_seconds), 0)
        FROM training_sessions
        WHERE user_id = %s
        """,
        (user_id,),
    )
    total_seconds = (await cursor.fetchone())[0] or 0
    total_hours = round(float(total_seconds) / 3600.0, 1) if total_seconds else 0.0
    print(f"学习时长: {total_hours} 小时")

    # Number of practice questions and average score over completed exams
    await cursor.execute(
        """
        SELECT
            COALESCE(SUM(question_count), 0),
            AVG(score)
        FROM exams
        WHERE user_id = %s AND status = 'completed'
        """,
        (user_id,),
    )
    result = await cursor.fetchone()
    practice_questions = result[0] or 0
    # AVG() yields NULL when there are no completed exams
    avg_score = round(float(result[1]), 1) if result[1] is not None else 0.0
    print(f"练习题数: {practice_questions}")
    print(f"平均分: {avg_score}")


async def check_user_4(user_id=4):
    """Print the profile and learning statistics of a user (ID 4 by default).

    If the user does not exist, all users are listed instead. Any failure is
    reported on stdout together with a traceback; nothing is raised to the
    caller.

    Args:
        user_id: ID of the user to inspect. Defaults to 4.
    """
    try:
        # Read the connection string from the environment or the app config
        database_url = os.getenv("DATABASE_URL", settings.DATABASE_URL)
        connect_args = _parse_database_url(database_url)

        print(
            f"连接数据库: {connect_args['host']}:{connect_args['port']}"
            f"/{connect_args['db']}"
        )

        # Open the database connection
        conn = await aiomysql.connect(charset='utf8mb4', **connect_args)
        try:
            async with conn.cursor() as cursor:
                # 1. Show the user's profile row
                print("\n=== 用户信息 ===")
                await cursor.execute(
                    """
                    SELECT id, username, email, full_name, role, is_active
                    FROM users
                    WHERE id = %s
                    """,
                    (user_id,),
                )
                user_info = await cursor.fetchone()

                if user_info:
                    print(f"ID: {user_info[0]}, 用户名: {user_info[1]}, 邮箱: {user_info[2]}, 姓名: {user_info[3]}, 角色: {user_info[4]}, 激活: {user_info[5]}")

                    # 2. Show the user's statistics (only when the user exists)
                    await _print_user_statistics(cursor, user_id)
                else:
                    print(f"用户ID {user_id}不存在")

                    # The requested user is missing, so list every user
                    print("\n=== 所有用户 ===")
                    await cursor.execute(
                        """
                        SELECT id, username, email, role
                        FROM users
                        ORDER BY id
                        """
                    )
                    for row in await cursor.fetchall():
                        print(f"ID: {row[0]}, 用户名: {row[1]}, 邮箱: {row[2]}, 角色: {row[3]}")
        finally:
            # Always release the connection, even when a query fails
            conn.close()

    except Exception as e:
        # Top-level boundary of a diagnostic script: report and keep going
        print(f"执行失败: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    print("检查用户ID 4的数据...")
    asyncio.run(check_user_4())