Files
012-kaopeilian/backend/scripts/simple_rollback.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

248 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
考培练系统 - 简化数据库回滚工具
基于MySQL Binlog的快速回滚方案
使用方法:
1. 查看Binlog文件: python scripts/simple_rollback.py --list
2. 模拟回滚: python scripts/simple_rollback.py --time "2024-12-20 10:30:00"
3. 实际回滚: python scripts/simple_rollback.py --time "2024-12-20 10:30:00" --execute
"""
import asyncio
import argparse
import subprocess
import tempfile
import os
from datetime import datetime
from pathlib import Path
import aiomysql
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SimpleRollbackTool:
"""简化回滚工具"""
def __init__(self):
self.host = "localhost"
self.port = 3306
self.user = "root"
self.password = "root"
self.database = "kaopeilian"
self.connection = None
async def connect(self):
"""连接数据库"""
try:
self.connection = await aiomysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
db=self.database,
charset='utf8mb4'
)
logger.info("✅ 数据库连接成功")
except Exception as e:
logger.error(f"❌ 数据库连接失败: {e}")
raise
async def close(self):
"""关闭连接"""
if self.connection:
self.connection.close()
async def list_binlogs(self):
"""列出Binlog文件"""
cursor = await self.connection.cursor()
await cursor.execute("SHOW BINARY LOGS")
result = await cursor.fetchall()
await cursor.close()
print("\n📋 可用的Binlog文件:")
print("-" * 60)
for i, row in enumerate(result, 1):
print(f"{i:2d}. {row[0]} ({row[1]} bytes)")
print("-" * 60)
def extract_sql_from_binlog(self, binlog_file: str, start_time: str) -> str:
"""从Binlog提取SQL语句"""
# 使用mysqlbinlog工具解析
cmd = [
'docker', 'exec', 'kaopeilian-mysql',
'mysqlbinlog',
'--base64-output=decode-rows',
'-v',
'--start-datetime', start_time,
'--database', self.database,
f'/var/lib/mysql/{binlog_file}'
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
return result.stdout
else:
logger.error(f"mysqlbinlog执行失败: {result.stderr}")
return ""
except Exception as e:
logger.error(f"执行mysqlbinlog异常: {e}")
return ""
def generate_rollback_sql(self, binlog_content: str) -> list:
"""生成回滚SQL语句"""
rollback_sqls = []
# 简单的SQL解析和反转
lines = binlog_content.split('\n')
current_table = None
for line in lines:
line = line.strip()
# 检测表名
if '### UPDATE' in line and '`' in line:
table_match = line.split('`')[1] if '`' in line else None
if table_match:
current_table = table_match
# 检测INSERT操作生成DELETE
elif '### INSERT INTO' in line and '`' in line:
table_match = line.split('`')[1] if '`' in line else None
if table_match:
current_table = table_match
# 检测DELETE操作生成INSERT
elif '### DELETE FROM' in line and '`' in line:
table_match = line.split('`')[1] if '`' in line else None
if table_match:
current_table = table_match
# 检测WHERE条件
elif '### WHERE' in line and current_table:
# 提取WHERE条件
where_part = line.replace('### WHERE', '').strip()
if where_part:
rollback_sqls.append(f"-- 需要手动处理 {current_table} 表的回滚")
rollback_sqls.append(f"-- WHERE条件: {where_part}")
return rollback_sqls
async def create_backup_before_rollback(self) -> str:
"""回滚前创建备份"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"/tmp/kaopeilian_backup_{timestamp}.sql"
# 使用mysqldump创建备份
cmd = [
'docker', 'exec', 'kaopeilian-mysql',
'mysqldump',
'-uroot', '-proot',
'--single-transaction',
'--routines',
'--triggers',
self.database
]
try:
with open(backup_file, 'w') as f:
result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
logger.info(f"✅ 备份已创建: {backup_file}")
return backup_file
else:
logger.error(f"❌ 备份失败: {result.stderr}")
return ""
except Exception as e:
logger.error(f"❌ 备份异常: {e}")
return ""
async def rollback_by_time(self, target_time: str, execute: bool = False):
"""根据时间点回滚"""
logger.info(f"🎯 开始回滚到时间点: {target_time}")
# 获取最新的Binlog文件
cursor = await self.connection.cursor()
await cursor.execute("SHOW BINARY LOGS")
binlog_files = await cursor.fetchall()
await cursor.close()
if not binlog_files:
logger.error("❌ 未找到Binlog文件")
return
# 使用最新的Binlog文件
latest_binlog = binlog_files[-1][0]
logger.info(f"📁 使用Binlog文件: {latest_binlog}")
# 提取SQL
binlog_content = self.extract_sql_from_binlog(latest_binlog, target_time)
if not binlog_content:
logger.error("❌ 无法从Binlog提取SQL")
return
# 生成回滚SQL
rollback_sqls = self.generate_rollback_sql(binlog_content)
if not rollback_sqls:
logger.warning("⚠️ 未找到需要回滚的操作")
return
print("\n🔄 回滚SQL语句:")
print("-" * 60)
for i, sql in enumerate(rollback_sqls, 1):
print(f"{i:2d}. {sql}")
print("-" * 60)
if not execute:
logger.info("🔍 这是模拟执行,使用 --execute 参数实际执行")
return
# 创建备份
backup_file = await self.create_backup_before_rollback()
if not backup_file:
logger.error("❌ 备份失败,取消回滚操作")
return
# 确认执行
confirm = input("\n⚠️ 确认执行回滚操作?这将修改数据库数据!(yes/no): ")
if confirm.lower() != 'yes':
logger.info("❌ 用户取消回滚操作")
return
# 执行回滚这里需要根据实际情况手动执行SQL
logger.info("✅ 回滚操作准备完成")
logger.info(f"📁 备份文件位置: {backup_file}")
logger.info("📝 请手动执行上述SQL语句完成回滚")
async def main():
"""主函数"""
parser = argparse.ArgumentParser(description='考培练系统 - 简化数据库回滚工具')
parser.add_argument('--list', action='store_true', help='列出Binlog文件')
parser.add_argument('--time', help='回滚到的时间点 (格式: YYYY-MM-DD HH:MM:SS)')
parser.add_argument('--execute', action='store_true', help='实际执行回滚')
args = parser.parse_args()
tool = SimpleRollbackTool()
try:
await tool.connect()
if args.list:
await tool.list_binlogs()
elif args.time:
await tool.rollback_by_time(args.time, args.execute)
else:
parser.print_help()
except Exception as e:
logger.error(f"❌ 程序执行异常: {e}")
finally:
await tool.close()
if __name__ == "__main__":
asyncio.run(main())