- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
1067 lines
31 KiB
Markdown
1067 lines
31 KiB
Markdown
# 考培练系统后端协作机制设计
|
||
|
||
## 1. 全局上下文共享机制
|
||
|
||
### 1.1 项目上下文管理
|
||
```python
|
||
# app/core/project_context.py
|
||
from dataclasses import dataclass
|
||
from typing import Dict, Any, List
|
||
import json
|
||
from pathlib import Path
|
||
|
||
from typing import Optional  # required by the Optional[...] field annotations below


@dataclass
class ProjectContext:
    """Project-wide context shared by every agent.

    ``tech_stack`` and ``module_dependencies`` fall back to built-in defaults
    in ``__post_init__`` when left as ``None``. ``database_schema``,
    ``api_specifications`` and ``business_rules`` stay ``None`` until
    ``load_from_docs`` finds the corresponding document.
    """

    # Basic project information
    project_name: str = "考培练系统"
    project_version: str = "1.0.0"
    base_path: Path = Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试")

    # Technology stack (values are strings, except "ai_platforms" which is a list)
    tech_stack: Optional[Dict[str, Any]] = None

    # Database schema
    database_schema: Optional[Dict[str, Any]] = None

    # API specifications
    api_specifications: Optional[Dict[str, Any]] = None

    # Business rules
    business_rules: Optional[Dict[str, Any]] = None

    # Module dependency graph: module name -> names of the modules it depends on
    module_dependencies: Optional[Dict[str, List[str]]] = None

    def __post_init__(self) -> None:
        """Fill in the default tech stack and dependency graph when not supplied."""
        if self.tech_stack is None:
            self.tech_stack = {
                "backend": "Python 3.8+ + FastAPI",
                "frontend": "Vue3 + TypeScript + Element Plus",
                "database": "MySQL 8.0",
                "cache": "Redis",
                "ai_platforms": ["Coze", "Dify"],
            }

        if self.module_dependencies is None:
            self.module_dependencies = {
                "auth": [],
                "user": ["auth"],
                "course": ["auth", "user"],
                "exam": ["auth", "user", "dify"],
                "training": ["auth", "user", "coze"],
                "analytics": ["auth", "user", "exam", "training"],
                "admin": ["auth", "user"],
            }

    @classmethod
    def load_from_docs(cls, docs_path: Path) -> "ProjectContext":
        """Build a context from the project documents under ``docs_path``.

        Every document is optional: a missing file leaves the matching field
        as ``None``.

        Args:
            docs_path: Directory that holds the planning documents.

        Returns:
            A new ``ProjectContext`` with defaults plus whatever was loaded.
        """
        context = cls()

        # NOTE(review): _parse_database_schema, _parse_api_specs and
        # _parse_business_rules are not defined in this snippet; they must be
        # provided on the class, otherwise these calls raise AttributeError as
        # soon as one of the documents exists.

        # Database schema
        db_schema_path = docs_path / "数据库架构.md"
        if db_schema_path.exists():
            context.database_schema = cls._parse_database_schema(db_schema_path)

        # API specifications
        api_spec_path = docs_path / "技术架构设计.md"
        if api_spec_path.exists():
            context.api_specifications = cls._parse_api_specs(api_spec_path)

        # Business rules
        prd_path = docs_path / "产品需求文档 (PRD).md"
        if prd_path.exists():
            context.business_rules = cls._parse_business_rules(prd_path)

        return context

    def to_json(self) -> str:
        """Serialize a summary of the context as JSON for agents to read.

        Only the top-level keys of the schema and API specifications are
        exported (as ``database_tables`` and ``api_endpoints``); both are empty
        lists when nothing has been loaded.
        """
        return json.dumps({
            "project_name": self.project_name,
            "project_version": self.project_version,
            "tech_stack": self.tech_stack,
            "module_dependencies": self.module_dependencies,
            "database_tables": list(self.database_schema.keys()) if self.database_schema else [],
            "api_endpoints": list(self.api_specifications.keys()) if self.api_specifications else [],
        }, ensure_ascii=False, indent=2)
|
||
|
||
# 创建全局上下文实例
|
||
# Shared instance, built once at import time from the planning documents.
project_context = ProjectContext.load_from_docs(
    Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/考培练系统规划")
)
|
||
```
|
||
|
||
### 1.2 Agent间通信接口
|
||
```python
|
||
# app/core/agent_communication.py
|
||
from abc import ABC, abstractmethod
|
||
from typing import Any, Dict, List, Optional
|
||
from datetime import datetime
|
||
|
||
class AgentInterface(ABC):
    """Interface that every agent must implement."""

    def __init__(self, agent_name: str, module_name: str):
        """Record the agent's identity, start time and the shared context.

        Args:
            agent_name: Display name of the agent.
            module_name: Name of the module the agent owns.
        """
        # Imported here because this snippet's import list does not bring the
        # shared context (defined in app/core/project_context.py) into scope.
        from app.core.project_context import project_context

        self.agent_name = agent_name
        self.module_name = module_name
        self.start_time = datetime.now()
        self.context = project_context

    @abstractmethod
    def get_module_info(self) -> Dict[str, Any]:
        """Return basic information about the module."""
        pass

    @abstractmethod
    def get_api_endpoints(self) -> List[Dict[str, str]]:
        """Return the API endpoints the module provides."""
        pass

    @abstractmethod
    def get_dependencies(self) -> List[str]:
        """Return the names of the modules this module depends on."""
        pass

    @abstractmethod
    def get_exported_services(self) -> Dict[str, Any]:
        """Return the services the module exports."""
        pass

    def validate_integration(self, other_module: str) -> bool:
        """Return whether this module and ``other_module`` are linked by a dependency.

        True when ``other_module`` is one of this agent's dependencies, or when
        the shared context lists this module as a dependency of
        ``other_module``.
        """
        if other_module in self.get_dependencies():
            return True
        # Use the context captured in __init__ rather than a module global.
        dependents = self.context.module_dependencies.get(other_module, [])
        return self.module_name in dependents
|
||
|
||
# 示例:Auth模块的Agent实现
|
||
class AuthAgent(AgentInterface):
    """Example agent implementation for the Auth module."""

    def __init__(self):
        super().__init__("Agent-Auth", "auth")

    def get_module_info(self) -> Dict[str, Any]:
        """Return the Auth module's name, description, version, status and owner."""
        return {
            "name": self.module_name,
            "description": "认证授权模块",
            "version": "1.0.0",
            "status": "developing",
            "owner": self.agent_name,
        }

    def get_api_endpoints(self) -> List[Dict[str, str]]:
        """Return the four authentication endpoints (login, register, logout, refresh)."""
        return [
            {"method": "POST", "path": "/api/v1/auth/login", "description": "用户登录"},
            {"method": "POST", "path": "/api/v1/auth/register", "description": "用户注册"},
            {"method": "POST", "path": "/api/v1/auth/logout", "description": "用户登出"},
            {"method": "POST", "path": "/api/v1/auth/refresh", "description": "刷新Token"},
        ]

    def get_dependencies(self) -> List[str]:
        """Return an empty list: the Auth module depends on no other module."""
        return []

    def get_exported_services(self) -> Dict[str, Any]:
        """Return the exported service names mapped to their descriptions."""
        return {
            "get_current_user": "获取当前登录用户",
            "require_admin": "需要管理员权限",
            "create_access_token": "创建访问令牌",
            "verify_password": "验证密码",
        }
|
||
```
|
||
|
||
## 2. 代码集成策略
|
||
|
||
### 2.1 Git分支管理
|
||
```bash
|
||
# 主分支
main                      # 生产环境代码
develop                   # 开发环境主分支

# 功能分支(每个Agent一个)
feature/auth              # Agent-Auth的开发分支
feature/user              # Agent-User的开发分支
feature/course            # Agent-Course的开发分支
feature/exam              # Agent-Exam的开发分支
feature/training          # Agent-Training的开发分支
feature/analytics         # Agent-Analytics的开发分支
feature/admin             # Agent-Admin的开发分支
feature/coze-integration  # Agent-Coze的开发分支
feature/dify-integration  # Agent-Dify的开发分支

# 集成分支
integration/daily         # 每日集成分支
integration/weekly        # 每周集成分支
|
||
```
|
||
|
||
### 2.2 代码合并流程
|
||
```yaml
|
||
# .github/workflows/integration.yml
|
||
name: Daily Integration

on:
  schedule:
    - cron: '0 2 * * *'  # every day at 02:00 (GitHub evaluates cron schedules in UTC)
  workflow_dispatch:      # allow manual runs

jobs:
  integration:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
        with:
          ref: develop
          # The default shallow checkout only fetches the requested ref, so
          # origin/feature/* would not exist for the merge step below.
          fetch-depth: 0

      - name: Configure git identity
        # `git merge --no-ff` creates merge commits, which need an author.
        run: |
          git config user.name "github-actions[bot]"
          git config user.email "github-actions[bot]@users.noreply.github.com"

      - name: Create integration branch
        run: |
          git checkout -b integration/$(date +%Y%m%d)

      - name: Merge feature branches
        run: |
          # Merge in dependency order
          failed=""
          for branch in auth user coze-integration dify-integration course exam training analytics admin; do
            if ! git rev-parse --verify --quiet "origin/feature/$branch" >/dev/null; then
              echo "::warning::origin/feature/$branch not found, skipping"
              continue
            fi
            echo "Merging feature/$branch..."
            if ! git merge --no-ff "origin/feature/$branch"; then
              # Abort so later merges and the tests never run on conflict markers
              git merge --abort
              failed="$failed $branch"
            fi
          done
          if [ -n "$failed" ]; then
            echo "::error::Merge conflicts in:$failed"
            exit 1
          fi

      - name: Run tests
        # The project Makefile defines `test` (pytest over tests/, which
        # includes tests/integration/); it has no `test-all` target.
        run: |
          make test

      - name: Check code quality
        run: |
          make lint
          make type-check
|
||
```
|
||
|
||
### 2.3 模块接口契约
|
||
```python
|
||
# app/contracts/auth_contract.py
|
||
from typing import Protocol, Optional
|
||
from app.schemas.user import User
|
||
|
||
class AuthServiceProtocol(Protocol):
    """Service contract that the Auth module exposes to other modules."""

    async def authenticate_user(self, username: str, password: str) -> Optional[User]:
        """Verify a user's identity from a username and password."""
        ...

    async def create_access_token(self, user_id: int) -> str:
        """Create an access token for the given user id."""
        ...

    async def get_current_user(self, token: str) -> Optional[User]:
        """Look up the current user from a token."""
        ...
|
||
|
||
# app/contracts/user_contract.py
|
||
from typing import List  # used by get_users_by_team; not imported anywhere in this snippet


class UserServiceProtocol(Protocol):
    """Service contract that the User module exposes to other modules."""

    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """Fetch a user by id."""
        ...

    async def get_users_by_team(self, team_id: int) -> List[User]:
        """Fetch the members of a team."""
        ...
|
||
```
|
||
|
||
## 3. 通信与协调机制
|
||
|
||
### 3.1 内部消息总线
|
||
```python
|
||
# app/core/message_bus.py
|
||
import asyncio
|
||
from typing import Dict, Any, Callable, List
|
||
from enum import Enum
|
||
import json
|
||
from datetime import datetime
|
||
|
||
class MessageType(Enum):
    """Kinds of messages exchanged between modules."""

    MODULE_READY = "module.ready"
    MODULE_ERROR = "module.error"
    DATA_UPDATED = "data.updated"
    TASK_COMPLETED = "task.completed"
    INTEGRATION_REQUEST = "integration.request"


class Message:
    """A single message travelling over the bus."""

    def __init__(
        self,
        type: MessageType,
        source: str,
        target: str = "*",  # "*" means broadcast
        data: Dict[str, Any] = None,
        correlation_id: str = None,
    ):
        """Create a message stamped with the current local time.

        Args:
            type: Kind of message.
            source: Name of the sending module.
            target: Name of the receiving module, or "*" for broadcast.
            data: Payload; defaults to an empty dict.
            correlation_id: Optional id for tying related messages together.
        """
        self.type = type
        self.source = source
        self.target = target
        self.data = data or {}
        self.correlation_id = correlation_id
        self.timestamp = datetime.now()

    def to_json(self) -> str:
        """Serialize the message to a JSON string.

        Non-ASCII text is written as-is (``ensure_ascii=False``), matching
        ``ProjectContext.to_json``.
        """
        return json.dumps({
            "type": self.type.value,
            "source": self.source,
            "target": self.target,
            "data": self.data,
            "correlation_id": self.correlation_id,
            "timestamp": self.timestamp.isoformat(),
        }, ensure_ascii=False)


class MessageBus:
    """In-process message bus connecting modules."""

    def __init__(self):
        self._subscribers: Dict[MessageType, List[Callable]] = {}
        # Created lazily by _get_queue(): before Python 3.10 an asyncio.Queue
        # binds to the event loop that is current when it is constructed, so a
        # bus instantiated at import time would be tied to the wrong loop.
        self._queue = None
        self._running = False

    def _get_queue(self) -> asyncio.Queue:
        """Return the message queue, creating it on first use."""
        if self._queue is None:
            self._queue = asyncio.Queue()
        return self._queue

    def subscribe(self, message_type: MessageType, handler: Callable):
        """Register ``handler`` for messages of ``message_type``.

        The handler receives the ``Message`` and may be a plain function or a
        coroutine function.
        """
        self._subscribers.setdefault(message_type, []).append(handler)

    async def publish(self, message: Message):
        """Queue ``message`` for delivery by the ``start`` loop."""
        await self._get_queue().put(message)

    async def start(self):
        """Process queued messages until ``stop`` is called."""
        self._running = True
        queue = self._get_queue()
        while self._running:
            try:
                # Poll with a timeout so a stop() request is noticed even
                # while the queue is empty.
                message = await asyncio.wait_for(queue.get(), timeout=1.0)
                await self._process_message(message)
            except asyncio.TimeoutError:
                continue

    def stop(self):
        """Ask the ``start`` loop to exit.

        The loop ends after the message being processed, or at the latest
        when the current one-second poll times out.
        """
        self._running = False

    async def _process_message(self, message: Message):
        """Deliver ``message`` to every handler subscribed to its type.

        A failing handler is reported and does not stop the remaining ones.
        """
        import inspect

        for handler in self._subscribers.get(message.type, []):
            try:
                outcome = handler(message)
                # Await only when the handler actually returned an awaitable,
                # so plain functions work as handlers too.
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                print(f"Error processing message: {e}")
|
||
|
||
# 全局消息总线实例
|
||
message_bus = MessageBus()


# Usage example
async def handle_module_ready(message: Message):
    """Report that the sending module is ready."""
    print(f"Module {message.source} is ready")


message_bus.subscribe(MessageType.MODULE_READY, handle_module_ready)


# When the Auth module starts up
async def announce_auth_ready():
    """Publish the Auth module's MODULE_READY message.

    ``await`` is only valid inside a coroutine, so the publish call lives in a
    function that the module's startup code awaits.
    """
    await message_bus.publish(Message(
        type=MessageType.MODULE_READY,
        source="auth",
        data={"endpoints": ["/api/v1/auth/login", "/api/v1/auth/register"]},
    ))
|
||
```
|
||
|
||
### 3.2 服务注册与发现
|
||
```python
|
||
# app/core/service_registry.py
|
||
from typing import Dict, Any, Optional
|
||
import json
|
||
|
||
class ServiceRegistry:
    """In-memory registry of the services modules provide."""

    def __init__(self):
        self._services: Dict[str, Dict[str, Any]] = {}

    def register(self, service_name: str, service_info: Dict[str, Any]):
        """Register (or re-register) a service as active.

        The stored entry is ``service_info`` plus a ``registered_at`` ISO
        timestamp and ``status`` set to ``"active"``; those two keys override
        same-named keys in ``service_info``.
        """
        # Local import: this snippet's import list does not include datetime.
        from datetime import datetime

        self._services[service_name] = {
            **service_info,
            "registered_at": datetime.now().isoformat(),
            "status": "active",
        }
        print(f"Service {service_name} registered")

    def unregister(self, service_name: str):
        """Mark a service inactive; unknown names are ignored.

        The entry is kept in the registry, only its status changes.
        """
        if service_name in self._services:
            self._services[service_name]["status"] = "inactive"

    def get_service(self, service_name: str) -> Optional[Dict[str, Any]]:
        """Return the entry for an active service, or ``None`` otherwise."""
        service = self._services.get(service_name)
        if service and service["status"] == "active":
            return service
        return None

    def list_services(self) -> Dict[str, Dict[str, Any]]:
        """Return all active services keyed by name."""
        return {
            name: info
            for name, info in self._services.items()
            if info["status"] == "active"
        }
|
||
|
||
# 全局服务注册中心
|
||
service_registry = ServiceRegistry()

# Registration example
service_registry.register("auth", {
    "module": "auth",
    "version": "1.0.0",
    "endpoints": [
        "/api/v1/auth/login",
        "/api/v1/auth/register",
    ],
    "dependencies": [],
    "health_check": "/api/v1/auth/health",
})
|
||
```
|
||
|
||
## 4. 数据一致性保证
|
||
|
||
### 4.1 分布式事务管理
|
||
```python
|
||
# app/core/transaction_manager.py
|
||
from typing import List, Callable, Any
|
||
import asyncio
|
||
from contextlib import asynccontextmanager
|
||
|
||
from typing import Dict  # used in execute_saga's signature; absent from this snippet's imports


class TransactionManager:
    """Distributed transaction manager based on the Saga pattern."""

    def __init__(self):
        self._transactions = {}

    async def execute_saga(
        self,
        saga_id: str,
        steps: List[Dict[str, Any]]
    ) -> bool:
        """Run the steps in order, rolling back completed ones on failure.

        Args:
            saga_id: Identifier used in the failure message.
            steps: Ordered step definitions. Each step is a dict with:
                - name: step name (optional, used for reporting)
                - action: coroutine function called as ``action(**args)``
                - compensate: optional coroutine function called as
                  ``compensate(result, **args)`` to undo the step
                - args: optional keyword arguments, defaults to ``{}``

        Returns:
            True when every step succeeded, False when a step raised and the
            completed steps were compensated.
        """
        completed_steps = []

        for step in steps:
            try:
                result = await step["action"](**step.get("args", {}))
            except Exception as e:
                # .get() so that a step without a "name" cannot raise KeyError
                # here and skip the rollback below.
                print(f"Saga {saga_id} failed at step {step.get('name', '<unnamed>')}: {e}")
                await self._compensate(completed_steps)
                return False
            completed_steps.append({"step": step, "result": result})

        return True

    async def _compensate(self, completed_steps: List[Dict[str, Any]]) -> None:
        """Undo completed steps in reverse order, continuing past failures."""
        for completed in reversed(completed_steps):
            step = completed["step"]
            compensate = step.get("compensate")
            if compensate is None:
                # Nothing to undo for this step.
                continue
            try:
                await compensate(completed["result"], **step.get("args", {}))
            except Exception as comp_error:
                print(f"Compensation failed: {comp_error}")
|
||
|
||
# 使用示例:创建用户并分配课程
|
||
async def create_user_and_assign_course(user_data: dict, course_id: int):
    """Create a user, assign a course and initialise analytics as one saga.

    Returns:
        True when all three steps succeeded, False when one failed and the
        completed steps were compensated.
    """
    saga = TransactionManager()

    # Define the transaction steps.
    # NOTE(review): the result of "create_user" is not forwarded to the later
    # steps, so "assign_course" and "init_analytics" receive no user id from
    # here - confirm how the services obtain it.
    steps = [
        {
            "name": "create_user",
            "action": user_service.create_user,
            "compensate": user_service.delete_user,
            "args": {"data": user_data},
        },
        {
            "name": "assign_course",
            "action": course_service.assign_to_user,
            "compensate": course_service.unassign_from_user,
            "args": {"course_id": course_id},
        },
        {
            "name": "init_analytics",
            "action": analytics_service.init_user_data,
            "compensate": analytics_service.cleanup_user_data,
            "args": {},
        },
    ]

    success = await saga.execute_saga("create_user_flow", steps)
    return success
|
||
```
|
||
|
||
### 4.2 数据版本控制
|
||
```python
|
||
# app/core/data_versioning.py
|
||
from typing import Any, Dict, Optional
|
||
from datetime import datetime
|
||
import json
|
||
|
||
class DataVersion:
    """In-memory version history for one entity."""

    def __init__(self, entity_type: str, entity_id: str):
        self.entity_type = entity_type
        self.entity_id = entity_id
        self._versions = []

    def save_version(self, data: Dict[str, Any], author: str) -> int:
        """Append a new version and return its 1-based version number.

        Args:
            data: JSON-serializable state of the entity.
            author: Who made the change.

        Raises:
            TypeError: If ``data`` cannot be serialized to JSON for hashing.
        """
        import copy

        # Store a private snapshot: keeping a reference would let later
        # changes to the caller's dict rewrite history and invalidate the hash.
        snapshot = copy.deepcopy(data)
        version = {
            "version": len(self._versions) + 1,
            "data": snapshot,
            "author": author,
            "timestamp": datetime.now().isoformat(),
            "hash": self._calculate_hash(snapshot),
        }
        self._versions.append(version)
        return version["version"]

    def get_version(self, version_number: int) -> Optional[Dict[str, Any]]:
        """Return the record with the given version number, or ``None``."""
        return next(
            (v for v in self._versions if v["version"] == version_number),
            None,
        )

    def get_latest(self) -> Optional[Dict[str, Any]]:
        """Return the most recent version record, or ``None`` when empty."""
        return self._versions[-1] if self._versions else None

    def _calculate_hash(self, data: Dict[str, Any]) -> str:
        """Return the SHA-256 hex digest of ``data`` as key-sorted JSON."""
        import hashlib

        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()
|
||
```
|
||
|
||
## 5. 错误处理与恢复
|
||
|
||
### 5.1 统一错误处理
|
||
```python
|
||
# app/core/error_handling.py
|
||
from typing import Optional, Dict, Any
|
||
import traceback
|
||
from datetime import datetime
|
||
|
||
from typing import Callable  # used by register_handler; absent from this snippet's imports


class ErrorContext:
    """Where and for whom an error happened."""

    def __init__(
        self,
        module: str,
        operation: str,
        user_id: Optional[int] = None,
        request_id: Optional[str] = None
    ):
        self.module = module
        self.operation = operation
        self.user_id = user_id
        self.request_id = request_id
        self.timestamp = datetime.now()
        self.additional_data = {}


class ErrorHandler:
    """Central error handler with per-exception-type handlers."""

    def __init__(self):
        self._error_handlers = {}
        self._error_log = []

    def register_handler(self, error_type: type, handler: Callable):
        """Register ``handler`` for ``error_type`` and its subclasses.

        The handler is awaited as ``handler(error, context)`` and its return
        value becomes the result of ``handle_error``.
        """
        self._error_handlers[error_type] = handler

    async def handle_error(
        self,
        error: Exception,
        context: ErrorContext
    ) -> Dict[str, Any]:
        """Record ``error`` and dispatch it to the matching handler.

        Returns:
            The registered handler's result, or a default dict with
            ``handled`` False, a generated ``error_id`` and a generic
            ``user_message`` when no handler matches or the handler fails.
        """
        error_record = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            # Format from the exception object itself: format_exc() only
            # works while an ``except`` block is active and would otherwise
            # record "NoneType: None".
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ),
            "context": {
                "module": context.module,
                "operation": context.operation,
                "user_id": context.user_id,
                "request_id": context.request_id,
                "timestamp": context.timestamp.isoformat(),
            },
        }

        # Record the error
        self._error_log.append(error_record)

        # Find and run the specific handler
        handler = self._find_handler(error)
        if handler:
            try:
                return await handler(error, context)
            except Exception as handler_error:
                print(f"Error handler failed: {handler_error}")

        # Default handling
        return {
            "handled": False,
            "error_id": self._generate_error_id(),
            "user_message": "系统错误,请稍后重试",
        }

    def _find_handler(self, error: Exception):
        """Return the handler for the most specific registered type of ``error``.

        Walks the exception's MRO so a handler registered for a base class
        also covers its subclasses; returns ``None`` when nothing matches.
        """
        for error_class in type(error).__mro__:
            handler = self._error_handlers.get(error_class)
            if handler:
                return handler
        return None

    def _generate_error_id(self) -> str:
        """Return an id of the form ``ERR-`` plus eight hex characters."""
        import uuid

        return f"ERR-{uuid.uuid4().hex[:8]}"
|
||
|
||
# 全局错误处理器
|
||
error_handler = ErrorHandler()


# Register a handler for a specific error type
async def handle_database_error(error: Exception, context: ErrorContext):
    """Handle database errors, giving duplicates a user-friendly message.

    Returns ``handled`` True only when the error text contains "duplicate"
    (case-insensitive).
    """
    if "duplicate" in str(error).lower():
        return {
            "handled": True,
            "user_message": "数据已存在,请勿重复操作",
        }
    return {"handled": False}


# NOTE(review): DatabaseError is not defined or imported in this snippet;
# import the project's database exception class before this line.
error_handler.register_handler(DatabaseError, handle_database_error)
|
||
```
|
||
|
||
### 5.2 断路器模式
|
||
```python
|
||
# app/core/circuit_breaker.py
|
||
from typing import Callable, Any
|
||
import asyncio
|
||
from datetime import datetime, timedelta
|
||
from enum import Enum
|
||
|
||
class CircuitState(Enum):
    """States of a circuit breaker."""

    CLOSED = "closed"        # normal operation, calls pass through
    OPEN = "open"            # tripped, calls are rejected
    HALF_OPEN = "half_open"  # trial call allowed after the recovery timeout


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Circuit breaker for coroutine calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to wait after the last failure before a
                trial call is allowed.
            expected_exception: Exception type (or tuple of types) counted as
                a failure; other exceptions propagate without being counted.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self._failure_count = 0
        self._last_failure_time = None
        self._state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` through the breaker.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the recovery
                timeout has not elapsed. It subclasses ``Exception``, so
                callers catching ``Exception`` keep working.
            Exception: Whatever ``func`` raises is re-raised unchanged.
        """
        if self._state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self._state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure count and close the circuit."""
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def _on_failure(self):
        """Count a failure and open the circuit once the threshold is reached."""
        self._failure_count += 1
        self._last_failure_time = datetime.now()

        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Return True when the recovery timeout has passed since the last failure."""
        if self._last_failure_time is None:
            # Explicit False: the bare ``and`` expression would return None here.
            return False
        elapsed = datetime.now() - self._last_failure_time
        return elapsed > timedelta(seconds=self.recovery_timeout)
|
||
|
||
# 使用示例
|
||
coze_circuit_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=30,
)


async def call_coze_api(data):
    """Send ``data`` to Coze through the shared circuit breaker."""
    return await coze_circuit_breaker.call(
        coze_client.send_message,
        data,
    )
|
||
```
|
||
|
||
## 6. 监控与日志
|
||
|
||
### 6.1 性能监控
|
||
```python
|
||
# app/core/monitoring.py
|
||
import time
|
||
from typing import Dict, Any
|
||
from functools import wraps
|
||
import asyncio
|
||
|
||
class PerformanceMonitor:
    """In-memory collector of numeric metrics and execution times."""

    def __init__(self):
        self._metrics = {}

    def record_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None):
        """Append one sample to ``metric_name``.

        Each sample stores the value, the wall-clock time it was recorded at
        and the given tags (an empty dict when omitted).
        """
        self._metrics.setdefault(metric_name, []).append({
            "value": value,
            "timestamp": time.time(),
            "tags": tags or {},
        })

    def _record_timing(self, metric_key: str, started: float, error: Exception = None) -> None:
        """Record the time elapsed since ``started`` with a success or error tag."""
        elapsed = time.perf_counter() - started
        if error is None:
            tags = {"status": "success"}
        else:
            tags = {"status": "error", "error_type": type(error).__name__}
        self.record_metric(metric_key, elapsed, tags)

    def measure_time(self, metric_name: str):
        """Return a decorator that records a function's execution time.

        Works on both plain and coroutine functions. Samples are stored under
        ``"<metric_name>.execution_time"`` in seconds; exceptions are recorded
        with an error tag and re-raised.
        """
        metric_key = f"{metric_name}.execution_time"

        def decorator(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                # perf_counter is monotonic, so clock adjustments cannot
                # produce negative or skewed durations.
                started = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    self._record_timing(metric_key, started, e)
                    raise
                self._record_timing(metric_key, started)
                return result

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                started = time.perf_counter()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    self._record_timing(metric_key, started, e)
                    raise
                self._record_timing(metric_key, started)
                return result

            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

        return decorator

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return count, min, max and average for every recorded metric."""
        summary = {}
        for metric_name, values in self._metrics.items():
            if values:
                metric_values = [v["value"] for v in values]
                summary[metric_name] = {
                    "count": len(values),
                    "min": min(metric_values),
                    "max": max(metric_values),
                    "avg": sum(metric_values) / len(metric_values),
                }
        return summary
