Files
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

31 KiB
Raw Permalink Blame History

考培练系统后端协作机制设计

1. 全局上下文共享机制

1.1 项目上下文管理

# app/core/project_context.py
from dataclasses import dataclass
from typing import Dict, Any, List
import json
from pathlib import Path

@dataclass
class ProjectContext:
    """项目全局上下文供所有Agent共享"""
    
    # 项目基础信息
    project_name: str = "考培练系统"
    project_version: str = "1.0.0"
    base_path: Path = Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试")
    
    # 技术栈信息
    tech_stack: Dict[str, str] = None
    
    # 数据库schema
    database_schema: Dict[str, Any] = None
    
    # API规范
    api_specifications: Dict[str, Any] = None
    
    # 业务规则
    business_rules: Dict[str, Any] = None
    
    # 模块依赖关系
    module_dependencies: Dict[str, List[str]] = None
    
    def __post_init__(self):
        """初始化默认值"""
        if self.tech_stack is None:
            self.tech_stack = {
                "backend": "Python 3.8+ + FastAPI",
                "frontend": "Vue3 + TypeScript + Element Plus",
                "database": "MySQL 8.0",
                "cache": "Redis",
                "ai_platforms": ["Coze", "Dify"]
            }
        
        if self.module_dependencies is None:
            self.module_dependencies = {
                "auth": [],
                "user": ["auth"],
                "course": ["auth", "user"],
                "exam": ["auth", "user", "dify"],
                "training": ["auth", "user", "coze"],
                "analytics": ["auth", "user", "exam", "training"],
                "admin": ["auth", "user"]
            }
    
    @classmethod
    def load_from_docs(cls, docs_path: Path) -> "ProjectContext":
        """从项目文档加载上下文"""
        context = cls()
        
        # 加载数据库架构
        db_schema_path = docs_path / "数据库架构.md"
        if db_schema_path.exists():
            context.database_schema = cls._parse_database_schema(db_schema_path)
        
        # 加载API规范
        api_spec_path = docs_path / "技术架构设计.md"
        if api_spec_path.exists():
            context.api_specifications = cls._parse_api_specs(api_spec_path)
        
        # 加载业务规则
        prd_path = docs_path / "产品需求文档 (PRD).md"
        if prd_path.exists():
            context.business_rules = cls._parse_business_rules(prd_path)
        
        return context
    
    def to_json(self) -> str:
        """导出为JSON格式便于Agent读取"""
        return json.dumps({
            "project_name": self.project_name,
            "project_version": self.project_version,
            "tech_stack": self.tech_stack,
            "module_dependencies": self.module_dependencies,
            "database_tables": list(self.database_schema.keys()) if self.database_schema else [],
            "api_endpoints": list(self.api_specifications.keys()) if self.api_specifications else []
        }, ensure_ascii=False, indent=2)

# 创建全局上下文实例
project_context = ProjectContext.load_from_docs(
    Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/考培练系统规划")
)

1.2 Agent间通信接口

# app/core/agent_communication.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from datetime import datetime

class AgentInterface(ABC):
    """所有Agent必须实现的接口"""
    
    def __init__(self, agent_name: str, module_name: str):
        self.agent_name = agent_name
        self.module_name = module_name
        self.start_time = datetime.now()
        self.context = project_context
    
    @abstractmethod
    def get_module_info(self) -> Dict[str, Any]:
        """返回模块基本信息"""
        pass
    
    @abstractmethod
    def get_api_endpoints(self) -> List[Dict[str, str]]:
        """返回模块提供的API端点"""
        pass
    
    @abstractmethod
    def get_dependencies(self) -> List[str]:
        """返回模块依赖的其他模块"""
        pass
    
    @abstractmethod
    def get_exported_services(self) -> Dict[str, Any]:
        """返回模块导出的服务"""
        pass
    
    def validate_integration(self, other_module: str) -> bool:
        """验证与其他模块的集成是否正确"""
        dependencies = self.get_dependencies()
        return other_module in dependencies or self.module_name in project_context.module_dependencies.get(other_module, [])

