- 从服务器拉取完整代码 - 按框架规范整理项目结构 - 配置 Drone CI 测试环境部署 - 包含后端(FastAPI)、前端(Vue3)、管理端 技术栈: Vue3 + TypeScript + FastAPI + MySQL
31 KiB
31 KiB
考培练系统后端协作机制设计
1. 全局上下文共享机制
1.1 项目上下文管理
# app/core/project_context.py
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class ProjectContext:
    """Project-wide context shared by every agent.

    Fields that default to ``None`` are filled in by ``__post_init__``
    (``tech_stack``, ``module_dependencies``) or by ``load_from_docs``
    (``database_schema``, ``api_specifications``, ``business_rules``).
    """

    # Basic project information.
    project_name: str = "考培练系统"
    project_version: str = "1.0.0"
    base_path: Path = Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试")
    # Technology stack; values are strings except "ai_platforms", which is a list.
    tech_stack: Optional[Dict[str, Any]] = None
    # Database schema.
    database_schema: Optional[Dict[str, Any]] = None
    # API specifications.
    api_specifications: Optional[Dict[str, Any]] = None
    # Business rules.
    business_rules: Optional[Dict[str, Any]] = None
    # Module name -> names of the modules it depends on.
    module_dependencies: Optional[Dict[str, List[str]]] = None

    def __post_init__(self) -> None:
        """Fill in the default tech stack and module dependency graph."""
        if self.tech_stack is None:
            self.tech_stack = {
                "backend": "Python 3.8+ + FastAPI",
                "frontend": "Vue3 + TypeScript + Element Plus",
                "database": "MySQL 8.0",
                "cache": "Redis",
                "ai_platforms": ["Coze", "Dify"],
            }
        if self.module_dependencies is None:
            self.module_dependencies = {
                "auth": [],
                "user": ["auth"],
                "course": ["auth", "user"],
                "exam": ["auth", "user", "dify"],
                "training": ["auth", "user", "coze"],
                "analytics": ["auth", "user", "exam", "training"],
                "admin": ["auth", "user"],
            }

    @classmethod
    def load_from_docs(cls, docs_path: Path) -> "ProjectContext":
        """Build a context and populate it from the project documents.

        Each document is optional: a section is loaded only when its file
        exists under ``docs_path``; otherwise the field stays ``None``.

        NOTE(review): ``_parse_database_schema``, ``_parse_api_specs`` and
        ``_parse_business_rules`` are not defined in the visible class, so a
        call fails with ``AttributeError`` as soon as one of the documents
        exists. Confirm where these helpers are supposed to live.
        """
        context = cls()

        # Database schema.
        db_schema_path = docs_path / "数据库架构.md"
        if db_schema_path.exists():
            context.database_schema = cls._parse_database_schema(db_schema_path)

        # API specifications.
        api_spec_path = docs_path / "技术架构设计.md"
        if api_spec_path.exists():
            context.api_specifications = cls._parse_api_specs(api_spec_path)

        # Business rules.
        prd_path = docs_path / "产品需求文档 (PRD).md"
        if prd_path.exists():
            context.business_rules = cls._parse_business_rules(prd_path)

        return context

    def to_json(self) -> str:
        """Serialize a summary of the context to JSON for agents to read.

        Only the top-level keys of the schema and API specification are
        exported; non-ASCII text is written unescaped.
        """
        return json.dumps(
            {
                "project_name": self.project_name,
                "project_version": self.project_version,
                "tech_stack": self.tech_stack,
                "module_dependencies": self.module_dependencies,
                "database_tables": list(self.database_schema.keys()) if self.database_schema else [],
                "api_endpoints": list(self.api_specifications.keys()) if self.api_specifications else [],
            },
            ensure_ascii=False,
            indent=2,
        )
