- 从服务器拉取完整代码 - 按框架规范整理项目结构 - 配置 Drone CI 测试环境部署 - 包含后端(FastAPI)、前端(Vue3)、管理端 技术栈: Vue3 + TypeScript + FastAPI + MySQL
139 lines
4.5 KiB
Python
139 lines
4.5 KiB
Python
"""
Sync the core table structure (teams, user_teams), automatically adapting to the type of users.id.

Steps:
1) Check the data type of users.id (INT or BIGINT)
2) Create the teams and user_teams tables if they are missing; foreign-key columns are created with a compatible type
3) Skip tables that already exist

Run:
cd kaopeilian-backend && python3 scripts/sync_core_tables.py
"""
|
||
import asyncio
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
sys.path.append(str(Path(__file__).resolve().parent.parent))
|
||
|
||
from sqlalchemy import text
|
||
from sqlalchemy.ext.asyncio import create_async_engine
|
||
|
||
from app.core.config import get_settings
|
||
|
||
|
||
async def table_exists(conn, db_name: str, table: str) -> bool:
    """Report whether ``table`` is listed under ``db_name`` in INFORMATION_SCHEMA.

    Args:
        conn: Connection object exposing an awaitable ``execute``.
        db_name: Schema (database) name to search in.
        table: Table name to look for.

    Returns:
        True when INFORMATION_SCHEMA.TABLES holds at least one matching row.
    """
    lookup = text(
        """
        SELECT COUNT(1)
        FROM INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = :db AND TABLE_NAME = :table
        """
    )
    params = {"db": db_name, "table": table}
    rows = await conn.execute(lookup, params)

    match_count = rows.scalar()
    # A missing scalar (None) is treated the same as a zero count.
    if not match_count:
        return False
    return match_count > 0
|
||
|
||
|
||
async def get_users_id_type(conn, db_name: str) -> str:
    """Return the integer DDL type to use for columns that reference users.id.

    Reads COLUMN_TYPE for ``users.id`` from INFORMATION_SCHEMA.COLUMNS and
    reduces it to ``INT`` or ``BIGINT``. The ``UNSIGNED`` attribute is kept
    when present, because MySQL requires a foreign-key column and the column
    it references to have the same integer size and sign.

    Args:
        conn: Connection object exposing an awaitable ``execute``.
        db_name: Schema (database) name that contains the ``users`` table.

    Returns:
        One of ``"INT"``, ``"BIGINT"``, ``"INT UNSIGNED"`` or
        ``"BIGINT UNSIGNED"``. Falls back to ``"INT"`` when the column is
        not found.
    """
    result = await conn.execute(
        text(
            """
            SELECT COLUMN_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = :db AND TABLE_NAME = 'users' AND COLUMN_NAME = 'id'
            """
        ),
        {"db": db_name},
    )
    # Normalize: COLUMN_TYPE may look like "bigint(20) unsigned" or "int".
    col_type = (result.scalar() or "int").lower()

    base_type = "BIGINT" if "bigint" in col_type else "INT"
    # Preserve signedness; a signed FK column cannot reference an unsigned id.
    if "unsigned" in col_type:
        return f"{base_type} UNSIGNED"
    return base_type
|
||
|
||
|
||
async def sync_core_tables():
    """Create the ``teams`` and ``user_teams`` tables when they are missing.

    The schema name is derived from ``settings.DATABASE_URL``. Columns that
    reference ``users.id`` are declared with the type reported by
    ``get_users_id_type`` so the foreign keys are type-compatible. Tables
    that already exist are left untouched.

    Returns:
        List of the table names created by this call, in creation order
        (empty when nothing had to be created).
    """
    settings = get_settings()
    db_url = settings.DATABASE_URL
    # Schema name = last path segment of the URL, without any query string.
    db_name = db_url.split("/")[-1].split("?")[0]

    engine = create_async_engine(settings.DATABASE_URL, echo=False)
    created = []
    try:
        async with engine.begin() as conn:
            # Inspect the users.id type so FK columns can match it.
            user_id_type = await get_users_id_type(conn, db_name)

            # Create teams
            if not await table_exists(conn, db_name, "teams"):
                await conn.execute(
                    text(
                        f"""
                        CREATE TABLE `teams` (
                            `id` INT AUTO_INCREMENT PRIMARY KEY,
                            `name` VARCHAR(100) NOT NULL UNIQUE,
                            `code` VARCHAR(50) NOT NULL UNIQUE,
                            `description` TEXT,
                            `team_type` VARCHAR(50) DEFAULT 'department',
                            `is_active` BOOLEAN DEFAULT TRUE,
                            `leader_id` {user_id_type},
                            `parent_id` INT,
                            `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
                            `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                            `is_deleted` BOOLEAN DEFAULT FALSE,
                            `deleted_at` DATETIME,
                            `created_by` {user_id_type},
                            `updated_by` {user_id_type},
                            FOREIGN KEY (`leader_id`) REFERENCES users(id) ON DELETE SET NULL,
                            FOREIGN KEY (`parent_id`) REFERENCES teams(id) ON DELETE CASCADE,
                            INDEX idx_team_type (team_type),
                            INDEX idx_is_active (is_active)
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                        """
                    )
                )
                created.append("teams")

            # Create user_teams
            if not await table_exists(conn, db_name, "user_teams"):
                await conn.execute(
                    text(
                        f"""
                        CREATE TABLE `user_teams` (
                            `user_id` {user_id_type} NOT NULL,
                            `team_id` INT NOT NULL,
                            `role` VARCHAR(50) DEFAULT 'member',
                            `joined_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
                            PRIMARY KEY (`user_id`, `team_id`),
                            FOREIGN KEY (`user_id`) REFERENCES users(id) ON DELETE CASCADE,
                            FOREIGN KEY (`team_id`) REFERENCES teams(id) ON DELETE CASCADE
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                        """
                    )
                )
                created.append("user_teams")
    finally:
        # Dispose of the engine even when a query or DDL statement fails,
        # so its connection pool is not left open.
        await engine.dispose()

    return created
|
||
|
||
|
||
async def main():
    """Run the table sync and print the outcome; exit with status 1 on failure."""
    try:
        new_tables = await sync_core_tables()
        if not new_tables:
            print("核心表已存在,无需创建。")
        else:
            print("创建表:", ", ".join(new_tables))
    except Exception as exc:
        import traceback

        # Print the short message first, then the full traceback for diagnosis.
        print("同步失败:", str(exc))
        traceback.print_exc()
        sys.exit(1)
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the async main() to completion.
    asyncio.run(main())
|
||
|
||
|