# 示例Auth模块的Agent实现
class AuthAgent(AgentInterface):
    def __init__(self):
        super().__init__("Agent-Auth", "auth")
    
    def get_module_info(self) -> Dict[str, Any]:
        return {
            "name": self.module_name,
            "description": "认证授权模块",
            "version": "1.0.0",
            "status": "developing",
            "owner": self.agent_name
        }
    
    def get_api_endpoints(self) -> List[Dict[str, str]]:
        return [
            {"method": "POST", "path": "/api/v1/auth/login", "description": "用户登录"},
            {"method": "POST", "path": "/api/v1/auth/register", "description": "用户注册"},
            {"method": "POST", "path": "/api/v1/auth/logout", "description": "用户登出"},
            {"method": "POST", "path": "/api/v1/auth/refresh", "description": "刷新Token"}
        ]
    
    def get_dependencies(self) -> List[str]:
        return []  # Auth模块不依赖其他模块
    
    def get_exported_services(self) -> Dict[str, Any]:
        return {
            "get_current_user": "获取当前登录用户",
            "require_admin": "需要管理员权限",
            "create_access_token": "创建访问令牌",
            "verify_password": "验证密码"
        }

2. 代码集成策略

2.1 Git分支管理

# 主分支
main                    # 生产环境代码
develop                 # 开发环境主分支

# 功能分支每个Agent一个
feature/auth            # Agent-Auth的开发分支
feature/user            # Agent-User的开发分支
feature/course          # Agent-Course的开发分支
feature/exam            # Agent-Exam的开发分支
feature/training        # Agent-Training的开发分支
feature/analytics       # Agent-Analytics的开发分支
feature/admin           # Agent-Admin的开发分支
feature/coze-integration    # Agent-Coze的开发分支
feature/dify-integration    # Agent-Dify的开发分支

# 集成分支
integration/daily       # 每日集成分支
integration/weekly      # 每周集成分支

2.2 代码合并流程

# .github/workflows/integration.yml
name: Daily Integration

on:
  schedule:
    - cron: '0 2 * * *'  # 每天凌晨2点执行
  workflow_dispatch:     # 支持手动触发

jobs:
  integration:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
        with:
          ref: develop
      
      - name: Create integration branch
        run: |
          git checkout -b integration/$(date +%Y%m%d)
      
      - name: Merge feature branches
        run: |
          # 按依赖顺序合并
          for branch in auth user coze-integration dify-integration course exam training analytics admin; do
            echo "Merging feature/$branch..."
            git merge --no-ff origin/feature/$branch || true
          done
      
      - name: Run tests
        run: |
          make test-all
      
      - name: Check code quality
        run: |
          make lint
          make type-check

2.3 模块接口契约

# app/contracts/auth_contract.py
from typing import Protocol, Optional
from app.schemas.user import User

class AuthServiceProtocol(Protocol):
    """Auth模块对外提供的服务契约"""
    
    async def authenticate_user(self, username: str, password: str) -> Optional[User]:
        """验证用户身份"""
        ...
    
    async def create_access_token(self, user_id: int) -> str:
        """创建访问令牌"""
        ...
    
    async def get_current_user(self, token: str) -> Optional[User]:
        """根据token获取当前用户"""
        ...

# app/contracts/user_contract.py
class UserServiceProtocol(Protocol):
    """User模块对外提供的服务契约"""
    
    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """根据ID获取用户"""
        ...
    
    async def get_users_by_team(self, team_id: int) -> List[User]:
        """获取团队成员"""
        ...

3. 通信与协调机制

3.1 内部消息总线

# app/core/message_bus.py
import asyncio
from typing import Dict, Any, Callable, List
from enum import Enum
import json
from datetime import datetime

class MessageType(Enum):
    """消息类型"""
    MODULE_READY = "module.ready"
    MODULE_ERROR = "module.error"
    DATA_UPDATED = "data.updated"
    TASK_COMPLETED = "task.completed"
    INTEGRATION_REQUEST = "integration.request"

class Message:
    """消息实体"""
    def __init__(
        self,
        type: MessageType,
        source: str,
        target: str = "*",  # *表示广播
        data: Dict[str, Any] = None,
        correlation_id: str = None
    ):
        self.type = type
        self.source = source
        self.target = target
        self.data = data or {}
        self.correlation_id = correlation_id
        self.timestamp = datetime.now()
    
    def to_json(self) -> str:
        return json.dumps({
            "type": self.type.value,
            "source": self.source,
            "target": self.target,
            "data": self.data,
            "correlation_id": self.correlation_id,
            "timestamp": self.timestamp.isoformat()
        })