# Module-level context instance, built from the planning documents.
project_context = ProjectContext.load_from_docs(
    docs_path=Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/考培练系统规划"),
)
1.2 Agent间通信接口
# app/core/agent_communication.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from datetime import datetime
class AgentInterface(ABC):
    """Interface that every agent must implement."""

    def __init__(self, agent_name: str, module_name: str):
        """Record the agent identity, its start time and the shared context.

        NOTE(review): ``project_context`` is not imported in this snippet;
        presumably it comes from ``app/core/project_context.py`` - confirm.
        """
        self.agent_name = agent_name
        self.module_name = module_name
        self.start_time = datetime.now()
        self.context = project_context

    @abstractmethod
    def get_module_info(self) -> Dict[str, Any]:
        """Return basic information about the module."""
        pass

    @abstractmethod
    def get_api_endpoints(self) -> List[Dict[str, str]]:
        """Return the API endpoints the module provides."""
        pass

    @abstractmethod
    def get_dependencies(self) -> List[str]:
        """Return the names of the modules this module depends on."""
        pass

    @abstractmethod
    def get_exported_services(self) -> Dict[str, Any]:
        """Return the services the module exports."""
        pass

    def validate_integration(self, other_module: str) -> bool:
        """Report whether this module and ``other_module`` are linked.

        Returns ``True`` when ``other_module`` is one of this module's
        dependencies, or when this module is listed as a dependency of
        ``other_module`` in the context's dependency map.
        """
        if other_module in self.get_dependencies():
            return True
        # Use the context captured in __init__ rather than the module global,
        # so an instance carrying its own context is validated against it.
        dependents = self.context.module_dependencies.get(other_module, [])
        return self.module_name in dependents
# Example: the agent implementation for the Auth module.
class AuthAgent(AgentInterface):
    """Agent describing the ``auth`` module."""

    def __init__(self):
        super().__init__("Agent-Auth", "auth")

    def get_module_info(self) -> Dict[str, Any]:
        """Return the module's name, description, version, status and owner."""
        info: Dict[str, Any] = {}
        info["name"] = self.module_name
        info["description"] = "认证授权模块"
        info["version"] = "1.0.0"
        info["status"] = "developing"
        info["owner"] = self.agent_name
        return info

    def get_api_endpoints(self) -> List[Dict[str, str]]:
        """Return the four POST endpoints under ``/api/v1/auth``."""
        routes = (
            ("login", "用户登录"),
            ("register", "用户注册"),
            ("logout", "用户登出"),
            ("refresh", "刷新Token"),
        )
        return [
            {"method": "POST", "path": f"/api/v1/auth/{slug}", "description": label}
            for slug, label in routes
        ]

    def get_dependencies(self) -> List[str]:
        """Return an empty list: the Auth module depends on no other module."""
        return list()

    def get_exported_services(self) -> Dict[str, Any]:
        """Return a mapping of exported service name to its description."""
        exported = (
            ("get_current_user", "获取当前登录用户"),
            ("require_admin", "需要管理员权限"),
            ("create_access_token", "创建访问令牌"),
            ("verify_password", "验证密码"),
        )
        return dict(exported)
2. 代码集成策略
2.1 Git分支管理
# 主分支
main # 生产环境代码
develop # 开发环境主分支
# 功能分支(每个Agent一个)
feature/auth # Agent-Auth的开发分支
feature/user # Agent-User的开发分支
feature/course # Agent-Course的开发分支
feature/exam # Agent-Exam的开发分支
feature/training # Agent-Training的开发分支
feature/analytics # Agent-Analytics的开发分支
feature/admin # Agent-Admin的开发分支
feature/coze-integration # Agent-Coze的开发分支
feature/dify-integration # Agent-Dify的开发分支
# 集成分支
integration/daily # 每日集成分支
integration/weekly # 每周集成分支
2.2 代码合并流程
# .github/workflows/integration.yml
# Nightly job: merge every feature branch into a fresh integration branch
# cut from develop, then run the test and code-quality targets.
name: Daily Integration

on:
  schedule:
    - cron: '0 2 * * *'   # Every day at 02:00 (GitHub evaluates cron in UTC)
  workflow_dispatch:        # Allow manual runs