|
||
|
||
# 全局性能监控器
|
||
performance_monitor = PerformanceMonitor()


# Usage example
@performance_monitor.measure_time("api.user.get_by_id")
async def get_user_by_id(user_id: int):
    """Placeholder showing how to time an API function."""
    # Business logic goes here
    pass
|
||
```
|
||
|
||
### 6.2 统一日志管理
|
||
```python
|
||
# app/core/logging_config.py
|
||
import logging
|
||
import sys
|
||
from pathlib import Path
|
||
import structlog
|
||
from pythonjsonlogger import jsonlogger
|
||
|
||
def setup_logging(log_level: str = "INFO", log_dir: Path = None):
    """Configure structlog and the standard ``logging`` module together.

    Args:
        log_level: Name of a ``logging`` level (case-insensitive), e.g. "INFO".
        log_dir: Optional directory for ``app.log``; created when missing.
            When omitted, only the stdout configuration is applied.

    Returns:
        The logger returned by ``structlog.get_logger()``.

    Raises:
        AttributeError: If ``log_level`` does not name an attribute of ``logging``.
    """
    # Create the log directory
    if log_dir:
        log_dir.mkdir(parents=True, exist_ok=True)

    # Configure structured logging
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.CallsiteParameterAdder(
                parameters=[
                    structlog.processors.CallsiteParameter.FILENAME,
                    structlog.processors.CallsiteParameter.LINENO,
                    structlog.processors.CallsiteParameter.FUNC_NAME,
                ]
            ),
            structlog.processors.dict_tracebacks,
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # Configure the standard library logging
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, log_level.upper())
    )

    # JSON formatter for the file handler
    # NOTE(review): "level" is not a standard LogRecord attribute (the
    # standard name is "levelname") - confirm the intended field name.
    json_formatter = jsonlogger.JsonFormatter(
        "%(timestamp)s %(level)s %(name)s %(message)s",
        timestamp=True
    )

    # File handler
    if log_dir:
        import os

        root_logger = logging.getLogger()
        # FileHandler stores its target as an absolute path in baseFilename.
        log_file = os.path.abspath(str(log_dir / "app.log"))
        already_attached = any(
            isinstance(existing, logging.FileHandler) and existing.baseFilename == log_file
            for existing in root_logger.handlers
        )
        # Calling setup_logging twice must not write every record twice.
        if not already_attached:
            # Explicit UTF-8 so non-ASCII messages do not depend on the
            # platform's default encoding.
            file_handler = logging.FileHandler(log_file, encoding="utf-8")
            file_handler.setFormatter(json_formatter)
            root_logger.addHandler(file_handler)

    return structlog.get_logger()
