""" 执行指定的 SQL 文件到当前配置的数据库(使用与后端一致的连接)。 用法: cd kaopeilian-backend && python3 scripts/apply_sql_file.py scripts/init_database_unified.sql """ import asyncio import sys from pathlib import Path # 确保可导入应用配置 sys.path.append(str(Path(__file__).resolve().parent.parent)) from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import text from app.core.config import get_settings def load_sql_statements(sql_path: Path) -> list[str]: """读取 SQL 文件并按语句拆分(简单分号分割,忽略注释)。""" raw = sql_path.read_text(encoding="utf-8") # 去掉 -- 开头的行注释 lines = [] for line in raw.splitlines(): striped = line.strip() if not striped: continue if striped.startswith("--"): continue lines.append(line) content = "\n".join(lines) # 简单分割;注意保留分号作为语句结束标记 statements: list[str] = [] current = [] for ch in content: current.append(ch) if ch == ";": stmt = "".join(current).strip() if stmt: statements.append(stmt) current = [] # 可能没有以分号结束的尾部 tail = "".join(current).strip() if tail: statements.append(tail) return [s for s in statements if s] async def apply_sql(sql_file: str): settings = get_settings() engine = create_async_engine(settings.DATABASE_URL, echo=False) sql_path = Path(sql_file) if not sql_path.exists(): raise FileNotFoundError(f"SQL 文件不存在: {sql_path}") statements = load_sql_statements(sql_path) async with engine.begin() as conn: # 执行每条语句 for stmt in statements: await conn.execute(text(stmt)) await engine.dispose() async def main(): if len(sys.argv) < 2: print("用法: python3 scripts/apply_sql_file.py ") sys.exit(1) sql_file = sys.argv[1] await apply_sql(sql_file) print("SQL 执行完成") if __name__ == "__main__": asyncio.run(main())