jobs:
  integration:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
        with:
          ref: develop
          # The default shallow checkout only fetches the requested ref;
          # full history is needed so origin/feature/* exists and can merge.
          fetch-depth: 0

      - name: Configure git identity
        # `git merge --no-ff` creates merge commits, which need an author.
        run: |
          git config user.name "github-actions[bot]"
          git config user.email "github-actions[bot]@users.noreply.github.com"

      - name: Create integration branch
        run: |
          git checkout -b integration/$(date +%Y%m%d)

      - name: Merge feature branches
        run: |
          # Merge in dependency order. A branch that cannot be merged is
          # skipped (best effort), but the failure is reported and the
          # half-finished merge is aborted so later merges start clean.
          skipped=""
          for branch in auth user coze-integration dify-integration course exam training analytics admin; do
            echo "Merging feature/$branch..."
            if ! git merge --no-ff "origin/feature/$branch"; then
              echo "::warning::Could not merge feature/$branch; skipping it"
              git merge --abort 2>/dev/null || true
              skipped="$skipped $branch"
            fi
          done
          if [ -n "$skipped" ]; then
            echo "Skipped branches:$skipped"
          fi

      - name: Run tests
        run: |
          make test-all

      - name: Check code quality
        run: |
          make lint
          make type-check
2.3 模块接口契约
# app/contracts/auth_contract.py
from typing import Protocol, Optional
from app.schemas.user import User
class AuthServiceProtocol(Protocol):
    """Service contract the Auth module exposes to other modules."""

    async def authenticate_user(
        self,
        username: str,
        password: str,
    ) -> Optional[User]:
        """Verify a user's identity from a username and password."""
        ...

    async def create_access_token(self, user_id: int) -> str:
        """Create an access token for the given user id."""
        ...

    async def get_current_user(self, token: str) -> Optional[User]:
        """Resolve the current user from a token."""
        ...
# app/contracts/user_contract.py
# This snippet is its own module, so it needs its own imports.
from typing import List, Optional, Protocol

from app.schemas.user import User


class UserServiceProtocol(Protocol):
    """Service contract the User module exposes to other modules."""

    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """Look up a user by id."""
        ...

    async def get_users_by_team(self, team_id: int) -> List[User]:
        """Return the members of a team."""
        ...
3. 通信与协调机制
3.1 内部消息总线
# app/core/message_bus.py
import asyncio
from typing import Dict, Any, Callable, List
from enum import Enum
import json
from datetime import datetime
class MessageType(Enum):
    """Kinds of messages exchanged between modules."""

    MODULE_READY = "module.ready"
    MODULE_ERROR = "module.error"
    DATA_UPDATED = "data.updated"
    TASK_COMPLETED = "task.completed"
    INTEGRATION_REQUEST = "integration.request"


class Message:
    """A single message, stamped with its creation time."""

    def __init__(
        self,
        type: MessageType,
        source: str,
        target: str = "*",  # "*" means broadcast
        data: Dict[str, Any] = None,
        correlation_id: str = None
    ):
        self.type, self.source, self.target = type, source, target
        # A missing or empty payload becomes a fresh empty dict.
        self.data = data if data else {}
        self.correlation_id = correlation_id
        self.timestamp = datetime.now()

    def to_json(self) -> str:
        """Serialize the message; the timestamp is written in ISO format."""
        payload = {
            "type": self.type.value,
            "source": self.source,
            "target": self.target,
            "data": self.data,
            "correlation_id": self.correlation_id,
            "timestamp": self.timestamp.isoformat(),
        }
        return json.dumps(payload)
class MessageBus:
    """In-process message bus between modules.

    Handlers are registered per message type; published messages are queued
    and dispatched by the ``start()`` loop until ``stop()`` is called.
    """

    # Sentinel that stop() enqueues so a start() loop blocked on the queue
    # wakes up immediately instead of waiting for the poll timeout.
    _WAKE = object()

    def __init__(self):
        self._subscribers: Dict["MessageType", List[Callable]] = {}
        # Created lazily inside the running loop: on Python 3.8/3.9 an
        # asyncio.Queue binds to the loop that is current at construction,
        # which is the wrong one for a bus instantiated at import time.
        self._queue = None
        self._running = False

    def _get_queue(self) -> asyncio.Queue:
        """Return the queue, creating it on first use."""
        if self._queue is None:
            self._queue = asyncio.Queue()
        return self._queue

    def subscribe(self, message_type: "MessageType", handler: Callable):
        """Register ``handler`` for messages of ``message_type``."""
        self._subscribers.setdefault(message_type, []).append(handler)

    async def publish(self, message: "Message"):
        """Queue ``message`` for dispatch."""
        await self._get_queue().put(message)

    async def start(self):
        """Dispatch queued messages until ``stop()`` is called."""
        queue = self._get_queue()
        self._running = True
        while self._running:
            try:
                message = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            if message is self._WAKE:
                # Only here to re-check the running flag.
                continue
            await self._process_message(message)

    def stop(self):
        """Ask the ``start()`` loop to exit."""
        self._running = False
        if self._queue is not None:
            self._queue.put_nowait(self._WAKE)

    async def _process_message(self, message: "Message"):
        """Call every handler subscribed to the message's type.

        Handlers may be coroutine functions or plain callables. A failing
        handler is reported and does not prevent the others from running.
        """
        import inspect

        for handler in self._subscribers.get(message.type, []):
            try:
                outcome = handler(message)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                print(f"Error processing message: {e}")