class MessageBus:
    """模块间消息总线"""
    
    def __init__(self):
        self._subscribers: Dict[MessageType, List[Callable]] = {}
        self._queue: asyncio.Queue = asyncio.Queue()
        self._running = False
    
    def subscribe(self, message_type: MessageType, handler: Callable):
        """订阅消息"""
        if message_type not in self._subscribers:
            self._subscribers[message_type] = []
        self._subscribers[message_type].append(handler)
    
    async def publish(self, message: Message):
        """发布消息"""
        await self._queue.put(message)
    
    async def start(self):
        """启动消息处理"""
        self._running = True
        while self._running:
            try:
                message = await asyncio.wait_for(self._queue.get(), timeout=1.0)
                await self._process_message(message)
            except asyncio.TimeoutError:
                continue
    
    async def _process_message(self, message: Message):
        """处理消息"""
        handlers = self._subscribers.get(message.type, [])
        for handler in handlers:
            try:
                await handler(message)
            except Exception as e:
                print(f"Error processing message: {e}")

# 全局消息总线实例
message_bus = MessageBus()

# 使用示例
async def handle_module_ready(message: Message):
    print(f"Module {message.source} is ready")

message_bus.subscribe(MessageType.MODULE_READY, handle_module_ready)

# 在Auth模块启动时
await message_bus.publish(Message(
    type=MessageType.MODULE_READY,
    source="auth",
    data={"endpoints": ["/api/v1/auth/login", "/api/v1/auth/register"]}
))

3.2 服务注册与发现

# app/core/service_registry.py
from typing import Dict, Any, Optional
import json

class ServiceRegistry:
    """服务注册中心"""
    
    def __init__(self):
        self._services: Dict[str, Dict[str, Any]] = {}
    
    def register(self, service_name: str, service_info: Dict[str, Any]):
        """注册服务"""
        self._services[service_name] = {
            **service_info,
            "registered_at": datetime.now().isoformat(),
            "status": "active"
        }
        print(f"Service {service_name} registered")
    
    def unregister(self, service_name: str):
        """注销服务"""
        if service_name in self._services:
            self._services[service_name]["status"] = "inactive"
    
    def get_service(self, service_name: str) -> Optional[Dict[str, Any]]:
        """获取服务信息"""
        service = self._services.get(service_name)
        if service and service["status"] == "active":
            return service
        return None
    
    def list_services(self) -> Dict[str, Dict[str, Any]]:
        """列出所有活跃服务"""
        return {
            name: info 
            for name, info in self._services.items() 
            if info["status"] == "active"
        }

# 全局服务注册中心
service_registry = ServiceRegistry()

# 服务注册示例
service_registry.register("auth", {
    "module": "auth",
    "version": "1.0.0",
    "endpoints": [
        "/api/v1/auth/login",
        "/api/v1/auth/register"
    ],
    "dependencies": [],
    "health_check": "/api/v1/auth/health"
})

4. 数据一致性保证

4.1 分布式事务管理

# app/core/transaction_manager.py
from typing import List, Callable, Any
import asyncio
from contextlib import asynccontextmanager

class TransactionManager:
    """分布式事务管理器使用Saga模式"""
    
    def __init__(self):
        self._transactions = {}
    
    async def execute_saga(
        self,
        saga_id: str,
        steps: List[Dict[str, Any]]
    ) -> bool:
        """
        执行Saga事务
        
        Args:
            saga_id: 事务ID
            steps: 事务步骤列表,每个步骤包含:
                - name: 步骤名称
                - action: 执行函数
                - compensate: 补偿函数
                - args: 参数
        """
        completed_steps = []
        
        try:
            # 执行所有步骤
            for step in steps:
                result = await step["action"](**step.get("args", {}))
                completed_steps.append({
                    "step": step,
                    "result": result
                })
            
            return True
            
        except Exception as e:
            # 发生错误,执行补偿操作
            print(f"Saga {saga_id} failed at step {step['name']}: {e}")
            
            # 反向执行补偿操作
            for completed in reversed(completed_steps):
                try:
                    await completed["step"]["compensate"](
                        completed["result"],
                        **completed["step"].get("args", {})
                    )
                except Exception as comp_error:
                    print(f"Compensation failed: {comp_error}")
            
            return False

