Files
012-kaopeilian/backend/scripts/kaopeilian_rollback.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

395 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
考培练系统 - 专用数据库回滚工具
针对轻医美连锁业务场景的快速回滚方案
功能:
1. 用户数据回滚
2. 课程数据回滚
3. 考试数据回滚
4. 岗位数据回滚
5. 基于Binlog的精确回滚
使用方法:
python scripts/kaopeilian_rollback.py --help
"""
import asyncio
import argparse
import json
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import aiomysql
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class KaopeilianRollbackTool:
"""考培练系统专用回滚工具"""
def __init__(self):
self.host = "localhost"
self.port = 3306
self.user = "root"
self.password = "root"
self.database = "kaopeilian"
self.connection = None
async def connect(self):
"""连接数据库"""
try:
self.connection = await aiomysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
db=self.database,
charset='utf8mb4'
)
logger.info("✅ 数据库连接成功")
except Exception as e:
logger.error(f"❌ 数据库连接失败: {e}")
raise
async def close(self):
"""关闭连接"""
if self.connection:
self.connection.close()
async def get_recent_operations(self, hours: int = 24) -> List[Dict]:
"""获取最近的操作记录"""
cursor = await self.connection.cursor(aiomysql.DictCursor)
# 查询最近更新的记录
queries = [
{
'table': 'users',
'sql': f"""
SELECT id, username, full_name, updated_at, 'user' as type
FROM users
WHERE updated_at >= DATE_SUB(NOW(), INTERVAL {hours} HOUR)
ORDER BY updated_at DESC
"""
},
{
'table': 'courses',
'sql': f"""
SELECT id, name, status, updated_at, 'course' as type
FROM courses
WHERE updated_at >= DATE_SUB(NOW(), INTERVAL {hours} HOUR)
ORDER BY updated_at DESC
"""
},
{
'table': 'exams',
'sql': f"""
SELECT id, user_id, course_id, exam_name, score, updated_at, 'exam' as type
FROM exams
WHERE updated_at >= DATE_SUB(NOW(), INTERVAL {hours} HOUR)
ORDER BY updated_at DESC
"""
},
{
'table': 'positions',
'sql': f"""
SELECT id, name, code, status, updated_at, 'position' as type
FROM positions
WHERE updated_at >= DATE_SUB(NOW(), INTERVAL {hours} HOUR)
ORDER BY updated_at DESC
"""
}
]
all_operations = []
for query in queries:
try:
await cursor.execute(query['sql'])
results = await cursor.fetchall()
all_operations.extend(results)
except Exception as e:
logger.warning(f"⚠️ 查询 {query['table']} 表失败: {e}")
await cursor.close()
return all_operations
async def create_data_backup(self, table: str, record_id: int) -> Dict:
"""创建单条记录的备份"""
cursor = await self.connection.cursor(aiomysql.DictCursor)
try:
await cursor.execute(f"SELECT * FROM {table} WHERE id = %s", (record_id,))
record = await cursor.fetchone()
if record:
backup = {
'table': table,
'record_id': record_id,
'data': dict(record),
'backup_time': datetime.now().isoformat()
}
logger.info(f"✅ 已备份 {table} 表记录 ID: {record_id}")
return backup
else:
logger.warning(f"⚠️ 未找到 {table} 表记录 ID: {record_id}")
return None
except Exception as e:
logger.error(f"❌ 备份 {table} 表记录失败: {e}")
return None
finally:
await cursor.close()
async def restore_from_backup(self, backup: Dict) -> bool:
"""从备份恢复数据"""
if not backup:
return False
cursor = await self.connection.cursor()
try:
# 开始事务
await cursor.execute("START TRANSACTION")
table = backup['table']
data = backup['data']
record_id = backup['record_id']
# 构建UPDATE语句
set_clauses = []
values = []
for key, value in data.items():
if key != 'id': # 跳过主键
set_clauses.append(f"`{key}` = %s")
values.append(value)
if set_clauses:
sql = f"UPDATE `{table}` SET {', '.join(set_clauses)} WHERE id = %s"
values.append(record_id)
await cursor.execute(sql, values)
await cursor.execute("COMMIT")
logger.info(f"✅ 已恢复 {table} 表记录 ID: {record_id}")
return True
else:
await cursor.execute("ROLLBACK")
logger.warning(f"⚠️ 没有可恢复的字段")
return False
except Exception as e:
await cursor.execute("ROLLBACK")
logger.error(f"❌ 恢复数据失败: {e}")
return False
finally:
await cursor.close()
async def soft_delete_rollback(self, table: str, record_id: int) -> bool:
"""软删除回滚"""
cursor = await self.connection.cursor()
try:
# 检查表是否有软删除字段
await cursor.execute(f"SHOW COLUMNS FROM {table} LIKE 'is_deleted'")
has_soft_delete = await cursor.fetchone()
if not has_soft_delete:
logger.warning(f"⚠️ {table} 表没有软删除字段")
return False
# 恢复软删除的记录
await cursor.execute("START TRANSACTION")
sql = f"""
UPDATE `{table}`
SET is_deleted = FALSE, deleted_at = NULL
WHERE id = %s
"""
await cursor.execute(sql, (record_id,))
if cursor.rowcount > 0:
await cursor.execute("COMMIT")
logger.info(f"✅ 已恢复软删除记录 {table} ID: {record_id}")
return True
else:
await cursor.execute("ROLLBACK")
logger.warning(f"⚠️ 未找到要恢复的记录 {table} ID: {record_id}")
return False
except Exception as e:
await cursor.execute("ROLLBACK")
logger.error(f"❌ 软删除回滚失败: {e}")
return False
finally:
await cursor.close()
async def rollback_user_operation(self, user_id: int, operation_type: str = "update") -> bool:
"""回滚用户操作"""
logger.info(f"🔄 开始回滚用户操作: ID={user_id}, 类型={operation_type}")
if operation_type == "delete":
return await self.soft_delete_rollback("users", user_id)
else:
# 对于更新操作需要从Binlog或其他方式获取原始数据
logger.warning("⚠️ 用户更新操作回滚需要手动处理")
return False
async def rollback_course_operation(self, course_id: int, operation_type: str = "update") -> bool:
"""回滚课程操作"""
logger.info(f"🔄 开始回滚课程操作: ID={course_id}, 类型={operation_type}")
if operation_type == "delete":
return await self.soft_delete_rollback("courses", course_id)
else:
logger.warning("⚠️ 课程更新操作回滚需要手动处理")
return False
async def rollback_exam_operation(self, exam_id: int) -> bool:
"""回滚考试操作"""
logger.info(f"🔄 开始回滚考试操作: ID={exam_id}")
cursor = await self.connection.cursor()
try:
await cursor.execute("START TRANSACTION")
# 删除考试结果详情
await cursor.execute("DELETE FROM exam_results WHERE exam_id = %s", (exam_id,))
# 删除考试记录
await cursor.execute("DELETE FROM exams WHERE id = %s", (exam_id,))
await cursor.execute("COMMIT")
logger.info(f"✅ 已回滚考试记录 ID: {exam_id}")
return True
except Exception as e:
await cursor.execute("ROLLBACK")
logger.error(f"❌ 考试回滚失败: {e}")
return False
finally:
await cursor.close()
async def rollback_position_operation(self, position_id: int, operation_type: str = "update") -> bool:
"""回滚岗位操作"""
logger.info(f"🔄 开始回滚岗位操作: ID={position_id}, 类型={operation_type}")
if operation_type == "delete":
return await self.soft_delete_rollback("positions", position_id)
else:
logger.warning("⚠️ 岗位更新操作回滚需要手动处理")
return False
async def list_recent_changes(self, hours: int = 24):
"""列出最近的变更"""
logger.info(f"📋 最近 {hours} 小时的数据变更:")
operations = await self.get_recent_operations(hours)
if not operations:
logger.info("📝 没有找到最近的变更记录")
return
# 按类型分组显示
by_type = {}
for op in operations:
op_type = op.get('type', 'unknown')
if op_type not in by_type:
by_type[op_type] = []
by_type[op_type].append(op)
for op_type, ops in by_type.items():
print(f"\n🔸 {op_type.upper()} 类型变更 ({len(ops)} 条):")
print("-" * 60)
for op in ops[:10]: # 只显示前10条
if op_type == 'user':
print(f" ID: {op['id']}, 用户名: {op['username']}, 姓名: {op['full_name']}, 时间: {op['updated_at']}")
elif op_type == 'course':
print(f" ID: {op['id']}, 课程: {op['name']}, 状态: {op['status']}, 时间: {op['updated_at']}")
elif op_type == 'exam':
print(f" ID: {op['id']}, 考试: {op['exam_name']}, 分数: {op['score']}, 时间: {op['updated_at']}")
elif op_type == 'position':
print(f" ID: {op['id']}, 岗位: {op['name']}, 状态: {op['status']}, 时间: {op['updated_at']}")
if len(ops) > 10:
print(f" ... 还有 {len(ops) - 10} 条记录")
async def main():
"""主函数"""
parser = argparse.ArgumentParser(description='考培练系统 - 专用数据库回滚工具')
parser.add_argument('--list', action='store_true', help='列出最近的变更')
parser.add_argument('--hours', type=int, default=24, help='查看最近N小时的变更 (默认24小时)')
parser.add_argument('--rollback-user', type=int, help='回滚用户操作 (用户ID)')
parser.add_argument('--rollback-course', type=int, help='回滚课程操作 (课程ID)')
parser.add_argument('--rollback-exam', type=int, help='回滚考试操作 (考试ID)')
parser.add_argument('--rollback-position', type=int, help='回滚岗位操作 (岗位ID)')
parser.add_argument('--operation-type', choices=['update', 'delete'], default='update', help='操作类型')
parser.add_argument('--execute', action='store_true', help='实际执行回滚')
args = parser.parse_args()
tool = KaopeilianRollbackTool()
try:
await tool.connect()
if args.list:
await tool.list_recent_changes(args.hours)
elif args.rollback_user:
if args.execute:
success = await tool.rollback_user_operation(args.rollback_user, args.operation_type)
if success:
logger.info("✅ 用户回滚完成")
else:
logger.error("❌ 用户回滚失败")
else:
logger.info(f"🔍 模拟回滚用户 ID: {args.rollback_user}, 类型: {args.operation_type}")
logger.info("使用 --execute 参数实际执行")
elif args.rollback_course:
if args.execute:
success = await tool.rollback_course_operation(args.rollback_course, args.operation_type)
if success:
logger.info("✅ 课程回滚完成")
else:
logger.error("❌ 课程回滚失败")
else:
logger.info(f"🔍 模拟回滚课程 ID: {args.rollback_course}, 类型: {args.operation_type}")
logger.info("使用 --execute 参数实际执行")
elif args.rollback_exam:
if args.execute:
success = await tool.rollback_exam_operation(args.rollback_exam)
if success:
logger.info("✅ 考试回滚完成")
else:
logger.error("❌ 考试回滚失败")
else:
logger.info(f"🔍 模拟回滚考试 ID: {args.rollback_exam}")
logger.info("使用 --execute 参数实际执行")
elif args.rollback_position:
if args.execute:
success = await tool.rollback_position_operation(args.rollback_position, args.operation_type)
if success:
logger.info("✅ 岗位回滚完成")
else:
logger.error("❌ 岗位回滚失败")
else:
logger.info(f"🔍 模拟回滚岗位 ID: {args.rollback_position}, 类型: {args.operation_type}")
logger.info("使用 --execute 参数实际执行")
else:
parser.print_help()
except Exception as e:
logger.error(f"❌ 程序执行异常: {e}")
finally:
await tool.close()
if __name__ == "__main__":
asyncio.run(main())