"""
Sync the core table structure (teams, user_teams), adapting to the users.id type.

Steps:
1) Inspect the data type of users.id (INT or BIGINT, signed or unsigned)
2) Create the teams and user_teams tables if they are missing; foreign-key
   columns are created with a type compatible with users.id
3) Skip any table that already exists

Run:
cd kaopeilian-backend && python3 scripts/sync_core_tables.py
"""
import asyncio
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).resolve().parent.parent))

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

from app.core.config import get_settings


def _database_name_from_url(db_url: str) -> str:
    """Return the database (schema) name taken from the path of *db_url*.

    The query string is removed before the last path segment is taken, so a
    query value that itself contains "/" (for example a unix socket path)
    does not corrupt the result.
    """
    without_query = db_url.split("?", 1)[0]
    return without_query.rsplit("/", 1)[-1]


async def table_exists(conn, db_name: str, table: str) -> bool:
    """Return True if *table* is listed in INFORMATION_SCHEMA.TABLES for *db_name*."""
    result = await conn.execute(
        text(
            """
            SELECT COUNT(1)
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_SCHEMA = :db AND TABLE_NAME = :table
            """
        ),
        {"db": db_name, "table": table},
    )
    return (result.scalar() or 0) > 0


async def get_users_id_type(conn, db_name: str) -> str:
    """Return the SQL integer type to use for columns referencing users.id.

    The result is one of "INT", "BIGINT", "INT UNSIGNED" or "BIGINT UNSIGNED".
    When the column cannot be found, "INT" is returned.
    """
    result = await conn.execute(
        text(
            """
            SELECT COLUMN_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = :db AND TABLE_NAME = 'users' AND COLUMN_NAME = 'id'
            """
        ),
        {"db": db_name},
    )
    # Normalize: COLUMN_TYPE may look like "int(11)" or "bigint(20) unsigned".
    col_type = (result.scalar() or "int").lower()
    base_type = "BIGINT" if "bigint" in col_type else "INT"
    # MySQL requires foreign-key columns to match the referenced column in
    # both size and sign, so the UNSIGNED attribute must be carried over.
    if "unsigned" in col_type:
        return f"{base_type} UNSIGNED"
    return base_type


async def sync_core_tables():
    """Create the teams and user_teams tables when they do not exist yet.

    Returns:
        A list with the names of the tables that were created by this call
        (empty when both tables already existed).
    """
    settings = get_settings()
    db_url = settings.DATABASE_URL
    db_name = _database_name_from_url(db_url)

    engine = create_async_engine(db_url, echo=False)
    created = []

    try:
        async with engine.begin() as conn:
            # Detect the users.id type. The value comes from a fixed set of
            # literals built in get_users_id_type, so interpolating it into
            # the DDL below is safe.
            user_id_type = await get_users_id_type(conn, db_name)

            # Create teams
            if not await table_exists(conn, db_name, "teams"):
                await conn.execute(
                    text(
                        f"""
                        CREATE TABLE `teams` (
                            `id` INT AUTO_INCREMENT PRIMARY KEY,
                            `name` VARCHAR(100) NOT NULL UNIQUE,
                            `code` VARCHAR(50) NOT NULL UNIQUE,
                            `description` TEXT,
                            `team_type` VARCHAR(50) DEFAULT 'department',
                            `is_active` BOOLEAN DEFAULT TRUE,
                            `leader_id` {user_id_type},
                            `parent_id` INT,
                            `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
                            `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                            `is_deleted` BOOLEAN DEFAULT FALSE,
                            `deleted_at` DATETIME,
                            `created_by` {user_id_type},
                            `updated_by` {user_id_type},
                            FOREIGN KEY (`leader_id`) REFERENCES users(id) ON DELETE SET NULL,
                            FOREIGN KEY (`parent_id`) REFERENCES teams(id) ON DELETE CASCADE,
                            INDEX idx_team_type (team_type),
                            INDEX idx_is_active (is_active)
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                        """
                    )
                )
                created.append("teams")

            # Create user_teams
            if not await table_exists(conn, db_name, "user_teams"):
                await conn.execute(
                    text(
                        f"""
                        CREATE TABLE `user_teams` (
                            `user_id` {user_id_type} NOT NULL,
                            `team_id` INT NOT NULL,
                            `role` VARCHAR(50) DEFAULT 'member',
                            `joined_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
                            PRIMARY KEY (`user_id`, `team_id`),
                            FOREIGN KEY (`user_id`) REFERENCES users(id) ON DELETE CASCADE,
                            FOREIGN KEY (`team_id`) REFERENCES teams(id) ON DELETE CASCADE
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                        """
                    )
                )
                created.append("user_teams")
    finally:
        # Release the connection pool even when a query or DDL statement fails.
        await engine.dispose()

    return created


async def main():
    """Run the sync, report the outcome, and exit with status 1 on failure."""
    try:
        created = await sync_core_tables()
        if created:
            print("创建表:", ", ".join(created))
        else:
            print("核心表已存在,无需创建。")
    except Exception as exc:
        import traceback

        print("同步失败:", str(exc))
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())