# 使用示例:创建用户并分配课程
async def create_user_and_assign_course(user_data: dict, course_id: int):
    saga = TransactionManager()
    
    # 定义事务步骤
    steps = [
        {
            "name": "create_user",
            "action": user_service.create_user,
            "compensate": user_service.delete_user,
            "args": {"data": user_data}
        },
        {
            "name": "assign_course",
            "action": course_service.assign_to_user,
            "compensate": course_service.unassign_from_user,
            "args": {"course_id": course_id}
        },
        {
            "name": "init_analytics",
            "action": analytics_service.init_user_data,
            "compensate": analytics_service.cleanup_user_data,
            "args": {}
        }
    ]
    
    success = await saga.execute_saga("create_user_flow", steps)
    return success

4.2 数据版本控制

# app/core/data_versioning.py
from typing import Any, Dict, Optional
from datetime import datetime
import json

class DataVersion:
    """数据版本控制"""
    
    def __init__(self, entity_type: str, entity_id: str):
        self.entity_type = entity_type
        self.entity_id = entity_id
        self._versions = []
    
    def save_version(self, data: Dict[str, Any], author: str):
        """保存新版本"""
        version = {
            "version": len(self._versions) + 1,
            "data": data,
            "author": author,
            "timestamp": datetime.now().isoformat(),
            "hash": self._calculate_hash(data)
        }
        self._versions.append(version)
        return version["version"]
    
    def get_version(self, version_number: int) -> Optional[Dict[str, Any]]:
        """获取指定版本"""
        for v in self._versions:
            if v["version"] == version_number:
                return v
        return None
    
    def get_latest(self) -> Optional[Dict[str, Any]]:
        """获取最新版本"""
        return self._versions[-1] if self._versions else None
    
    def _calculate_hash(self, data: Dict[str, Any]) -> str:
        """计算数据哈希值"""
        import hashlib
        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()

5. 错误处理与恢复

5.1 统一错误处理

# app/core/error_handling.py
from typing import Optional, Dict, Any
import traceback
from datetime import datetime

class ErrorContext:
    """错误上下文信息"""
    def __init__(
        self,
        module: str,
        operation: str,
        user_id: Optional[int] = None,
        request_id: Optional[str] = None
    ):
        self.module = module
        self.operation = operation
        self.user_id = user_id
        self.request_id = request_id
        self.timestamp = datetime.now()
        self.additional_data = {}

class ErrorHandler:
    """统一错误处理器"""
    
    def __init__(self):
        self._error_handlers = {}
        self._error_log = []
    
    def register_handler(self, error_type: type, handler: Callable):
        """注册错误处理器"""
        self._error_handlers[error_type] = handler
    
    async def handle_error(
        self,
        error: Exception,
        context: ErrorContext
    ) -> Dict[str, Any]:
        """处理错误"""
        error_record = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "traceback": traceback.format_exc(),
            "context": {
                "module": context.module,
                "operation": context.operation,
                "user_id": context.user_id,
                "request_id": context.request_id,
                "timestamp": context.timestamp.isoformat()
            }
        }
        
        # 记录错误
        self._error_log.append(error_record)
        
        # 查找并执行特定的错误处理器
        handler = self._error_handlers.get(type(error))
        if handler:
            try:
                return await handler(error, context)
            except Exception as handler_error:
                print(f"Error handler failed: {handler_error}")
        
        # 默认处理
        return {
            "handled": False,
            "error_id": self._generate_error_id(),
            "user_message": "系统错误,请稍后重试"
        }
    
    def _generate_error_id(self) -> str:
        """生成错误ID"""
        import uuid
        return f"ERR-{uuid.uuid4().hex[:8]}"

# 全局错误处理器
error_handler = ErrorHandler()