# Global message bus instance.
message_bus = MessageBus()


# Usage example.
async def handle_module_ready(message: Message):
    """Print a line announcing that the source module is ready."""
    print(f"Module {message.source} is ready")


message_bus.subscribe(MessageType.MODULE_READY, handle_module_ready)


# When the Auth module starts up. `await` is only valid inside a coroutine,
# so the publish call lives in a function the startup code can await.
async def announce_auth_ready():
    """Publish the MODULE_READY message for the auth module."""
    await message_bus.publish(Message(
        type=MessageType.MODULE_READY,
        source="auth",
        data={"endpoints": ["/api/v1/auth/login", "/api/v1/auth/register"]}
    ))
3.2 服务注册与发现
# app/core/service_registry.py
from typing import Dict, Any, Optional
import json
class ServiceRegistry:
    """In-memory service registry."""

    def __init__(self):
        self._services: Dict[str, Dict[str, Any]] = {}

    def register(self, service_name: str, service_info: Dict[str, Any]):
        """Register (or re-register) a service and mark it active.

        The stored record is ``service_info`` plus ``registered_at`` (ISO
        timestamp) and ``status``; those two keys override any same-named
        keys in ``service_info``.
        """
        # Imported here because this module's import block does not provide
        # datetime.
        from datetime import datetime

        self._services[service_name] = {
            **service_info,
            "registered_at": datetime.now().isoformat(),
            "status": "active"
        }
        print(f"Service {service_name} registered")

    def unregister(self, service_name: str):
        """Mark a service inactive; unknown names are ignored."""
        if service_name in self._services:
            self._services[service_name]["status"] = "inactive"

    def get_service(self, service_name: str) -> Optional[Dict[str, Any]]:
        """Return the record of an active service, else ``None``."""
        service = self._services.get(service_name)
        if service and service["status"] == "active":
            return service
        return None

    def list_services(self) -> Dict[str, Dict[str, Any]]:
        """Return all services whose status is active."""
        return {
            name: info
            for name, info in self._services.items()
            if info["status"] == "active"
        }
# Global service registry.
service_registry = ServiceRegistry()

# Example registration.
service_registry.register(
    service_name="auth",
    service_info={
        "module": "auth",
        "version": "1.0.0",
        "endpoints": ["/api/v1/auth/login", "/api/v1/auth/register"],
        "dependencies": [],
        "health_check": "/api/v1/auth/health",
    },
)
4. 数据一致性保证
4.1 分布式事务管理
# app/core/transaction_manager.py
from typing import List, Callable, Any
import asyncio
from contextlib import asynccontextmanager
class TransactionManager:
    """Distributed transaction manager built on the saga pattern."""

    def __init__(self):
        self._transactions = {}

    async def execute_saga(
        self,
        saga_id: str,
        steps: List[Dict[str, Any]]
    ) -> bool:
        """Run the steps in order; on failure, compensate in reverse order.

        Args:
            saga_id: Identifier of the saga, used in the failure message.
            steps: Step descriptions. Each step contains:
                - name: step name (used for reporting only)
                - action: async callable, invoked as ``action(**args)``
                - compensate: async callable, invoked as
                  ``compensate(result, **args)`` with the action's result
                - args: keyword arguments (optional)

        Returns:
            ``True`` when every step succeeded, ``False`` when a step failed
            and the completed steps were compensated. The failing step itself
            is not compensated.
        """
        completed_steps = []
        current_name = "<not started>"
        try:
            for step in steps:
                # Resolve the name up front and without indexing, so that
                # reporting a failure can never raise and skip compensation.
                current_name = step.get("name", "<unnamed>")
                result = await step["action"](**step.get("args", {}))
                completed_steps.append({
                    "step": step,
                    "result": result
                })
            return True
        except Exception as e:
            print(f"Saga {saga_id} failed at step {current_name}: {e}")
            # Undo the completed steps, most recent first. A failing
            # compensation is reported and the remaining ones still run.
            for completed in reversed(completed_steps):
                try:
                    await completed["step"]["compensate"](
                        completed["result"],
                        **completed["step"].get("args", {})
                    )
                except Exception as comp_error:
                    print(f"Compensation failed: {comp_error}")
            return False