|
||
|
||
# 在应用启动时配置日志
|
||
logger = setup_logging(
    log_level="INFO",
    log_dir=Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/logs"),
)
|
||
```
|
||
|
||
## 7. 开发工具与脚本
|
||
|
||
### 7.1 Makefile配置
|
||
```makefile
|
||
# Makefile
|
||
.PHONY: help install test test-all test-module test-auth test-user test-integration \
        lint format type-check run-dev run-prod clean \
        db-init db-migrate db-upgrade db-downgrade

help:
	@echo "Available commands:"
	@echo "  make install      Install dependencies"
	@echo "  make test         Run tests"
	@echo "  make test-all     Run unit and integration tests"
	@echo "  make lint         Run linters"
	@echo "  make format       Format code"
	@echo "  make type-check   Run type checking"
	@echo "  make run-dev      Run development server"
	@echo "  make run-prod     Run production server"
	@echo "  make clean        Clean temporary files"

install:
	pip install -r requirements/dev.txt

test:
	pytest tests/ -v --cov=app --cov-report=html

# Target invoked by the daily integration workflow
test-all: test test-integration

test-module:
	@test -n "$(MODULE)" || { echo "Usage: make test-module MODULE=<name>"; exit 1; }
	@echo "Testing specific module: $(MODULE)"
	pytest tests/$(MODULE)/ -v