# 注册特定错误处理器
async def handle_database_error(error: Exception, context: ErrorContext):
    # 数据库错误特殊处理
    if "duplicate" in str(error).lower():
        return {
            "handled": True,
            "user_message": "数据已存在,请勿重复操作"
        }
    return {"handled": False}

error_handler.register_handler(DatabaseError, handle_database_error)

5.2 断路器模式

# app/core/circuit_breaker.py
from typing import Callable, Any
import asyncio
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # 正常状态
    OPEN = "open"          # 断开状态
    HALF_OPEN = "half_open"  # 半开状态

class CircuitBreaker:
    """断路器实现"""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self._failure_count = 0
        self._last_failure_time = None
        self._state = CircuitState.CLOSED
    
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """通过断路器调用函数"""
        if self._state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self._state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _on_success(self):
        """调用成功"""
        self._failure_count = 0
        self._state = CircuitState.CLOSED
    
    def _on_failure(self):
        """调用失败"""
        self._failure_count += 1
        self._last_failure_time = datetime.now()
        
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
    
    def _should_attempt_reset(self) -> bool:
        """是否应该尝试重置"""
        return (
            self._last_failure_time and
            datetime.now() - self._last_failure_time > timedelta(seconds=self.recovery_timeout)
        )

# 使用示例
coze_circuit_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=30
)

async def call_coze_api(data):
    return await coze_circuit_breaker.call(
        coze_client.send_message,
        data
    )

6. 监控与日志

6.1 性能监控

# app/core/monitoring.py
import time
from typing import Dict, Any
from functools import wraps
import asyncio

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self._metrics = {}
    
    def record_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None):
        """记录指标"""
        if metric_name not in self._metrics:
            self._metrics[metric_name] = []
        
        self._metrics[metric_name].append({
            "value": value,
            "timestamp": time.time(),
            "tags": tags or {}
        })
    
    def measure_time(self, metric_name: str):
        """测量执行时间装饰器"""
        def decorator(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = await func(*args, **kwargs)
                    execution_time = time.time() - start_time
                    self.record_metric(
                        f"{metric_name}.execution_time",
                        execution_time,
                        {"status": "success"}
                    )
                    return result
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.record_metric(
                        f"{metric_name}.execution_time",
                        execution_time,
                        {"status": "error", "error_type": type(e).__name__}
                    )
                    raise
            
            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                    execution_time = time.time() - start_time
                    self.record_metric(
                        f"{metric_name}.execution_time",
                        execution_time,
                        {"status": "success"}
                    )
                    return result
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.record_metric(
                        f"{metric_name}.execution_time",
                        execution_time,
                        {"status": "error", "error_type": type(e).__name__}
                    )
                    raise
            
            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
        return decorator
    
    def get_metrics_summary(self) -> Dict[str, Any]:
        """获取指标摘要"""
        summary = {}
        for metric_name, values in self._metrics.items():
            if values:
                metric_values = [v["value"] for v in values]
                summary[metric_name] = {
                    "count": len(values),
                    "min": min(metric_values),
                    "max": max(metric_values),
                    "avg": sum(metric_values) / len(metric_values)
                }
        return summary

# 全局性能监控器
performance_monitor = PerformanceMonitor()

# 使用示例
@performance_monitor.measure_time("api.user.get_by_id")
async def get_user_by_id(user_id: int):
    # 业务逻辑
    pass

6.2 统一日志管理

# app/core/logging_config.py
import logging
import sys
from pathlib import Path
import structlog
from pythonjsonlogger import jsonlogger

def setup_logging(log_level: str = "INFO", log_dir: Path = None):
    """配置统一的日志系统"""
    
    # 创建日志目录
    if log_dir:
        log_dir.mkdir(parents=True, exist_ok=True)
    
    # 配置结构化日志
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.CallsiteParameterAdder(
                parameters=[
                    structlog.processors.CallsiteParameter.FILENAME,
                    structlog.processors.CallsiteParameter.LINENO,
                    structlog.processors.CallsiteParameter.FUNC_NAME,
                ]
            ),
            structlog.processors.dict_tracebacks,
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    
    # 配置Python标准日志
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, log_level.upper())
    )
    
    # 添加JSON格式化器
    json_formatter = jsonlogger.JsonFormatter(
        "%(timestamp)s %(level)s %(name)s %(message)s",
        timestamp=True
    )
    
    # 文件处理器
    if log_dir:
        file_handler = logging.FileHandler(log_dir / "app.log")
        file_handler.setFormatter(json_formatter)
        logging.getLogger().addHandler(file_handler)
    
    return structlog.get_logger()

