Files
smart-project-pricing/后端服务/app/services/pricing_service.py
2026-01-31 21:33:06 +08:00

575 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""智能定价服务
实现智能定价核心业务逻辑,包含:
- 综合定价计算
- AI 定价建议生成(遵循瑞小美 AI 接入规范)
- 定价策略模拟
- 定价报告导出
"""
import json
import logging
import os
import sys
from datetime import datetime
from decimal import Decimal
from typing import Optional, List, Dict, Any, AsyncIterator

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models import (
    Project,
    ProjectCostSummary,
    MarketAnalysisResult,
    PricingPlan,
)
from app.schemas.pricing import (
    StrategyType,
    PricingSuggestions,
    StrategySuggestion,
    MarketReference,
    AIAdvice,
    AIUsage,
    GeneratePricingResponse,
    StrategySimulationResult,
    SimulateStrategyResponse,
)
from app.services.ai_service_wrapper import AIServiceWrapper
from app.services.cost_service import CostService
from app.services.market_service import MarketService

# 导入提示词
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../prompts'))
from prompts.pricing_advice_prompts import SYSTEM_PROMPT, USER_PROMPT, PROMPT_META
class PricingService:
    """Smart pricing service.

    Implements the core pricing logic: cost-plus price suggestions per
    strategy, AI-generated pricing advice (always requested through
    ``AIServiceWrapper`` with a ``prompt_name``), strategy simulation and
    creation/update of pricing plans.
    """

    # Gross-margin range (min, max), as fractions, for each strategy.
    STRATEGY_MARGINS = {
        StrategyType.TRAFFIC: (0.10, 0.20),  # traffic-driving tier: 10%-20%
        StrategyType.PROFIT: (0.40, 0.60),   # profit tier: 40%-60%
        StrategyType.PREMIUM: (0.60, 0.80),  # premium tier: 60%-80%
    }

    # Display name returned to callers for each strategy.
    STRATEGY_NAMES = {
        StrategyType.TRAFFIC: "引流款",
        StrategyType.PROFIT: "利润款",
        StrategyType.PREMIUM: "高端款",
    }

    # Human-readable description attached to each strategy suggestion.
    STRATEGY_DESCRIPTIONS = {
        StrategyType.TRAFFIC: "低于市场均价,适合引流获客、新店开业、淡季促销",
        StrategyType.PROFIT: "接近市场均价,平衡利润与竞争力,适合日常经营",
        StrategyType.PREMIUM: "定位高端,高利润空间,需配套优质服务和品牌溢价",
    }

    # Pricing-plan columns whose values are coerced to Decimal on update.
    _DECIMAL_PLAN_FIELDS = ('target_margin', 'final_price', 'base_cost', 'suggested_price')

    # Two-character markers that introduce a bullet item in the AI reply.
    # An empty string must never be listed here: str.startswith('') is true
    # for every line, which would turn every line into a "recommendation".
    _BULLET_PREFIXES = ('- ', '* ', '• ')

    _logger = logging.getLogger(__name__)

    def __init__(self, db: AsyncSession):
        """Bind the service to a database session.

        Args:
            db: Async SQLAlchemy session, also handed to the cost and
                market services created here.
        """
        self.db = db
        self.cost_service = CostService(db)
        self.market_service = MarketService(db)

    async def get_project_with_cost(self, project_id: int) -> tuple[Project, Optional[ProjectCostSummary]]:
        """Load a project together with its cost summary.

        Args:
            project_id: Project ID.

        Returns:
            ``(project, project.cost_summary)``; the summary is eagerly
            loaded with ``selectinload``.

        Raises:
            ValueError: If no project with that ID exists.
        """
        result = await self.db.execute(
            select(Project)
            .options(selectinload(Project.cost_summary))
            .where(Project.id == project_id)
        )
        project = result.scalar_one_or_none()
        if not project:
            raise ValueError(f"项目不存在: {project_id}")
        return project, project.cost_summary

    async def get_market_reference(self, project_id: int) -> Optional[MarketReference]:
        """Return the most recent market analysis for a project.

        Args:
            project_id: Project ID.

        Returns:
            Min/max/average market prices from the newest analysis row
            (ordered by ``analysis_date`` descending), or ``None`` when the
            project has no analysis.
        """
        result = await self.db.execute(
            select(MarketAnalysisResult)
            .where(MarketAnalysisResult.project_id == project_id)
            .order_by(MarketAnalysisResult.analysis_date.desc())
            .limit(1)
        )
        market_result = result.scalar_one_or_none()
        if not market_result:
            return None
        return MarketReference(
            min=float(market_result.market_min_price),
            max=float(market_result.market_max_price),
            avg=float(market_result.market_avg_price),
        )

    def calculate_strategy_price(
        self,
        base_cost: float,
        strategy: StrategyType,
        target_margin: Optional[float] = None,
        market_ref: Optional[MarketReference] = None
    ) -> StrategySuggestion:
        """Compute the price suggestion for a single strategy.

        Args:
            base_cost: Base cost of the project.
            strategy: Strategy type.
            target_margin: Custom target gross margin in percent (optional).
                When omitted, the midpoint of the strategy's margin range
                is used.
            market_ref: Market reference prices (optional).

        Returns:
            The strategy's suggested price (rounded to 2 decimals), the
            resulting gross margin in percent (rounded to 1 decimal) and a
            description.

        Raises:
            ValueError: If ``target_margin`` is 100 or more; the cost-plus
                formula divides by ``1 - margin``.
        """
        min_margin, max_margin = self.STRATEGY_MARGINS[strategy]

        if target_margin is not None:
            margin = target_margin / 100
        else:
            margin = (min_margin + max_margin) / 2

        # A margin of 100% divides by zero and anything above it yields a
        # negative price, so reject it up front.
        if margin >= 1:
            raise ValueError(f"目标毛利率必须小于 100%: {target_margin}")

        # Cost-plus pricing: price = cost / (1 - gross margin).
        suggested_price = base_cost / (1 - margin)

        # Adjust against the market when reference prices are available.
        if market_ref:
            if strategy == StrategyType.TRAFFIC:
                # Traffic tier: never above 90% of the market minimum.
                market_low = market_ref.min * 0.9
                suggested_price = min(suggested_price, market_low)
            elif strategy == StrategyType.PREMIUM:
                # Premium tier: at least 90% of (market maximum * 1.1).
                market_high = market_ref.max * 1.1
                suggested_price = max(suggested_price, market_high * 0.9)

        if base_cost == 0:
            # Zero cost: the formula gives 0, so fall back to 80% of the
            # market average, or to a protective minimum price of 1.0.
            if market_ref:
                suggested_price = market_ref.avg * 0.8
            else:
                suggested_price = 1.0
        else:
            # Never price below cost plus 5%.
            suggested_price = max(suggested_price, base_cost * 1.05)

        # Actual gross margin in percent, guarding against division by zero.
        if suggested_price > 0:
            actual_margin = (suggested_price - base_cost) / suggested_price * 100
        else:
            actual_margin = 0

        return StrategySuggestion(
            strategy=self.STRATEGY_NAMES[strategy],
            suggested_price=round(suggested_price, 2),
            margin=round(actual_margin, 1),
            description=self.STRATEGY_DESCRIPTIONS[strategy],
        )

    def calculate_all_strategies(
        self,
        base_cost: float,
        target_margin: float,
        market_ref: Optional[MarketReference] = None,
        strategies: Optional[List[StrategyType]] = None
    ) -> PricingSuggestions:
        """Compute price suggestions for several strategies.

        Args:
            base_cost: Base cost of the project.
            target_margin: Target gross margin in percent; applied to the
                PROFIT strategy only, the others use their default range.
            market_ref: Market reference prices (optional).
            strategies: Strategies to compute; defaults to every member of
                ``StrategyType``.

        Returns:
            A ``PricingSuggestions`` with the ``traffic``/``profit``/
            ``premium`` fields filled for the requested strategies.
        """
        if strategies is None:
            strategies = list(StrategyType)

        suggestions = PricingSuggestions()
        for strategy in strategies:
            suggestion = self.calculate_strategy_price(
                base_cost=base_cost,
                strategy=strategy,
                target_margin=target_margin if strategy == StrategyType.PROFIT else None,
                market_ref=market_ref,
            )
            if strategy == StrategyType.TRAFFIC:
                suggestions.traffic = suggestion
            elif strategy == StrategyType.PROFIT:
                suggestions.profit = suggestion
            elif strategy == StrategyType.PREMIUM:
                suggestions.premium = suggestion
        return suggestions

    async def _load_cost_basis(self, project_id: int) -> tuple[Project, Optional[ProjectCostSummary], Any]:
        """Load a project and its total cost, computing the cost if no summary is stored.

        Returns:
            ``(project, cost_summary, total_cost)``. ``total_cost`` is the
            raw value from the stored summary, or from
            ``CostService.calculate_project_cost`` when there is none;
            callers convert it to the numeric type they need.
        """
        project, cost_summary = await self.get_project_with_cost(project_id)
        if not cost_summary:
            cost_result = await self.cost_service.calculate_project_cost(project_id)
            total_cost = cost_result.total_cost
        else:
            total_cost = cost_summary.total_cost
        return project, cost_summary, total_cost

    def _build_ai_messages(
        self,
        project: Project,
        cost_summary: Optional[ProjectCostSummary],
        base_cost: float,
        market_ref: Optional[MarketReference],
        target_margin: float,
    ) -> List[Dict[str, str]]:
        """Build the system/user chat messages for the pricing-advice prompt."""
        user_content = USER_PROMPT.format(
            project_name=project.project_name,
            cost_data=self._format_cost_data(cost_summary, total_cost=base_cost),
            market_data=self._format_market_data(market_ref),
            target_margin=target_margin,
        )
        return [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ]

    async def generate_pricing_advice(
        self,
        project_id: int,
        target_margin: float = 50,
        strategies: Optional[List[StrategyType]] = None,
        stream: bool = False,
    ) -> GeneratePricingResponse:
        """Generate pricing suggestions plus AI advice for a project.

        The AI call goes through ``AIServiceWrapper`` and always passes
        ``prompt_name`` (used for call statistics).

        Args:
            project_id: Project ID.
            target_margin: Target gross margin in percent.
            strategies: Strategies to compute (defaults to all).
            stream: Unused here; this method always returns the complete
                result (streaming is handled by
                ``generate_pricing_advice_stream``).

        Returns:
            The pricing response. ``ai_advice`` and ``ai_usage`` are
            ``None`` when the AI call fails; the computed prices are still
            returned.

        Raises:
            ValueError: If the project does not exist.
        """
        project, cost_summary, total_cost = await self._load_cost_basis(project_id)
        base_cost = float(total_cost)
        market_ref = await self.get_market_reference(project_id)

        pricing_suggestions = self.calculate_all_strategies(
            base_cost=base_cost,
            target_margin=target_margin,
            market_ref=market_ref,
            strategies=strategies,
        )

        ai_service = AIServiceWrapper(db_session=self.db)
        messages = self._build_ai_messages(
            project, cost_summary, base_cost, market_ref, target_margin
        )

        ai_advice = None
        ai_usage = None
        try:
            response = await ai_service.chat(
                messages=messages,
                prompt_name=PROMPT_META["name"],  # required: used for call statistics
            )
            content = response.content
            ai_advice = AIAdvice(
                summary=self._extract_section(content, "推荐方案") or content[:200],
                cost_analysis=self._extract_section(content, "成本") or "",
                market_analysis=self._extract_section(content, "市场") or "",
                risk_notes=self._extract_section(content, "风险") or "",
                recommendations=self._extract_recommendations(content),
            )
            ai_usage = AIUsage(
                provider=response.provider,
                model=response.model,
                tokens=response.total_tokens,
                latency_ms=response.latency_ms,
            )
        except Exception as exc:
            # Best effort: an AI failure must not break the basic pricing
            # calculation, but it is logged with its traceback.
            self._logger.exception("AI 调用失败: %s", exc)

        return GeneratePricingResponse(
            project_id=project_id,
            project_name=project.project_name,
            cost_base=base_cost,
            market_reference=market_ref,
            pricing_suggestions=pricing_suggestions,
            ai_advice=ai_advice,
            ai_usage=ai_usage,
        )

    async def generate_pricing_advice_stream(
        self,
        project_id: int,
        target_margin: float = 50,
    ) -> AsyncIterator[str]:
        """Stream pricing suggestions and AI advice as SSE events.

        Args:
            project_id: Project ID.
            target_margin: Target gross margin in percent.

        Yields:
            ``data: <json>\\n\\n`` fragments: one ``init`` event with the
            computed prices, then one ``chunk`` event per AI output piece
            (or a single ``error`` event if the AI call fails), and finally
            a ``done`` event.

        Raises:
            ValueError: If the project does not exist.
        """
        project, cost_summary, total_cost = await self._load_cost_basis(project_id)
        base_cost = float(total_cost)
        market_ref = await self.get_market_reference(project_id)

        # The basic pricing result is sent first, before any AI output.
        pricing_suggestions = self.calculate_all_strategies(
            base_cost=base_cost,
            target_margin=target_margin,
            market_ref=market_ref,
        )
        initial_data = {
            "type": "init",
            "project_name": project.project_name,
            "cost_base": base_cost,
            "market_reference": market_ref.model_dump() if market_ref else None,
            "pricing_suggestions": pricing_suggestions.model_dump(),
        }
        yield f"data: {json.dumps(initial_data, ensure_ascii=False)}\n\n"

        ai_service = AIServiceWrapper(db_session=self.db)
        messages = self._build_ai_messages(
            project, cost_summary, base_cost, market_ref, target_margin
        )

        try:
            async for chunk in ai_service.chat_stream(
                messages=messages,
                prompt_name=PROMPT_META["name"],
            ):
                yield f"data: {json.dumps({'type': 'chunk', 'content': chunk}, ensure_ascii=False)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"

        # The completion signal is sent even after an error event.
        yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"

    async def simulate_strategies(
        self,
        project_id: int,
        strategies: List[StrategyType],
        target_margin: float = 50,
    ) -> SimulateStrategyResponse:
        """Simulate pricing strategies for a project.

        Args:
            project_id: Project ID.
            strategies: Strategies to simulate.
            target_margin: Target gross margin in percent (applied to the
                PROFIT strategy only).

        Returns:
            One simulation result per requested strategy, including the
            profit per unit and the position relative to the market
            average (within +/-20% counts as "close to average").

        Raises:
            ValueError: If the project does not exist.
        """
        project, _cost_summary, total_cost = await self._load_cost_basis(project_id)
        base_cost = float(total_cost)
        market_ref = await self.get_market_reference(project_id)

        results = []
        for strategy in strategies:
            suggestion = self.calculate_strategy_price(
                base_cost=base_cost,
                strategy=strategy,
                target_margin=target_margin if strategy == StrategyType.PROFIT else None,
                market_ref=market_ref,
            )

            # Classify the price against the market average, if known.
            market_position = "中等"
            if market_ref:
                if suggestion.suggested_price < market_ref.avg * 0.8:
                    market_position = "低于市场均价"
                elif suggestion.suggested_price > market_ref.avg * 1.2:
                    market_position = "高于市场均价"
                else:
                    market_position = "接近市场均价"

            results.append(StrategySimulationResult(
                strategy_type=strategy.value,
                strategy_name=self.STRATEGY_NAMES[strategy],
                suggested_price=suggestion.suggested_price,
                margin=suggestion.margin,
                profit_per_unit=round(suggestion.suggested_price - base_cost, 2),
                market_position=market_position,
            ))

        return SimulateStrategyResponse(
            project_id=project_id,
            project_name=project.project_name,
            base_cost=base_cost,
            results=results,
        )

    async def create_pricing_plan(
        self,
        project_id: int,
        plan_name: str,
        strategy_type: StrategyType,
        target_margin: float,
        created_by: Optional[int] = None,
    ) -> PricingPlan:
        """Create and persist a pricing plan.

        Args:
            project_id: Project ID.
            plan_name: Plan name.
            strategy_type: Strategy type.
            target_margin: Target gross margin in percent.
            created_by: ID of the creating user (optional).

        Returns:
            The new plan, flushed to the session and refreshed. The
            session is not committed here.

        Raises:
            ValueError: If the project does not exist.
        """
        _project, cost_summary, total_cost = await self._load_cost_basis(project_id)
        # A stored summary's total is used as-is; a freshly computed total
        # is converted through str() into a Decimal.
        base_cost = total_cost if cost_summary else Decimal(str(total_cost))

        market_ref = await self.get_market_reference(project_id)

        suggestion = self.calculate_strategy_price(
            base_cost=float(base_cost),
            strategy=strategy_type,
            target_margin=target_margin if strategy_type == StrategyType.PROFIT else None,
            market_ref=market_ref,
        )

        pricing_plan = PricingPlan(
            project_id=project_id,
            plan_name=plan_name,
            strategy_type=strategy_type.value,
            base_cost=base_cost,
            target_margin=Decimal(str(target_margin)),
            suggested_price=Decimal(str(suggestion.suggested_price)),
            is_active=True,
            created_by=created_by,
        )
        self.db.add(pricing_plan)
        await self.db.flush()
        await self.db.refresh(pricing_plan)
        return pricing_plan

    async def update_pricing_plan(
        self,
        plan_id: int,
        **kwargs
    ) -> PricingPlan:
        """Update fields of an existing pricing plan.

        Args:
            plan_id: Pricing plan ID.
            **kwargs: Attribute names and new values. Entries whose value
                is ``None`` or whose name is not an attribute of the plan
                are ignored; price/margin fields are coerced to Decimal.

        Returns:
            The updated plan, flushed and refreshed.

        Raises:
            ValueError: If no plan with that ID exists.
        """
        result = await self.db.execute(
            select(PricingPlan).where(PricingPlan.id == plan_id)
        )
        plan = result.scalar_one_or_none()
        if not plan:
            raise ValueError(f"定价方案不存在: {plan_id}")

        for key, value in kwargs.items():
            if value is None or not hasattr(plan, key):
                continue
            if key in self._DECIMAL_PLAN_FIELDS:
                value = Decimal(str(value))
            setattr(plan, key, value)

        await self.db.flush()
        await self.db.refresh(plan)
        return plan

    async def save_ai_advice(self, plan_id: int, advice: str) -> None:
        """Store AI advice text on a pricing plan; a missing plan is ignored."""
        result = await self.db.execute(
            select(PricingPlan).where(PricingPlan.id == plan_id)
        )
        plan = result.scalar_one_or_none()
        if plan:
            plan.ai_advice = advice
            await self.db.flush()

    def _format_cost_data(
        self,
        cost_summary: Optional[ProjectCostSummary],
        total_cost: Optional[float] = None,
    ) -> str:
        """Format cost data as prompt text for the AI.

        Args:
            cost_summary: Stored cost summary, if any.
            total_cost: Total cost to report when there is no stored
                summary (optional). Without it, a missing summary is
                reported as "no cost data".

        Returns:
            A Markdown bullet list with the cost breakdown, or just the
            total-cost line, or the "no cost data" placeholder.
        """
        if not cost_summary:
            if total_cost is None:
                return "暂无成本数据"
            # The total was computed on the fly; report at least that.
            return f"- **总成本(最低成本线):{float(total_cost):.2f} 元**"

        lines = [
            f"- 耗材成本:{float(cost_summary.material_cost):.2f}",
            f"- 设备折旧:{float(cost_summary.equipment_cost):.2f}",
            f"- 人工成本:{float(cost_summary.labor_cost):.2f}",
            f"- 固定成本分摊:{float(cost_summary.fixed_cost_allocation):.2f}",
            f"- **总成本(最低成本线):{float(cost_summary.total_cost):.2f} 元**",
        ]
        return "\n".join(lines)

    def _format_market_data(self, market_ref: Optional[MarketReference]) -> str:
        """Format market reference prices as prompt text for the AI."""
        if not market_ref:
            return "暂无市场行情数据"
        lines = [
            f"- 市场最低价:{market_ref.min:.2f}",
            f"- 市场最高价:{market_ref.max:.2f}",
            f"- 市场均价:{market_ref.avg:.2f}",
        ]
        return "\n".join(lines)

    def _extract_section(self, content: str, keyword: str) -> Optional[str]:
        """Extract one section of the AI reply.

        A section starts after the first line that contains ``keyword``
        together with a heading marker (``#`` or ``**``), and ends before
        the next line that starts with ``#`` or is wrapped in ``**``.
        While inside a section, a line containing the keyword and a marker
        is skipped rather than collected.

        Returns:
            The stripped section text, or ``None`` when nothing was
            collected.
        """
        in_section = False
        section_lines = []
        for line in content.split('\n'):
            if keyword in line and ('#' in line or '**' in line):
                in_section = True
                continue
            if not in_section:
                continue
            if line.startswith('#') or (line.startswith('**') and line.endswith('**')):
                break
            section_lines.append(line)
        return '\n'.join(section_lines).strip() if section_lines else None

    def _extract_recommendations(self, content: str) -> List[str]:
        """Extract up to five list items from the AI reply.

        Recognises bullet lines (starting with one of
        ``_BULLET_PREFIXES``) and numbered lines of the form ``1. xxx``
        (an all-digit prefix before the first dot). Blank lines, other
        text and items that are empty after stripping are skipped.

        Returns:
            At most five recommendation strings, in order of appearance.
        """
        recommendations: List[str] = []
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith(self._BULLET_PREFIXES):
                # Every prefix is two characters long.
                item = line[2:].strip()
            else:
                number, dot, rest = line.partition('.')
                if not (dot and number.isdigit()):
                    continue
                item = rest.strip()
            if item:
                recommendations.append(item)
        return recommendations[:5]  # return at most 5 items