# Usage example: create a user and assign a course.
async def create_user_and_assign_course(user_data: dict, course_id: int):
    """Run the three-step "create_user_flow" saga and return its outcome."""

    def make_step(name, action, compensate, args):
        # One saga step in the shape execute_saga expects.
        return {
            "name": name,
            "action": action,
            "compensate": compensate,
            "args": args,
        }

    saga = TransactionManager()
    steps = [
        make_step(
            "create_user",
            user_service.create_user,
            user_service.delete_user,
            {"data": user_data},
        ),
        make_step(
            "assign_course",
            course_service.assign_to_user,
            course_service.unassign_from_user,
            {"course_id": course_id},
        ),
        make_step(
            "init_analytics",
            analytics_service.init_user_data,
            analytics_service.cleanup_user_data,
            {},
        ),
    ]
    return await saga.execute_saga("create_user_flow", steps)
4.2 数据版本控制
# app/core/data_versioning.py
from typing import Any, Dict, Optional
from datetime import datetime
import json
class DataVersion:
    """Append-only version history for one entity."""

    def __init__(self, entity_type: str, entity_id: str):
        self.entity_type = entity_type
        self.entity_id = entity_id
        self._versions = []

    def save_version(self, data: Dict[str, Any], author: str) -> int:
        """Store a new version and return its 1-based version number.

        A deep copy of ``data`` is stored, so later changes to the caller's
        dict cannot alter a saved version or invalidate its hash.
        """
        import copy

        snapshot = copy.deepcopy(data)
        version = {
            "version": len(self._versions) + 1,
            "data": snapshot,
            "author": author,
            "timestamp": datetime.now().isoformat(),
            "hash": self._calculate_hash(snapshot)
        }
        self._versions.append(version)
        return version["version"]

    def get_version(self, version_number: int) -> Optional[Dict[str, Any]]:
        """Return the version record with that number, or ``None``."""
        return next(
            (v for v in self._versions if v["version"] == version_number),
            None,
        )

    def get_latest(self) -> Optional[Dict[str, Any]]:
        """Return the most recent version record, or ``None`` if empty."""
        return self._versions[-1] if self._versions else None

    def _calculate_hash(self, data: Dict[str, Any]) -> str:
        """Return the SHA-256 hex digest of the key-sorted JSON of ``data``."""
        import hashlib
        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()
5. 错误处理与恢复
5.1 统一错误处理
# app/core/error_handling.py
from typing import Optional, Dict, Any
import traceback
from datetime import datetime
class ErrorContext:
    """Describes where an error happened and on whose behalf."""

    def __init__(
        self,
        module: str,
        operation: str,
        user_id: Optional[int] = None,
        request_id: Optional[str] = None
    ):
        # Where the error happened.
        self.module, self.operation = module, operation
        # Who and which request it happened for (both optional).
        self.user_id, self.request_id = user_id, request_id
        # Creation time of this context object.
        self.timestamp = datetime.now()
        self.additional_data = dict()