# 在应用启动时配置日志
logger = setup_logging(
    log_level="INFO",
    log_dir=Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/logs")
)

7. 开发工具与脚本

7.1 Makefile配置

# Makefile
.PHONY: help install test lint format type-check run-dev run-prod clean

help:
	@echo "Available commands:"
	@echo "  make install      Install dependencies"
	@echo "  make test         Run tests"
	@echo "  make lint         Run linters"
	@echo "  make format       Format code"
	@echo "  make type-check   Run type checking"
	@echo "  make run-dev      Run development server"
	@echo "  make run-prod     Run production server"
	@echo "  make clean        Clean temporary files"

install:
	pip install -r requirements/dev.txt

test:
	pytest tests/ -v --cov=app --cov-report=html

test-module:
	@echo "Testing specific module: $(MODULE)"
	pytest tests/$(MODULE)/ -v

lint:
	flake8 app/ tests/
	pylint app/

format:
	black app/ tests/
	isort app/ tests/

type-check:
	mypy app/ --strict

run-dev:
	uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

run-prod:
	gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker

clean:
	find . -type d -name __pycache__ -exec rm -rf {} +
	find . -type f -name "*.pyc" -delete
	rm -rf .pytest_cache/
	rm -rf .mypy_cache/
	rm -rf htmlcov/
	rm -rf dist/
	rm -rf *.egg-info

# 模块特定命令
test-auth:
	make test-module MODULE=auth

test-user:
	make test-module MODULE=user

test-integration:
	pytest tests/integration/ -v

# 数据库命令
db-init:
	alembic init migrations

db-migrate:
	alembic revision --autogenerate -m "$(MESSAGE)"

db-upgrade:
	alembic upgrade head

db-downgrade:
	alembic downgrade -1

7.2 开发辅助脚本

# scripts/check_module_integration.py
"""检查模块集成状态"""

import asyncio
import httpx
from typing import Dict, List
import json

async def check_module_health(base_url: str, modules: List[str]) -> Dict[str, bool]:
    """检查各模块健康状态"""
    results = {}
    
    async with httpx.AsyncClient() as client:
        for module in modules:
            try:
                response = await client.get(f"{base_url}/api/v1/{module}/health")
                results[module] = response.status_code == 200
            except Exception:
                results[module] = False
    
    return results

async def check_module_dependencies(base_url: str) -> Dict[str, List[str]]:
    """检查模块依赖关系"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{base_url}/api/v1/system/dependencies")
        return response.json()

async def main():
    base_url = "http://localhost:8000"
    modules = ["auth", "user", "course", "exam", "training", "analytics", "admin"]
    
    print("检查模块健康状态...")
    health_results = await check_module_health(base_url, modules)
    
    print("\n模块状态:")
    for module, status in health_results.items():
        status_emoji = "✅" if status else "❌"
        print(f"  {status_emoji} {module}")
    
    print("\n检查模块依赖...")
    dependencies = await check_module_dependencies(base_url)
    
    print("\n模块依赖关系:")
    for module, deps in dependencies.items():
        print(f"  {module}: {', '.join(deps) if deps else '无依赖'}")

if __name__ == "__main__":
    asyncio.run(main())

8. 协作检查清单

每个Agent在开发过程中需要确保

开发前

  • 已阅读并理解项目上下文文档
  • 已了解所负责模块的依赖关系
  • 已查看相关模块的接口契约
  • 已配置开发环境并通过测试

开发中

  • 遵循统一的代码规范
  • 实现规定的接口契约
  • 编写必要的单元测试
  • 记录关键操作日志
  • 处理异常情况

集成前

  • 通过所有代码质量检查
  • 完成API文档编写
  • 与依赖模块进行集成测试
  • 更新模块状态到服务注册中心

集成后

  • 确认集成测试通过
  • 监控模块运行状态
  • 及时响应其他模块的问题
  • 参与每日站会同步进展