# 考培练系统后端协作机制设计 ## 1. 全局上下文共享机制 ### 1.1 项目上下文管理 ```python # app/core/project_context.py from dataclasses import dataclass from typing import Dict, Any, List import json from pathlib import Path @dataclass class ProjectContext: """项目全局上下文,供所有Agent共享""" # 项目基础信息 project_name: str = "考培练系统" project_version: str = "1.0.0" base_path: Path = Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试") # 技术栈信息 tech_stack: Dict[str, str] = None # 数据库schema database_schema: Dict[str, Any] = None # API规范 api_specifications: Dict[str, Any] = None # 业务规则 business_rules: Dict[str, Any] = None # 模块依赖关系 module_dependencies: Dict[str, List[str]] = None def __post_init__(self): """初始化默认值""" if self.tech_stack is None: self.tech_stack = { "backend": "Python 3.8+ + FastAPI", "frontend": "Vue3 + TypeScript + Element Plus", "database": "MySQL 8.0", "cache": "Redis", "ai_platforms": ["Coze", "Dify"] } if self.module_dependencies is None: self.module_dependencies = { "auth": [], "user": ["auth"], "course": ["auth", "user"], "exam": ["auth", "user", "dify"], "training": ["auth", "user", "coze"], "analytics": ["auth", "user", "exam", "training"], "admin": ["auth", "user"] } @classmethod def load_from_docs(cls, docs_path: Path) -> "ProjectContext": """从项目文档加载上下文""" context = cls() # 加载数据库架构 db_schema_path = docs_path / "数据库架构.md" if db_schema_path.exists(): context.database_schema = cls._parse_database_schema(db_schema_path) # 加载API规范 api_spec_path = docs_path / "技术架构设计.md" if api_spec_path.exists(): context.api_specifications = cls._parse_api_specs(api_spec_path) # 加载业务规则 prd_path = docs_path / "产品需求文档 (PRD).md" if prd_path.exists(): context.business_rules = cls._parse_business_rules(prd_path) return context def to_json(self) -> str: """导出为JSON格式,便于Agent读取""" return json.dumps({ "project_name": self.project_name, "project_version": self.project_version, "tech_stack": self.tech_stack, "module_dependencies": self.module_dependencies, "database_tables": list(self.database_schema.keys()) if self.database_schema else [], "api_endpoints": list(self.api_specifications.keys()) if self.api_specifications else [] }, ensure_ascii=False, indent=2) # 创建全局上下文实例 project_context = ProjectContext.load_from_docs( Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/考培练系统规划") ) ``` ### 1.2 Agent间通信接口 ```python # app/core/agent_communication.py from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional from datetime import datetime class AgentInterface(ABC): """所有Agent必须实现的接口""" def __init__(self, agent_name: str, module_name: str): self.agent_name = agent_name self.module_name = module_name self.start_time = datetime.now() self.context = project_context @abstractmethod def get_module_info(self) -> Dict[str, Any]: """返回模块基本信息""" pass @abstractmethod def get_api_endpoints(self) -> List[Dict[str, str]]: """返回模块提供的API端点""" pass @abstractmethod def get_dependencies(self) -> List[str]: """返回模块依赖的其他模块""" pass @abstractmethod def get_exported_services(self) -> Dict[str, Any]: """返回模块导出的服务""" pass def validate_integration(self, other_module: str) -> bool: """验证与其他模块的集成是否正确""" dependencies = self.get_dependencies() return other_module in dependencies or self.module_name in project_context.module_dependencies.get(other_module, []) # 示例:Auth模块的Agent实现 class AuthAgent(AgentInterface): def __init__(self): super().__init__("Agent-Auth", "auth") def get_module_info(self) -> Dict[str, Any]: return { "name": self.module_name, "description": "认证授权模块", "version": "1.0.0", "status": "developing", "owner": self.agent_name } def get_api_endpoints(self) -> List[Dict[str, str]]: return [ {"method": "POST", "path": "/api/v1/auth/login", "description": "用户登录"}, {"method": "POST", "path": "/api/v1/auth/register", "description": "用户注册"}, {"method": "POST", "path": "/api/v1/auth/logout", "description": "用户登出"}, {"method": "POST", "path": "/api/v1/auth/refresh", "description": "刷新Token"} ] def get_dependencies(self) -> List[str]: return [] # Auth模块不依赖其他模块 def get_exported_services(self) -> Dict[str, Any]: return { "get_current_user": "获取当前登录用户", "require_admin": "需要管理员权限", "create_access_token": "创建访问令牌", "verify_password": "验证密码" } ``` ## 2. 代码集成策略 ### 2.1 Git分支管理 ```bash # 主分支 main # 生产环境代码 develop # 开发环境主分支 # 功能分支(每个Agent一个) feature/auth # Agent-Auth的开发分支 feature/user # Agent-User的开发分支 feature/course # Agent-Course的开发分支 feature/exam # Agent-Exam的开发分支 feature/training # Agent-Training的开发分支 feature/analytics # Agent-Analytics的开发分支 feature/admin # Agent-Admin的开发分支 feature/coze-integration # Agent-Coze的开发分支 feature/dify-integration # Agent-Dify的开发分支 # 集成分支 integration/daily # 每日集成分支 integration/weekly # 每周集成分支 ``` ### 2.2 代码合并流程 ```yaml # .github/workflows/integration.yml name: Daily Integration on: schedule: - cron: '0 2 * * *' # 每天凌晨2点执行 workflow_dispatch: # 支持手动触发 jobs: integration: runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 with: ref: develop - name: Create integration branch run: | git checkout -b integration/$(date +%Y%m%d) - name: Merge feature branches run: | # 按依赖顺序合并 for branch in auth user coze-integration dify-integration course exam training analytics admin; do echo "Merging feature/$branch..." git merge --no-ff origin/feature/$branch || true done - name: Run tests run: | make test-all - name: Check code quality run: | make lint make type-check ``` ### 2.3 模块接口契约 ```python # app/contracts/auth_contract.py from typing import Protocol, Optional from app.schemas.user import User class AuthServiceProtocol(Protocol): """Auth模块对外提供的服务契约""" async def authenticate_user(self, username: str, password: str) -> Optional[User]: """验证用户身份""" ... async def create_access_token(self, user_id: int) -> str: """创建访问令牌""" ... async def get_current_user(self, token: str) -> Optional[User]: """根据token获取当前用户""" ... # app/contracts/user_contract.py class UserServiceProtocol(Protocol): """User模块对外提供的服务契约""" async def get_user_by_id(self, user_id: int) -> Optional[User]: """根据ID获取用户""" ... async def get_users_by_team(self, team_id: int) -> List[User]: """获取团队成员""" ... ``` ## 3. 通信与协调机制 ### 3.1 内部消息总线 ```python # app/core/message_bus.py import asyncio from typing import Dict, Any, Callable, List from enum import Enum import json from datetime import datetime class MessageType(Enum): """消息类型""" MODULE_READY = "module.ready" MODULE_ERROR = "module.error" DATA_UPDATED = "data.updated" TASK_COMPLETED = "task.completed" INTEGRATION_REQUEST = "integration.request" class Message: """消息实体""" def __init__( self, type: MessageType, source: str, target: str = "*", # *表示广播 data: Dict[str, Any] = None, correlation_id: str = None ): self.type = type self.source = source self.target = target self.data = data or {} self.correlation_id = correlation_id self.timestamp = datetime.now() def to_json(self) -> str: return json.dumps({ "type": self.type.value, "source": self.source, "target": self.target, "data": self.data, "correlation_id": self.correlation_id, "timestamp": self.timestamp.isoformat() }) class MessageBus: """模块间消息总线""" def __init__(self): self._subscribers: Dict[MessageType, List[Callable]] = {} self._queue: asyncio.Queue = asyncio.Queue() self._running = False def subscribe(self, message_type: MessageType, handler: Callable): """订阅消息""" if message_type not in self._subscribers: self._subscribers[message_type] = [] self._subscribers[message_type].append(handler) async def publish(self, message: Message): """发布消息""" await self._queue.put(message) async def start(self): """启动消息处理""" self._running = True while self._running: try: message = await asyncio.wait_for(self._queue.get(), timeout=1.0) await self._process_message(message) except asyncio.TimeoutError: continue async def _process_message(self, message: Message): """处理消息""" handlers = self._subscribers.get(message.type, []) for handler in handlers: try: await handler(message) except Exception as e: print(f"Error processing message: {e}") # 全局消息总线实例 message_bus = MessageBus() # 使用示例 async def handle_module_ready(message: Message): print(f"Module {message.source} is ready") message_bus.subscribe(MessageType.MODULE_READY, handle_module_ready) # 在Auth模块启动时 await message_bus.publish(Message( type=MessageType.MODULE_READY, source="auth", data={"endpoints": ["/api/v1/auth/login", "/api/v1/auth/register"]} )) ``` ### 3.2 服务注册与发现 ```python # app/core/service_registry.py from typing import Dict, Any, Optional import json class ServiceRegistry: """服务注册中心""" def __init__(self): self._services: Dict[str, Dict[str, Any]] = {} def register(self, service_name: str, service_info: Dict[str, Any]): """注册服务""" self._services[service_name] = { **service_info, "registered_at": datetime.now().isoformat(), "status": "active" } print(f"Service {service_name} registered") def unregister(self, service_name: str): """注销服务""" if service_name in self._services: self._services[service_name]["status"] = "inactive" def get_service(self, service_name: str) -> Optional[Dict[str, Any]]: """获取服务信息""" service = self._services.get(service_name) if service and service["status"] == "active": return service return None def list_services(self) -> Dict[str, Dict[str, Any]]: """列出所有活跃服务""" return { name: info for name, info in self._services.items() if info["status"] == "active" } # 全局服务注册中心 service_registry = ServiceRegistry() # 服务注册示例 service_registry.register("auth", { "module": "auth", "version": "1.0.0", "endpoints": [ "/api/v1/auth/login", "/api/v1/auth/register" ], "dependencies": [], "health_check": "/api/v1/auth/health" }) ``` ## 4. 数据一致性保证 ### 4.1 分布式事务管理 ```python # app/core/transaction_manager.py from typing import List, Callable, Any import asyncio from contextlib import asynccontextmanager class TransactionManager: """分布式事务管理器(使用Saga模式)""" def __init__(self): self._transactions = {} async def execute_saga( self, saga_id: str, steps: List[Dict[str, Any]] ) -> bool: """ 执行Saga事务 Args: saga_id: 事务ID steps: 事务步骤列表,每个步骤包含: - name: 步骤名称 - action: 执行函数 - compensate: 补偿函数 - args: 参数 """ completed_steps = [] try: # 执行所有步骤 for step in steps: result = await step["action"](**step.get("args", {})) completed_steps.append({ "step": step, "result": result }) return True except Exception as e: # 发生错误,执行补偿操作 print(f"Saga {saga_id} failed at step {step['name']}: {e}") # 反向执行补偿操作 for completed in reversed(completed_steps): try: await completed["step"]["compensate"]( completed["result"], **completed["step"].get("args", {}) ) except Exception as comp_error: print(f"Compensation failed: {comp_error}") return False # 使用示例:创建用户并分配课程 async def create_user_and_assign_course(user_data: dict, course_id: int): saga = TransactionManager() # 定义事务步骤 steps = [ { "name": "create_user", "action": user_service.create_user, "compensate": user_service.delete_user, "args": {"data": user_data} }, { "name": "assign_course", "action": course_service.assign_to_user, "compensate": course_service.unassign_from_user, "args": {"course_id": course_id} }, { "name": "init_analytics", "action": analytics_service.init_user_data, "compensate": analytics_service.cleanup_user_data, "args": {} } ] success = await saga.execute_saga("create_user_flow", steps) return success ``` ### 4.2 数据版本控制 ```python # app/core/data_versioning.py from typing import Any, Dict, Optional from datetime import datetime import json class DataVersion: """数据版本控制""" def __init__(self, entity_type: str, entity_id: str): self.entity_type = entity_type self.entity_id = entity_id self._versions = [] def save_version(self, data: Dict[str, Any], author: str): """保存新版本""" version = { "version": len(self._versions) + 1, "data": data, "author": author, "timestamp": datetime.now().isoformat(), "hash": self._calculate_hash(data) } self._versions.append(version) return version["version"] def get_version(self, version_number: int) -> Optional[Dict[str, Any]]: """获取指定版本""" for v in self._versions: if v["version"] == version_number: return v return None def get_latest(self) -> Optional[Dict[str, Any]]: """获取最新版本""" return self._versions[-1] if self._versions else None def _calculate_hash(self, data: Dict[str, Any]) -> str: """计算数据哈希值""" import hashlib json_str = json.dumps(data, sort_keys=True) return hashlib.sha256(json_str.encode()).hexdigest() ``` ## 5. 错误处理与恢复 ### 5.1 统一错误处理 ```python # app/core/error_handling.py from typing import Optional, Dict, Any import traceback from datetime import datetime class ErrorContext: """错误上下文信息""" def __init__( self, module: str, operation: str, user_id: Optional[int] = None, request_id: Optional[str] = None ): self.module = module self.operation = operation self.user_id = user_id self.request_id = request_id self.timestamp = datetime.now() self.additional_data = {} class ErrorHandler: """统一错误处理器""" def __init__(self): self._error_handlers = {} self._error_log = [] def register_handler(self, error_type: type, handler: Callable): """注册错误处理器""" self._error_handlers[error_type] = handler async def handle_error( self, error: Exception, context: ErrorContext ) -> Dict[str, Any]: """处理错误""" error_record = { "error_type": type(error).__name__, "error_message": str(error), "traceback": traceback.format_exc(), "context": { "module": context.module, "operation": context.operation, "user_id": context.user_id, "request_id": context.request_id, "timestamp": context.timestamp.isoformat() } } # 记录错误 self._error_log.append(error_record) # 查找并执行特定的错误处理器 handler = self._error_handlers.get(type(error)) if handler: try: return await handler(error, context) except Exception as handler_error: print(f"Error handler failed: {handler_error}") # 默认处理 return { "handled": False, "error_id": self._generate_error_id(), "user_message": "系统错误,请稍后重试" } def _generate_error_id(self) -> str: """生成错误ID""" import uuid return f"ERR-{uuid.uuid4().hex[:8]}" # 全局错误处理器 error_handler = ErrorHandler() # 注册特定错误处理器 async def handle_database_error(error: Exception, context: ErrorContext): # 数据库错误特殊处理 if "duplicate" in str(error).lower(): return { "handled": True, "user_message": "数据已存在,请勿重复操作" } return {"handled": False} error_handler.register_handler(DatabaseError, handle_database_error) ``` ### 5.2 断路器模式 ```python # app/core/circuit_breaker.py from typing import Callable, Any import asyncio from datetime import datetime, timedelta from enum import Enum class CircuitState(Enum): CLOSED = "closed" # 正常状态 OPEN = "open" # 断开状态 HALF_OPEN = "half_open" # 半开状态 class CircuitBreaker: """断路器实现""" def __init__( self, failure_threshold: int = 5, recovery_timeout: int = 60, expected_exception: type = Exception ): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.expected_exception = expected_exception self._failure_count = 0 self._last_failure_time = None self._state = CircuitState.CLOSED async def call(self, func: Callable, *args, **kwargs) -> Any: """通过断路器调用函数""" if self._state == CircuitState.OPEN: if self._should_attempt_reset(): self._state = CircuitState.HALF_OPEN else: raise Exception("Circuit breaker is OPEN") try: result = await func(*args, **kwargs) self._on_success() return result except self.expected_exception as e: self._on_failure() raise e def _on_success(self): """调用成功""" self._failure_count = 0 self._state = CircuitState.CLOSED def _on_failure(self): """调用失败""" self._failure_count += 1 self._last_failure_time = datetime.now() if self._failure_count >= self.failure_threshold: self._state = CircuitState.OPEN def _should_attempt_reset(self) -> bool: """是否应该尝试重置""" return ( self._last_failure_time and datetime.now() - self._last_failure_time > timedelta(seconds=self.recovery_timeout) ) # 使用示例 coze_circuit_breaker = CircuitBreaker( failure_threshold=3, recovery_timeout=30 ) async def call_coze_api(data): return await coze_circuit_breaker.call( coze_client.send_message, data ) ``` ## 6. 监控与日志 ### 6.1 性能监控 ```python # app/core/monitoring.py import time from typing import Dict, Any from functools import wraps import asyncio class PerformanceMonitor: """性能监控器""" def __init__(self): self._metrics = {} def record_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None): """记录指标""" if metric_name not in self._metrics: self._metrics[metric_name] = [] self._metrics[metric_name].append({ "value": value, "timestamp": time.time(), "tags": tags or {} }) def measure_time(self, metric_name: str): """测量执行时间装饰器""" def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) execution_time = time.time() - start_time self.record_metric( f"{metric_name}.execution_time", execution_time, {"status": "success"} ) return result except Exception as e: execution_time = time.time() - start_time self.record_metric( f"{metric_name}.execution_time", execution_time, {"status": "error", "error_type": type(e).__name__} ) raise @wraps(func) def sync_wrapper(*args, **kwargs): start_time = time.time() try: result = func(*args, **kwargs) execution_time = time.time() - start_time self.record_metric( f"{metric_name}.execution_time", execution_time, {"status": "success"} ) return result except Exception as e: execution_time = time.time() - start_time self.record_metric( f"{metric_name}.execution_time", execution_time, {"status": "error", "error_type": type(e).__name__} ) raise return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper return decorator def get_metrics_summary(self) -> Dict[str, Any]: """获取指标摘要""" summary = {} for metric_name, values in self._metrics.items(): if values: metric_values = [v["value"] for v in values] summary[metric_name] = { "count": len(values), "min": min(metric_values), "max": max(metric_values), "avg": sum(metric_values) / len(metric_values) } return summary # 全局性能监控器 performance_monitor = PerformanceMonitor() # 使用示例 @performance_monitor.measure_time("api.user.get_by_id") async def get_user_by_id(user_id: int): # 业务逻辑 pass ``` ### 6.2 统一日志管理 ```python # app/core/logging_config.py import logging import sys from pathlib import Path import structlog from pythonjsonlogger import jsonlogger def setup_logging(log_level: str = "INFO", log_dir: Path = None): """配置统一的日志系统""" # 创建日志目录 if log_dir: log_dir.mkdir(parents=True, exist_ok=True) # 配置结构化日志 structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.CallsiteParameterAdder( parameters=[ structlog.processors.CallsiteParameter.FILENAME, structlog.processors.CallsiteParameter.LINENO, structlog.processors.CallsiteParameter.FUNC_NAME, ] ), structlog.processors.dict_tracebacks, structlog.processors.JSONRenderer() ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) # 配置Python标准日志 logging.basicConfig( format="%(message)s", stream=sys.stdout, level=getattr(logging, log_level.upper()) ) # 添加JSON格式化器 json_formatter = jsonlogger.JsonFormatter( "%(timestamp)s %(level)s %(name)s %(message)s", timestamp=True ) # 文件处理器 if log_dir: file_handler = logging.FileHandler(log_dir / "app.log") file_handler.setFormatter(json_formatter) logging.getLogger().addHandler(file_handler) return structlog.get_logger() # 在应用启动时配置日志 logger = setup_logging( log_level="INFO", log_dir=Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/logs") ) ``` ## 7. 开发工具与脚本 ### 7.1 Makefile配置 ```makefile # Makefile .PHONY: help install test lint format type-check run-dev run-prod clean help: @echo "Available commands:" @echo " make install Install dependencies" @echo " make test Run tests" @echo " make lint Run linters" @echo " make format Format code" @echo " make type-check Run type checking" @echo " make run-dev Run development server" @echo " make run-prod Run production server" @echo " make clean Clean temporary files" install: pip install -r requirements/dev.txt test: pytest tests/ -v --cov=app --cov-report=html test-module: @echo "Testing specific module: $(MODULE)" pytest tests/$(MODULE)/ -v lint: flake8 app/ tests/ pylint app/ format: black app/ tests/ isort app/ tests/ type-check: mypy app/ --strict run-dev: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 run-prod: gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker clean: find . -type d -name __pycache__ -exec rm -rf {} + find . -type f -name "*.pyc" -delete rm -rf .pytest_cache/ rm -rf .mypy_cache/ rm -rf htmlcov/ rm -rf dist/ rm -rf *.egg-info # 模块特定命令 test-auth: make test-module MODULE=auth test-user: make test-module MODULE=user test-integration: pytest tests/integration/ -v # 数据库命令 db-init: alembic init migrations db-migrate: alembic revision --autogenerate -m "$(MESSAGE)" db-upgrade: alembic upgrade head db-downgrade: alembic downgrade -1 ``` ### 7.2 开发辅助脚本 ```python # scripts/check_module_integration.py """检查模块集成状态""" import asyncio import httpx from typing import Dict, List import json async def check_module_health(base_url: str, modules: List[str]) -> Dict[str, bool]: """检查各模块健康状态""" results = {} async with httpx.AsyncClient() as client: for module in modules: try: response = await client.get(f"{base_url}/api/v1/{module}/health") results[module] = response.status_code == 200 except Exception: results[module] = False return results async def check_module_dependencies(base_url: str) -> Dict[str, List[str]]: """检查模块依赖关系""" async with httpx.AsyncClient() as client: response = await client.get(f"{base_url}/api/v1/system/dependencies") return response.json() async def main(): base_url = "http://localhost:8000" modules = ["auth", "user", "course", "exam", "training", "analytics", "admin"] print("检查模块健康状态...") health_results = await check_module_health(base_url, modules) print("\n模块状态:") for module, status in health_results.items(): status_emoji = "✅" if status else "❌" print(f" {status_emoji} {module}") print("\n检查模块依赖...") dependencies = await check_module_dependencies(base_url) print("\n模块依赖关系:") for module, deps in dependencies.items(): print(f" {module}: {', '.join(deps) if deps else '无依赖'}") if __name__ == "__main__": asyncio.run(main()) ``` ## 8. 协作检查清单 每个Agent在开发过程中需要确保: ### 开发前 - [ ] 已阅读并理解项目上下文文档 - [ ] 已了解所负责模块的依赖关系 - [ ] 已查看相关模块的接口契约 - [ ] 已配置开发环境并通过测试 ### 开发中 - [ ] 遵循统一的代码规范 - [ ] 实现规定的接口契约 - [ ] 编写必要的单元测试 - [ ] 记录关键操作日志 - [ ] 处理异常情况 ### 集成前 - [ ] 通过所有代码质量检查 - [ ] 完成API文档编写 - [ ] 与依赖模块进行集成测试 - [ ] 更新模块状态到服务注册中心 ### 集成后 - [ ] 确认集成测试通过 - [ ] 监控模块运行状态 - [ ] 及时响应其他模块的问题 - [ ] 参与每日站会同步进展