Files
smart-project-pricing/后端服务/app/services/market_service.py
2026-01-31 21:33:06 +08:00

368 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市场分析服务
实现市场价格分析核心业务逻辑,包含:
- 竞品价格统计分析
- 价格分布计算
- 建议定价区间生成
"""
from datetime import date
from decimal import Decimal
from typing import Optional, List
from statistics import mean, median, stdev
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import (
Project,
Competitor,
CompetitorPrice,
BenchmarkPrice,
MarketAnalysisResult,
Category,
)
from app.schemas.market import (
MarketAnalysisResult as MarketAnalysisResultSchema,
PriceStatistics,
PriceDistribution,
PriceDistributionItem,
CompetitorPriceSummary,
BenchmarkReference,
SuggestedRange,
)
class MarketService:
    """Market analysis service.

    Bundles the queries and calculations behind a project's market price
    analysis: competitor price statistics, a three-band price distribution,
    and a suggested pricing range.
    """

    def __init__(self, db: AsyncSession):
        """Store the async database session used by every query.

        Args:
            db: SQLAlchemy async session. This class only adds and flushes
                through it; it never commits.
        """
        self.db = db

    async def get_competitor_prices_for_project(
        self,
        project_id: int,
        competitor_ids: Optional[List[int]] = None,
    ) -> List[CompetitorPrice]:
        """Fetch competitor price records for a project, newest first.

        Args:
            project_id: ID of the project whose prices are fetched.
            competitor_ids: Optional competitor IDs to restrict the result
                to. ``None`` or an empty list applies no competitor filter.

        Returns:
            Competitor price records ordered by ``collected_at`` descending,
            with the ``competitor`` relationship loaded via ``selectinload``.
        """
        query = (
            select(CompetitorPrice)
            .options(selectinload(CompetitorPrice.competitor))
            .where(CompetitorPrice.project_id == project_id)
        )
        if competitor_ids:
            query = query.where(
                CompetitorPrice.competitor_id.in_(competitor_ids)
            )
        query = query.order_by(CompetitorPrice.collected_at.desc())
        result = await self.db.execute(query)
        return result.scalars().all()

    async def get_benchmark_prices_for_category(
        self,
        category_id: Optional[int],
    ) -> List[BenchmarkPrice]:
        """Fetch benchmark prices for a category, newest effective date first.

        Args:
            category_id: Category ID. A falsy value (``None`` or ``0``)
                short-circuits to an empty list without querying.

        Returns:
            Benchmark price records ordered by ``effective_date`` descending.
        """
        if not category_id:
            return []
        query = (
            select(BenchmarkPrice)
            .where(BenchmarkPrice.category_id == category_id)
            .order_by(BenchmarkPrice.effective_date.desc())
        )
        result = await self.db.execute(query)
        return result.scalars().all()

    def calculate_price_statistics(self, prices: List[float]) -> PriceStatistics:
        """Compute min / max / mean / median / sample standard deviation.

        All values are rounded to two decimal places.

        Args:
            prices: Prices to summarise.

        Returns:
            ``PriceStatistics``. For an empty list every field is ``0``.
            ``std_deviation`` is ``None`` when there is only one price or
            when the deviation cannot be computed.
        """
        if not prices:
            return PriceStatistics(
                min_price=0,
                max_price=0,
                avg_price=0,
                median_price=0,
                std_deviation=0,
            )

        std_dev = None
        # The sample standard deviation needs at least two data points.
        if len(prices) > 1:
            try:
                std_dev = round(stdev(prices), 2)
            except (ValueError, ArithmeticError):
                # Best effort: statistics.StatisticsError is a ValueError
                # subclass, and overflow is an ArithmeticError. Report
                # "unknown" rather than failing the whole analysis.
                std_dev = None

        return PriceStatistics(
            min_price=round(min(prices), 2),
            max_price=round(max(prices), 2),
            avg_price=round(mean(prices), 2),
            median_price=round(median(prices), 2),
            std_deviation=std_dev,
        )

    def calculate_price_distribution(
        self,
        prices: List[float],
        min_price: float,
        max_price: float,
    ) -> PriceDistribution:
        """Split prices into three equal-width bands: low / medium / high.

        The band edges are ``min_price``, ``min_price + w``,
        ``min_price + 2w`` and ``max_price`` where
        ``w = (max_price - min_price) / 3``. A price below the first inner
        edge counts as low, a price at or above the second inner edge counts
        as high, and everything in between counts as medium.

        Args:
            prices: Prices to bucket.
            min_price: Lower edge of the low band.
            max_price: Upper edge of the high band.

        Returns:
            ``PriceDistribution`` with a count and a percentage (one decimal
            place) per band. Range labels truncate the edges with ``int()``.
            When ``prices`` is empty or ``min_price >= max_price`` every band
            is reported as ``"N/A"`` with zero count and percentage.
        """
        if not prices or min_price >= max_price:
            return PriceDistribution(
                low=PriceDistributionItem(range="N/A", count=0, percentage=0),
                medium=PriceDistributionItem(range="N/A", count=0, percentage=0),
                high=PriceDistributionItem(range="N/A", count=0, percentage=0),
            )

        # Inner edges of the three equal-width bands.
        range_size = (max_price - min_price) / 3
        low_upper = min_price + range_size
        mid_upper = min_price + range_size * 2

        low_count = sum(1 for p in prices if p < low_upper)
        mid_count = sum(1 for p in prices if low_upper <= p < mid_upper)
        high_count = sum(1 for p in prices if p >= mid_upper)

        # prices is non-empty here, so total is never zero.
        total = len(prices)

        def _band(lower: float, upper: float, count: int) -> PriceDistributionItem:
            # Build one band entry with its label and share of all prices.
            return PriceDistributionItem(
                range=f"{int(lower)}-{int(upper)}",
                count=count,
                percentage=round(count / total * 100, 1),
            )

        return PriceDistribution(
            low=_band(min_price, low_upper, low_count),
            medium=_band(low_upper, mid_upper, mid_count),
            high=_band(mid_upper, max_price, high_count),
        )

    def calculate_suggested_range(
        self,
        avg_price: float,
        min_price: float,
        max_price: float,
        benchmark_avg: Optional[float] = None,
    ) -> SuggestedRange:
        """Derive a suggested pricing range and a recommended price.

        The range is the market average +/-20%, with the lower bound raised
        to at least 80% of the market minimum and the upper bound capped at
        120% of the market maximum.

        Args:
            avg_price: Market average price.
            min_price: Market minimum price.
            max_price: Market maximum price.
            benchmark_avg: Optional benchmark average. A falsy value
                (``None`` or ``0``) is treated as "no benchmark".

        Returns:
            ``SuggestedRange``. All zeros when ``avg_price`` is ``0``.
            ``recommended`` is ``0.6 * avg_price + 0.4 * benchmark_avg``
            (rounded to two places) when a benchmark is given, otherwise
            ``avg_price`` unchanged.
        """
        if avg_price == 0:
            return SuggestedRange(min=0, max=0, recommended=0)

        # Suggested band is centred on the market average, +/-20%.
        range_factor = 0.2
        suggested_min = round(avg_price * (1 - range_factor), 2)
        suggested_max = round(avg_price * (1 + range_factor), 2)

        # Keep the band within 80% of the market low and 120% of the high.
        suggested_min = max(suggested_min, round(min_price * 0.8, 2))
        suggested_max = min(suggested_max, round(max_price * 1.2, 2))

        # NOTE(review): recommended is not clamped to
        # [suggested_min, suggested_max]; a benchmark far from the market
        # average can push it outside the band. Confirm this is intended.
        if benchmark_avg:
            recommended = round((avg_price * 0.6 + benchmark_avg * 0.4), 2)
        else:
            recommended = avg_price

        return SuggestedRange(
            min=suggested_min,
            max=suggested_max,
            recommended=recommended,
        )

    async def analyze_market(
        self,
        project_id: int,
        competitor_ids: Optional[List[int]] = None,
        include_benchmark: bool = True,
    ) -> MarketAnalysisResultSchema:
        """Run a market price analysis for a project and persist a snapshot.

        Loads the project and its competitor prices, computes statistics over
        the original (non-promotional) prices, optionally attaches the newest
        benchmark for the project's category, derives a suggested range, and
        stores the headline figures via ``_save_analysis_result``.

        Args:
            project_id: ID of the project to analyse.
            competitor_ids: Optional competitor IDs to restrict the analysis
                to.
            include_benchmark: Whether to look up a benchmark reference.

        Returns:
            The assembled analysis result schema. ``price_distribution`` is
            ``None`` when fewer than three prices are available, and at most
            ten competitor price summaries are included.

        Raises:
            ValueError: If no project with ``project_id`` exists.
        """
        project_result = await self.db.execute(
            select(Project).where(Project.id == project_id)
        )
        project = project_result.scalar_one_or_none()
        if project is None:
            raise ValueError(f"项目不存在: {project_id}")

        competitor_prices = await self.get_competitor_prices_for_project(
            project_id, competitor_ids
        )

        # Statistics are based on the original price, not the promo price.
        prices = [float(cp.original_price) for cp in competitor_prices]
        price_stats = self.calculate_price_statistics(prices)

        # A three-band distribution is only produced for three or more prices.
        price_distribution = None
        if len(prices) >= 3:
            price_distribution = self.calculate_price_distribution(
                prices, price_stats.min_price, price_stats.max_price
            )

        # Only the ten most recently collected records are summarised.
        competitor_summaries = [
            CompetitorPriceSummary(
                competitor_name=(
                    cp.competitor.competitor_name if cp.competitor else "未知"
                ),
                positioning=cp.competitor.positioning if cp.competitor else "medium",
                original_price=float(cp.original_price),
                # Compare against None explicitly so that a promo price of
                # exactly 0 is reported as 0.0 rather than dropped.
                promo_price=(
                    float(cp.promo_price) if cp.promo_price is not None else None
                ),
                collected_at=cp.collected_at,
            )
            for cp in competitor_prices[:10]
        ]

        benchmark_ref = None
        benchmark_avg = None
        if include_benchmark and project.category_id:
            benchmarks = await self.get_benchmark_prices_for_category(
                project.category_id
            )
            if benchmarks:
                # Results are sorted newest first, so index 0 is the latest.
                latest_benchmark = benchmarks[0]
                benchmark_avg = float(latest_benchmark.avg_price)
                benchmark_ref = BenchmarkReference(
                    tier=latest_benchmark.price_tier,
                    min_price=float(latest_benchmark.min_price),
                    max_price=float(latest_benchmark.max_price),
                    avg_price=benchmark_avg,
                )

        suggested_range = self.calculate_suggested_range(
            price_stats.avg_price,
            price_stats.min_price,
            price_stats.max_price,
            benchmark_avg,
        )

        analysis_date = date.today()

        # NOTE(review): competitor_count is the number of price records, not
        # the number of distinct competitors; confirm that is what callers
        # expect.
        await self._save_analysis_result(
            project_id=project_id,
            analysis_date=analysis_date,
            competitor_count=len(competitor_prices),
            min_price=price_stats.min_price,
            max_price=price_stats.max_price,
            avg_price=price_stats.avg_price,
            median_price=price_stats.median_price,
            suggested_min=suggested_range.min,
            suggested_max=suggested_range.max,
        )

        return MarketAnalysisResultSchema(
            project_id=project_id,
            project_name=project.project_name,
            analysis_date=analysis_date,
            competitor_count=len(competitor_prices),
            price_statistics=price_stats,
            price_distribution=price_distribution,
            competitor_prices=competitor_summaries,
            benchmark_reference=benchmark_ref,
            suggested_range=suggested_range,
        )

    async def _save_analysis_result(
        self,
        project_id: int,
        analysis_date: date,
        competitor_count: int,
        min_price: float,
        max_price: float,
        avg_price: float,
        median_price: float,
        suggested_min: float,
        suggested_max: float,
    ) -> None:
        """Insert a new analysis snapshot row and flush it.

        A new row is added on every call, so earlier results are kept as
        history. The session is flushed but not committed here.

        Args:
            project_id: ID of the analysed project.
            analysis_date: Date the analysis was run.
            competitor_count: Number of competitor price records analysed.
            min_price: Market minimum price.
            max_price: Market maximum price.
            avg_price: Market average price.
            median_price: Market median price.
            suggested_min: Lower bound of the suggested range.
            suggested_max: Upper bound of the suggested range.
        """
        # Floats go through str() so Decimal receives the short decimal
        # representation instead of the float's full binary expansion.
        result = MarketAnalysisResult(
            project_id=project_id,
            analysis_date=analysis_date,
            competitor_count=competitor_count,
            market_min_price=Decimal(str(min_price)),
            market_max_price=Decimal(str(max_price)),
            market_avg_price=Decimal(str(avg_price)),
            market_median_price=Decimal(str(median_price)),
            suggested_range_min=Decimal(str(suggested_min)),
            suggested_range_max=Decimal(str(suggested_max)),
        )
        self.db.add(result)
        await self.db.flush()

    async def get_latest_analysis(
        self, project_id: int
    ) -> Optional[MarketAnalysisResult]:
        """Return the stored analysis with the newest ``analysis_date``.

        Args:
            project_id: ID of the project.

        Returns:
            The matching ``MarketAnalysisResult`` row, or ``None`` if the
            project has no stored analysis.
        """
        # NOTE(review): ordering is by analysis_date only. Each analysis run
        # inserts a new row, so several rows can share a date; which of those
        # is returned is then left to the database. Consider a tie-breaker
        # column if one exists on the model.
        query = (
            select(MarketAnalysisResult)
            .where(MarketAnalysisResult.project_id == project_id)
            .order_by(MarketAnalysisResult.analysis_date.desc())
            .limit(1)
        )
        result = await self.db.execute(query)
        return result.scalar_one_or_none()