class ErrorHandler:
    """Central error handler with per-exception-type handlers."""

    def __init__(self):
        self._error_handlers = {}
        self._error_log = []

    def register_handler(self, error_type: type, handler: Callable):
        """Register an async ``handler(error, context)`` for ``error_type``.

        The handler also applies to subclasses of ``error_type`` unless a
        more specific handler is registered.
        """
        self._error_handlers[error_type] = handler

    def _find_handler(self, error_type: type):
        """Return the handler for the closest registered base class."""
        for klass in error_type.__mro__:
            handler = self._error_handlers.get(klass)
            if handler:
                return handler
        return None

    async def handle_error(
        self,
        error: Exception,
        context: "ErrorContext"
    ) -> Dict[str, Any]:
        """Log ``error`` and dispatch it to a registered handler.

        Returns:
            The registered handler's result when one exists and succeeds;
            otherwise a default dict with ``handled`` False, the ``error_id``
            that was stored in the log record, and a generic user message.
        """
        error_id = self._generate_error_id()
        error_record = {
            "error_id": error_id,
            "error_type": type(error).__name__,
            "error_message": str(error),
            # Format from the exception object itself: format_exc() only
            # works while an exception is being handled, which is not
            # guaranteed for callers of this method.
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ),
            "context": {
                "module": context.module,
                "operation": context.operation,
                "user_id": context.user_id,
                "request_id": context.request_id,
                "timestamp": context.timestamp.isoformat()
            }
        }
        self._error_log.append(error_record)

        handler = self._find_handler(type(error))
        if handler:
            try:
                return await handler(error, context)
            except Exception as handler_error:
                # A broken handler falls through to the default response.
                print(f"Error handler failed: {handler_error}")

        return {
            "handled": False,
            "error_id": error_id,
            "user_message": "系统错误,请稍后重试"
        }

    def _generate_error_id(self) -> str:
        """Return an id of the form ``ERR-`` plus 8 hex characters."""
        import uuid
        return f"ERR-{uuid.uuid4().hex[:8]}"
# Global error handler.
error_handler = ErrorHandler()


# Register a handler for a specific error type.
async def handle_database_error(error: Exception, context: ErrorContext):
    """Turn "duplicate" database errors into a friendly message."""
    is_duplicate = "duplicate" in str(error).lower()
    if not is_duplicate:
        return {"handled": False}
    return {
        "handled": True,
        "user_message": "数据已存在,请勿重复操作"
    }


# NOTE(review): DatabaseError is not defined or imported in this snippet;
# confirm which exception class is intended.
error_handler.register_handler(DatabaseError, handle_database_error)
5.2 断路器模式
# app/core/circuit_breaker.py
from typing import Callable, Any
import asyncio
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
    """States of a circuit breaker."""

    CLOSED = "closed"        # normal operation, calls pass through
    OPEN = "open"            # calls are rejected
    HALF_OPEN = "half_open"  # one trial call is allowed through


class CircuitBreaker:
    """Circuit breaker for async callables.

    After ``failure_threshold`` consecutive failures the breaker opens and
    rejects calls. Once ``recovery_timeout`` seconds have passed since the
    last failure, the next call is let through as a trial: success closes the
    breaker, failure opens it again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self._failure_count = 0
        self._last_failure_time = None
        self._state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` through the breaker.

        Raises:
            Exception: with message "Circuit breaker is OPEN" when the
                breaker is open and the recovery timeout has not elapsed.
            Any exception raised by ``func``; those matching
            ``expected_exception`` count as failures.
        """
        if self._state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self._state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            # Bare raise keeps the original traceback untouched.
            raise

    def _on_success(self):
        """Reset the failure count and close the breaker."""
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def _on_failure(self):
        """Count a failure and open the breaker at the threshold."""
        self._failure_count += 1
        self._last_failure_time = datetime.now()
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Return whether the recovery timeout has elapsed since the last failure."""
        if self._last_failure_time is None:
            # No failure recorded yet; always a real bool, never None.
            return False
        elapsed = datetime.now() - self._last_failure_time
        return elapsed > timedelta(seconds=self.recovery_timeout)
# Usage example: a breaker that opens after 3 failures and retries after 30 s.
coze_circuit_breaker = CircuitBreaker(3, 30)


async def call_coze_api(data):
    """Send ``data`` through the Coze client, guarded by the breaker."""
    guarded_call = coze_circuit_breaker.call(coze_client.send_message, data)
    return await guarded_call
6. 监控与日志
6.1 性能监控
# app/core/monitoring.py
import time
from typing import Dict, Any
from functools import wraps
import asyncio
class PerformanceMonitor:
    """Collects named numeric metrics in memory."""

    def __init__(self):
        self._metrics = {}

    def record_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None):
        """Append one sample (value, wall-clock timestamp, tags) to a metric."""
        self._metrics.setdefault(metric_name, []).append({
            "value": value,
            "timestamp": time.time(),
            "tags": tags or {}
        })

    def _record_elapsed(self, metric_name: str, started: float, error: Exception = None):
        """Record the time elapsed since ``started`` for ``metric_name``.

        ``started`` is a ``time.perf_counter()`` reading. The sample is tagged
        as success, or as error with the exception's class name.
        """
        if error is None:
            tags = {"status": "success"}
        else:
            tags = {"status": "error", "error_type": type(error).__name__}
        self.record_metric(
            f"{metric_name}.execution_time",
            time.perf_counter() - started,
            tags
        )

    def measure_time(self, metric_name: str):
        """Return a decorator that records the wrapped call's duration.

        Works for both coroutine functions and plain functions. Durations are
        taken with ``time.perf_counter()`` so a system clock adjustment cannot
        distort them. Exceptions are recorded and re-raised unchanged.
        """
        def decorator(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                started = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    self._record_elapsed(metric_name, started, e)
                    raise
                self._record_elapsed(metric_name, started)
                return result

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                started = time.perf_counter()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    self._record_elapsed(metric_name, started, e)
                    raise
                self._record_elapsed(metric_name, started)
                return result

            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
        return decorator

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return count/min/max/avg for every metric that has samples."""
        summary = {}
        for metric_name, values in self._metrics.items():
            if values:
                metric_values = [v["value"] for v in values]
                summary[metric_name] = {
                    "count": len(values),
                    "min": min(metric_values),
                    "max": max(metric_values),
                    "avg": sum(metric_values) / len(metric_values)
                }
        return summary
