""" 初始化数据库脚本 """ import asyncio import os import sys from pathlib import Path # 添加项目路径 sys.path.append(str(Path(__file__).resolve().parent.parent)) from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine from app.core.config import settings from app.core.logger import logger async def create_database(): """创建数据库(如果不存在)""" # 解析数据库URL db_url_parts = settings.DATABASE_URL.split('/') db_name = db_url_parts[-1].split('?')[0] db_url_without_db = '/'.join(db_url_parts[:-1]) # 连接到MySQL服务器(不指定数据库) engine = create_async_engine(db_url_without_db, echo=True) async with engine.begin() as conn: # 检查数据库是否存在 result = await conn.execute( text(f"SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '{db_name}'") ) exists = result.scalar() is not None if not exists: # 创建数据库 await conn.execute(text(f"CREATE DATABASE IF NOT EXISTS `{db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")) logger.info(f"数据库 {db_name} 创建成功") else: logger.info(f"数据库 {db_name} 已存在") await engine.dispose() async def main(): """主函数""" try: # 创建数据库 await create_database() # 运行迁移 logger.info("开始运行数据库迁移...") os.system("cd /workspace/kaopeilian-backend && alembic upgrade head") logger.info("数据库初始化完成!") except Exception as e: logger.error(f"数据库初始化失败: {str(e)}") sys.exit(1) if __name__ == "__main__": asyncio.run(main())