#!/usr/bin/env python3 """ 考培练系统 - 简化数据库回滚工具 基于MySQL Binlog的快速回滚方案 使用方法: 1. 查看Binlog文件: python scripts/simple_rollback.py --list 2. 模拟回滚: python scripts/simple_rollback.py --time "2024-12-20 10:30:00" 3. 实际回滚: python scripts/simple_rollback.py --time "2024-12-20 10:30:00" --execute """ import asyncio import argparse import subprocess import tempfile import os from datetime import datetime from pathlib import Path import aiomysql import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class SimpleRollbackTool: """简化回滚工具""" def __init__(self): self.host = "localhost" self.port = 3306 self.user = "root" self.password = "root" self.database = "kaopeilian" self.connection = None async def connect(self): """连接数据库""" try: self.connection = await aiomysql.connect( host=self.host, port=self.port, user=self.user, password=self.password, db=self.database, charset='utf8mb4' ) logger.info("✅ 数据库连接成功") except Exception as e: logger.error(f"❌ 数据库连接失败: {e}") raise async def close(self): """关闭连接""" if self.connection: self.connection.close() async def list_binlogs(self): """列出Binlog文件""" cursor = await self.connection.cursor() await cursor.execute("SHOW BINARY LOGS") result = await cursor.fetchall() await cursor.close() print("\n📋 可用的Binlog文件:") print("-" * 60) for i, row in enumerate(result, 1): print(f"{i:2d}. {row[0]} ({row[1]} bytes)") print("-" * 60) def extract_sql_from_binlog(self, binlog_file: str, start_time: str) -> str: """从Binlog提取SQL语句""" # 使用mysqlbinlog工具解析 cmd = [ 'docker', 'exec', 'kaopeilian-mysql', 'mysqlbinlog', '--base64-output=decode-rows', '-v', '--start-datetime', start_time, '--database', self.database, f'/var/lib/mysql/{binlog_file}' ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode == 0: return result.stdout else: logger.error(f"mysqlbinlog执行失败: {result.stderr}") return "" except Exception as e: logger.error(f"执行mysqlbinlog异常: {e}") return "" def generate_rollback_sql(self, binlog_content: str) -> list: """生成回滚SQL语句""" rollback_sqls = [] # 简单的SQL解析和反转 lines = binlog_content.split('\n') current_table = None for line in lines: line = line.strip() # 检测表名 if '### UPDATE' in line and '`' in line: table_match = line.split('`')[1] if '`' in line else None if table_match: current_table = table_match # 检测INSERT操作,生成DELETE elif '### INSERT INTO' in line and '`' in line: table_match = line.split('`')[1] if '`' in line else None if table_match: current_table = table_match # 检测DELETE操作,生成INSERT elif '### DELETE FROM' in line and '`' in line: table_match = line.split('`')[1] if '`' in line else None if table_match: current_table = table_match # 检测WHERE条件 elif '### WHERE' in line and current_table: # 提取WHERE条件 where_part = line.replace('### WHERE', '').strip() if where_part: rollback_sqls.append(f"-- 需要手动处理 {current_table} 表的回滚") rollback_sqls.append(f"-- WHERE条件: {where_part}") return rollback_sqls async def create_backup_before_rollback(self) -> str: """回滚前创建备份""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = f"/tmp/kaopeilian_backup_{timestamp}.sql" # 使用mysqldump创建备份 cmd = [ 'docker', 'exec', 'kaopeilian-mysql', 'mysqldump', '-uroot', '-proot', '--single-transaction', '--routines', '--triggers', self.database ] try: with open(backup_file, 'w') as f: result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True) if result.returncode == 0: logger.info(f"✅ 备份已创建: {backup_file}") return backup_file else: logger.error(f"❌ 备份失败: {result.stderr}") return "" except Exception as e: logger.error(f"❌ 备份异常: {e}") return "" async def rollback_by_time(self, target_time: str, execute: bool = False): """根据时间点回滚""" logger.info(f"🎯 开始回滚到时间点: {target_time}") # 获取最新的Binlog文件 cursor = await self.connection.cursor() await cursor.execute("SHOW BINARY LOGS") binlog_files = await cursor.fetchall() await cursor.close() if not binlog_files: logger.error("❌ 未找到Binlog文件") return # 使用最新的Binlog文件 latest_binlog = binlog_files[-1][0] logger.info(f"📁 使用Binlog文件: {latest_binlog}") # 提取SQL binlog_content = self.extract_sql_from_binlog(latest_binlog, target_time) if not binlog_content: logger.error("❌ 无法从Binlog提取SQL") return # 生成回滚SQL rollback_sqls = self.generate_rollback_sql(binlog_content) if not rollback_sqls: logger.warning("⚠️ 未找到需要回滚的操作") return print("\n🔄 回滚SQL语句:") print("-" * 60) for i, sql in enumerate(rollback_sqls, 1): print(f"{i:2d}. {sql}") print("-" * 60) if not execute: logger.info("🔍 这是模拟执行,使用 --execute 参数实际执行") return # 创建备份 backup_file = await self.create_backup_before_rollback() if not backup_file: logger.error("❌ 备份失败,取消回滚操作") return # 确认执行 confirm = input("\n⚠️ 确认执行回滚操作?这将修改数据库数据!(yes/no): ") if confirm.lower() != 'yes': logger.info("❌ 用户取消回滚操作") return # 执行回滚(这里需要根据实际情况手动执行SQL) logger.info("✅ 回滚操作准备完成") logger.info(f"📁 备份文件位置: {backup_file}") logger.info("📝 请手动执行上述SQL语句完成回滚") async def main(): """主函数""" parser = argparse.ArgumentParser(description='考培练系统 - 简化数据库回滚工具') parser.add_argument('--list', action='store_true', help='列出Binlog文件') parser.add_argument('--time', help='回滚到的时间点 (格式: YYYY-MM-DD HH:MM:SS)') parser.add_argument('--execute', action='store_true', help='实际执行回滚') args = parser.parse_args() tool = SimpleRollbackTool() try: await tool.connect() if args.list: await tool.list_binlogs() elif args.time: await tool.rollback_by_time(args.time, args.execute) else: parser.print_help() except Exception as e: logger.error(f"❌ 程序执行异常: {e}") finally: await tool.close() if __name__ == "__main__": asyncio.run(main())