# Global performance monitor.
performance_monitor = PerformanceMonitor()


# Usage example: time every call under the "api.user.get_by_id" metric.
@performance_monitor.measure_time("api.user.get_by_id")
async def get_user_by_id(user_id: int):
    """Placeholder for the business logic."""
    return None
6.2 统一日志管理
# app/core/logging_config.py
import logging
import sys
from pathlib import Path
import structlog
from pythonjsonlogger import jsonlogger
def setup_logging(log_level: str = "INFO", log_dir: Path = None):
    """Configure structlog and the standard logging module.

    Args:
        log_level: Name of a standard logging level (case-insensitive).
        log_dir: Optional directory; when given it is created if missing and
            a JSON-formatted ``app.log`` file handler is added to the root
            logger.

    Returns:
        A structlog logger.
    """
    # Create the log directory.
    if log_dir:
        log_dir.mkdir(parents=True, exist_ok=True)

    # Structured logging pipeline.
    # NOTE(review): format_exc_info and dict_tracebacks both render exception
    # info; presumably only the first one to run has any effect - confirm
    # against the structlog docs and keep just the intended one.
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.CallsiteParameterAdder(
                parameters=[
                    structlog.processors.CallsiteParameter.FILENAME,
                    structlog.processors.CallsiteParameter.LINENO,
                    structlog.processors.CallsiteParameter.FUNC_NAME,
                ]
            ),
            structlog.processors.dict_tracebacks,
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # Standard library logging: raw message to stdout.
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, log_level.upper())
    )

    # File handler with JSON formatting.
    if log_dir:
        # "levelname" is the LogRecord attribute holding the level text;
        # "level" is not a LogRecord attribute.
        json_formatter = jsonlogger.JsonFormatter(
            "%(timestamp)s %(levelname)s %(name)s %(message)s",
            timestamp=True
        )
        # Explicit UTF-8 so non-ASCII log text does not depend on the
        # platform's default encoding.
        file_handler = logging.FileHandler(log_dir / "app.log", encoding="utf-8")
        file_handler.setFormatter(json_formatter)
        logging.getLogger().addHandler(file_handler)

    return structlog.get_logger()
# Configure logging when the application starts.
logger = setup_logging(
    "INFO",
    Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/logs"),
)
7. 开发工具与脚本
7.1 Makefile配置
# Makefile
# Every target here is a command, not a file, so all of them are phony.
.PHONY: help install test test-all test-module test-auth test-user test-integration \
	lint format type-check run-dev run-prod clean \
	db-init db-migrate db-upgrade db-downgrade

help:
	@echo "Available commands:"
	@echo " make install Install dependencies"
	@echo " make test Run tests"
	@echo " make test-all Run the full test suite (used by CI)"
	@echo " make lint Run linters"
	@echo " make format Format code"
	@echo " make type-check Run type checking"
	@echo " make run-dev Run development server"
	@echo " make run-prod Run production server"
	@echo " make clean Clean temporary files"