lint:
	flake8 app/ tests/
	pylint app/

format:
	black app/ tests/
	isort app/ tests/

type-check:
	mypy app/ --strict

run-dev:
	uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

run-prod:
	gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker

clean:
	find . -type d -name __pycache__ -exec rm -rf {} +
	find . -type f -name "*.pyc" -delete
	rm -rf .pytest_cache/
	rm -rf .mypy_cache/
	rm -rf htmlcov/
	rm -rf dist/
	rm -rf *.egg-info

# Module-specific commands ($(MAKE) keeps flags and the jobserver in sub-makes)
test-auth:
	$(MAKE) test-module MODULE=auth

test-user:
	$(MAKE) test-module MODULE=user

test-integration:
	pytest tests/integration/ -v

# Database commands
db-init:
	alembic init migrations

db-migrate:
	alembic revision --autogenerate -m "$(MESSAGE)"

db-upgrade:
	alembic upgrade head

db-downgrade:
	alembic downgrade -1
|
||
```
|
||
|
||
### 7.2 开发辅助脚本
|
||
```python
|
||
# scripts/check_module_integration.py
|
||
"""检查模块集成状态"""
|
||
|
||
import asyncio
|
||
import httpx
|
||
from typing import Dict, List
|
||
import json
|
||
|
||
async def check_module_health(base_url: str, modules: List[str]) -> Dict[str, bool]:
    """Probe each module's health endpoint.

    Sends ``GET {base_url}/api/v1/{module}/health`` for every module, one
    after another.

    Returns:
        Module name mapped to True when the endpoint answered with HTTP 200,
        and False for any other status or any error during the request.
    """
    results = {}

    async with httpx.AsyncClient() as client:
        for module in modules:
            try:
                response = await client.get(f"{base_url}/api/v1/{module}/health")
                results[module] = response.status_code == 200
            except Exception:
                # Best effort: an unreachable module is reported as unhealthy.
                results[module] = False

    return results