install:
	pip install -r requirements/dev.txt

test:
	pytest tests/ -v --cov=app --cov-report=html

# Called by the daily integration workflow. `test` already collects
# everything under tests/, including tests/integration/.
test-all: test

test-module:
	@echo "Testing specific module: $(MODULE)"
	pytest tests/$(MODULE)/ -v

lint:
	flake8 app/ tests/
	pylint app/

format:
	black app/ tests/
	isort app/ tests/

type-check:
	mypy app/ --strict

run-dev:
	uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

run-prod:
	gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker

clean:
	find . -type d -name __pycache__ -exec rm -rf {} +
	find . -type f -name "*.pyc" -delete
	rm -rf .pytest_cache/
	rm -rf .mypy_cache/
	rm -rf htmlcov/
	rm -rf dist/
	rm -rf *.egg-info

# Module-specific commands. $(MAKE) passes flags and the jobserver on to
# the sub-make, which a literal `make` does not.
test-auth:
	$(MAKE) test-module MODULE=auth

test-user:
	$(MAKE) test-module MODULE=user

test-integration:
	pytest tests/integration/ -v

# Database commands
db-init:
	alembic init migrations

db-migrate:
	alembic revision --autogenerate -m "$(MESSAGE)"

db-upgrade:
	alembic upgrade head

db-downgrade:
	alembic downgrade -1
7.2 开发辅助脚本
# scripts/check_module_integration.py
"""检查模块集成状态"""
import asyncio
import httpx
from typing import Dict, List
import json
async def check_module_health(base_url: str, modules: List[str]) -> Dict[str, bool]:
    """Probe each module's health endpoint and report which ones are up.

    Requests ``{base_url}/api/v1/{module}/health`` for every module. A module
    is healthy only when the response status is 200; any request error counts
    as unhealthy. The probes run concurrently, so one slow module does not
    delay the others.

    Returns:
        A dict mapping each module name to its health flag, in input order.
    """
    async def probe(client, module: str) -> bool:
        try:
            response = await client.get(f"{base_url}/api/v1/{module}/health")
        except Exception:
            # Deliberate best effort: an unreachable module is just "down".
            return False
        return response.status_code == 200

    async with httpx.AsyncClient() as client:
        statuses = await asyncio.gather(
            *(probe(client, module) for module in modules)
        )
    return dict(zip(modules, statuses))
async def check_module_dependencies(base_url: str) -> Dict[str, List[str]]:
    """Fetch the module dependency map from the system endpoint.

    Requests ``{base_url}/api/v1/system/dependencies`` and returns the
    decoded JSON body.

    Raises:
        httpx.HTTPStatusError: when the endpoint answers with a 4xx/5xx
            status, so an error body is never mistaken for the map.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{base_url}/api/v1/system/dependencies")
        response.raise_for_status()
        return response.json()
async def main():
    """Print the health of every module, then the module dependency map."""
    base_url = "http://localhost:8000"
    modules = ["auth", "user", "course", "exam", "training", "analytics", "admin"]

    print("检查模块健康状态...")
    health_results = await check_module_health(base_url, modules)
    print("\n模块状态:")
    for name in health_results:
        mark = "✅" if health_results[name] else "❌"
        print(f" {mark} {name}")

    print("\n检查模块依赖...")
    dependencies = await check_module_dependencies(base_url)
    print("\n模块依赖关系:")
    for name, deps in dependencies.items():
        if deps:
            listing = ", ".join(deps)
        else:
            listing = "无依赖"
        print(f" {name}: {listing}")


if __name__ == "__main__":
    asyncio.run(main())
8. 协作检查清单
每个Agent在开发过程中需要确保:
开发前
- 已阅读并理解项目上下文文档
- 已了解所负责模块的依赖关系
- 已查看相关模块的接口契约
- 已配置开发环境并通过测试
开发中
- 遵循统一的代码规范
- 实现规定的接口契约
- 编写必要的单元测试
- 记录关键操作日志
- 处理异常情况
集成前
- 通过所有代码质量检查
- 完成API文档编写
- 与依赖模块进行集成测试
- 更新模块状态到服务注册中心
集成后
- 确认集成测试通过
- 监控模块运行状态
- 及时响应其他模块的问题
- 参与每日站会同步进展