|
||
|
||
async def check_module_dependencies(base_url: str) -> Dict[str, List[str]]:
    """Fetch the module dependency map from the running service.

    Requests ``GET {base_url}/api/v1/system/dependencies`` and returns the
    decoded JSON body.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{base_url}/api/v1/system/dependencies")
        # Fail with the real HTTP status instead of a confusing JSON decoding
        # error on an error page.
        response.raise_for_status()
        return response.json()
|
||
|
||
async def main():
    """Print the health status and dependency list of every module."""
    base_url = "http://localhost:8000"
    modules = ["auth", "user", "course", "exam", "training", "analytics", "admin"]

    print("检查模块健康状态...")
    health_results = await check_module_health(base_url, modules)

    print("\n模块状态:")
    for module, status in health_results.items():
        status_emoji = "✅" if status else "❌"
        print(f"  {status_emoji} {module}")

    print("\n检查模块依赖...")
    dependencies = await check_module_dependencies(base_url)

    print("\n模块依赖关系:")
    for module, deps in dependencies.items():
        print(f"  {module}: {', '.join(deps) if deps else '无依赖'}")


if __name__ == "__main__":
    asyncio.run(main())
|
||
```
|
||
|
||
## 8. 协作检查清单
|
||
|
||
每个Agent在开发过程中需要确保:
|
||
|
||
### 开发前

- [ ] 已阅读并理解项目上下文文档
- [ ] 已了解所负责模块的依赖关系
- [ ] 已查看相关模块的接口契约
- [ ] 已配置开发环境并通过测试
|
||
|
||
### 开发中

- [ ] 遵循统一的代码规范
- [ ] 实现规定的接口契约
- [ ] 编写必要的单元测试
- [ ] 记录关键操作日志
- [ ] 处理异常情况
|
||
|
||
### 集成前

- [ ] 通过所有代码质量检查
- [ ] 完成API文档编写
- [ ] 与依赖模块进行集成测试
- [ ] 更新模块状态到服务注册中心
|
||
|
||
### 集成后

- [ ] 确认集成测试通过
- [ ] 监控模块运行状态
- [ ] 及时响应其他模块的问题
- [ ] 参与每日站会同步进展
|
||
|