All checks were successful
continuous-integration/drone/push Build is passing
- 新增告警模块 (alerts): 告警规则配置与触发 - 新增成本管理模块 (cost): 成本统计与分析 - 新增配额模块 (quota): 配额管理与限制 - 新增微信模块 (wechat): 微信相关功能接口 - 新增缓存服务 (cache): Redis 缓存封装 - 新增请求日志中间件 (request_logger) - 新增异常处理和链路追踪中间件 - 更新 dashboard 前端展示 - 更新 SDK stats_client 功能
456 lines
17 KiB
Python
456 lines
17 KiB
Python
"""告警服务"""
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
import httpx
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
|
|
from ..models.alert import AlertRule, AlertRecord, NotificationChannel
|
|
from ..models.stats import AICallEvent
|
|
from ..models.logs import PlatformLog
|
|
from .cache import get_cache
|
|
|
|
# Module-level logger, named after this module; AlertService logs through it.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class AlertService:
    """Alert service.

    Evaluates alert rules against recent ``AICallEvent`` data, persists
    triggered alerts as ``AlertRecord`` rows, and delivers notifications to
    the channels configured on each rule.
    """

    # Comparison operators accepted in a rule's ``condition['operator']``,
    # applied as ``value <op> threshold``.
    _COMPARATORS = {
        '>': lambda value, threshold: value > threshold,
        '>=': lambda value, threshold: value >= threshold,
        '<': lambda value, threshold: value < threshold,
        '<=': lambda value, threshold: value <= threshold,
        '==': lambda value, threshold: value == threshold,
        '!=': lambda value, threshold: value != threshold,
    }

    # Minutes per unit suffix accepted by _parse_window.
    _WINDOW_UNITS = {'m': 1, 'h': 60, 'd': 60 * 24}

    def __init__(self, db: Session):
        """
        Args:
            db: SQLAlchemy session used for every query and commit.
        """
        self.db = db
        self._cache = get_cache()

    async def check_all_rules(self) -> List[AlertRecord]:
        """Evaluate every enabled rule (``AlertRule.status == 1``).

        A failure while checking one rule is logged and the session is rolled
        back, so the remaining rules are still evaluated.

        Returns:
            The alert records created for rules that fired.
        """
        rules = self.db.query(AlertRule).filter(AlertRule.status == 1).all()
        triggered_alerts = []

        for rule in rules:
            # Read the id up front: after a rollback the instance is expired and
            # touching its attributes would issue another (possibly failing) query.
            rule_id = rule.id
            try:
                alert = await self.check_rule(rule)
            except Exception as exc:
                # A failed flush/commit leaves the session unusable until it is
                # rolled back; without this every later rule would fail as well.
                self.db.rollback()
                logger.exception("Failed to check rule %s: %s", rule_id, exc)
                continue
            if alert:
                triggered_alerts.append(alert)

        return triggered_alerts

    async def check_rule(self, rule: AlertRule) -> Optional[AlertRecord]:
        """Evaluate a single rule and record an alert if it fires.

        The rule is skipped while its cooldown key is present in the cache, or
        once it has already produced ``max_alerts_per_day`` alerts today.

        Args:
            rule: The alert rule to evaluate.

        Returns:
            The newly created alert record, or None if nothing fired.
        """
        if self._is_in_cooldown(rule):
            logger.debug(f"Rule {rule.id} is in cooldown")
            return None

        if self._exceeds_daily_limit(rule):
            logger.debug(f"Rule {rule.id} exceeds daily limit")
            return None

        checkers = {
            'error_rate': self._check_error_rate,
            'call_count': self._check_call_count,
            'token_usage': self._check_token_usage,
            'cost_threshold': self._check_cost_threshold,
            'latency': self._check_latency,
        }
        checker = checkers.get(rule.rule_type)
        if checker is None:
            # Surface misconfigured rules instead of ignoring them silently.
            logger.warning(f"Rule {rule.id} has unsupported rule_type {rule.rule_type!r}")
            return None

        triggered, metric_value, threshold_value = checker(rule, rule.condition or {})
        if not triggered:
            return None
        return self._create_alert_record(rule, metric_value, threshold_value)

    def _is_in_cooldown(self, rule: AlertRule) -> bool:
        """Return True while the rule's cooldown key exists in the cache."""
        cache_key = f"alert:cooldown:{rule.id}"
        return self._cache.exists(cache_key)

    def _set_cooldown(self, rule: AlertRule):
        """Start the rule's cooldown period (best effort).

        Nothing is stored when ``cooldown_minutes`` is unset or not positive,
        i.e. such rules have no cooldown. Cache errors are logged rather than
        raised: this runs after the alert record has been committed, and
        raising here would drop that alert from the caller's results.
        """
        minutes = rule.cooldown_minutes or 0
        if minutes <= 0:
            return
        cache_key = f"alert:cooldown:{rule.id}"
        try:
            self._cache.set(cache_key, "1", ttl=minutes * 60)
        except Exception:
            logger.warning(f"Failed to set cooldown for rule {rule.id}", exc_info=True)

    def _exceeds_daily_limit(self, rule: AlertRule) -> bool:
        """Return True once the rule has produced ``max_alerts_per_day`` alerts today.

        A ``max_alerts_per_day`` of None is treated as "no limit". "Today" is
        the calendar day of ``datetime.now()`` (naive local time).
        """
        limit = rule.max_alerts_per_day
        if limit is None:
            return False

        start_of_day = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        # A half-open range on the raw column (rather than DATE(created_at))
        # keeps the predicate usable by an index on created_at, if one exists.
        count = self.db.query(func.count(AlertRecord.id)).filter(
            AlertRecord.rule_id == rule.id,
            AlertRecord.created_at >= start_of_day,
            AlertRecord.created_at < start_of_day + timedelta(days=1),
        ).scalar() or 0
        return count >= limit

    def _read_condition(self, condition: dict, default_window: str, default_threshold) -> tuple:
        """Extract ``(since, threshold, operator)`` from a rule condition.

        ``since`` is ``datetime.now()`` minus the parsed ``window``;
        ``threshold`` is coerced to a number so numeric strings from the rule
        config compare correctly; ``operator`` defaults to ``'>'``.
        """
        window_minutes = self._parse_window(condition.get('window', default_window))
        threshold = self._to_number(condition.get('threshold', default_threshold))
        operator = condition.get('operator', '>')
        since = datetime.now() - timedelta(minutes=window_minutes)
        return since, threshold, operator

    def _apply_scope(self, query, rule: AlertRule):
        """Restrict an AICallEvent query to the rule's tenant or app, if scoped.

        Rules without a ``scope_value``, or with any other ``scope_type``, are
        evaluated across all events.
        """
        if not rule.scope_value:
            return query
        if rule.scope_type == 'tenant':
            return query.filter(AICallEvent.tenant_id == rule.scope_value)
        if rule.scope_type == 'app':
            return query.filter(AICallEvent.app_code == rule.scope_value)
        return query

    @staticmethod
    def _to_number(value):
        """Return ``value`` unchanged if it is an int/float, else ``float(value)``.

        Raises:
            TypeError / ValueError: if ``value`` is not numeric (e.g. None).
        """
        if isinstance(value, (int, float)):
            return value
        return float(value)

    def _check_error_rate(self, rule: AlertRule, condition: dict) -> tuple:
        """Compare the number of failed calls in the window (default 5m).

        Despite the rule-type name, the metric is an absolute count of events
        with ``status == 'error'`` (default threshold 10), not a ratio.

        Returns:
            ``(triggered, metric_text, threshold_text)``.
        """
        since, threshold, operator = self._read_condition(condition, '5m', 10)

        query = self.db.query(func.count(AICallEvent.id)).filter(
            AICallEvent.created_at >= since,
            AICallEvent.status == 'error',
        )
        error_count = self._apply_scope(query, rule).scalar() or 0

        triggered = self._compare(error_count, threshold, operator)
        return triggered, str(error_count), str(threshold)

    def _check_call_count(self, rule: AlertRule, condition: dict) -> tuple:
        """Compare the number of calls in the window (default 1h, threshold 1000).

        Returns:
            ``(triggered, metric_text, threshold_text)``.
        """
        since, threshold, operator = self._read_condition(condition, '1h', 1000)

        query = self.db.query(func.count(AICallEvent.id)).filter(
            AICallEvent.created_at >= since
        )
        call_count = self._apply_scope(query, rule).scalar() or 0

        triggered = self._compare(call_count, threshold, operator)
        return triggered, str(call_count), str(threshold)

    def _check_token_usage(self, rule: AlertRule, condition: dict) -> tuple:
        """Compare total input+output tokens in the window (default 1d, threshold 100000).

        Returns:
            ``(triggered, metric_text, threshold_text)``.
        """
        since, threshold, operator = self._read_condition(condition, '1d', 100000)

        # Coalesce each column separately: NULL + n is NULL in SQL, which would
        # silently drop the whole row from the SUM.
        tokens_per_call = (
            func.coalesce(AICallEvent.input_tokens, 0)
            + func.coalesce(AICallEvent.output_tokens, 0)
        )
        query = self.db.query(
            func.coalesce(func.sum(tokens_per_call), 0)
        ).filter(
            AICallEvent.created_at >= since
        )
        token_usage = self._apply_scope(query, rule).scalar() or 0

        triggered = self._compare(token_usage, threshold, operator)
        return triggered, str(token_usage), str(threshold)

    def _check_cost_threshold(self, rule: AlertRule, condition: dict) -> tuple:
        """Compare total cost in the window (default 1d) with a threshold in yuan (default 100).

        Returns:
            ``(triggered, metric_text, threshold_text)``, both texts formatted
            as ``¥<amount>`` with two decimals.
        """
        since, threshold, operator = self._read_condition(condition, '1d', 100)

        query = self.db.query(
            func.coalesce(func.sum(AICallEvent.cost), 0)
        ).filter(
            AICallEvent.created_at >= since
        )
        total_cost = float(self._apply_scope(query, rule).scalar() or 0)

        triggered = self._compare(total_cost, threshold, operator)
        return triggered, f"¥{total_cost:.2f}", f"¥{threshold:.2f}"

    def _check_latency(self, rule: AlertRule, condition: dict) -> tuple:
        """Compare call latency in the window (default 5m) with a threshold in ms (default 5000).

        ``condition['percentile']`` selects the statistic: ``'avg'`` (default),
        ``'max'``, or ``'pNN'`` with 0 < NN <= 100 (e.g. ``'p95'``, ``'p99'``),
        computed with the nearest-rank method. Unrecognised values fall back to
        the average. Rows with a NULL latency are ignored; an empty window
        never triggers.

        Returns:
            ``(triggered, metric_text, threshold_text)``, texts suffixed with ``ms``.
        """
        since, threshold, operator = self._read_condition(condition, '5m', 5000)
        statistic = str(condition.get('percentile', 'avg')).strip().lower()

        query = self.db.query(AICallEvent.latency_ms).filter(
            AICallEvent.created_at >= since,
            AICallEvent.latency_ms.isnot(None),
        )
        latencies = [row.latency_ms for row in self._apply_scope(query, rule).all()]

        if not latencies:
            return False, "0", str(threshold)

        pct_text = statistic[1:]
        if statistic == 'max':
            metric = max(latencies)
        elif statistic.startswith('p') and pct_text.isdecimal() and 0 < int(pct_text) <= 100:
            metric = self._nearest_rank(sorted(latencies), int(pct_text))
        else:
            metric = sum(latencies) / len(latencies)

        triggered = self._compare(metric, threshold, operator)
        return triggered, f"{metric:.0f}ms", f"{threshold}ms"

    @staticmethod
    def _nearest_rank(sorted_values: list, pct: int):
        """Return the ``pct``-th percentile of a non-empty ascending list (nearest-rank).

        The 1-based rank is ``ceil(pct / 100 * n)``, computed with integer
        arithmetic to avoid float rounding; e.g. p95 of 20 values is the 19th.
        """
        rank = -(-len(sorted_values) * pct // 100)
        return sorted_values[max(rank, 1) - 1]

    def _parse_window(self, window) -> int:
        """Convert a time-window spec into minutes.

        Accepts ``'<n>m'``, ``'<n>h'`` or ``'<n>d'`` (case-insensitive,
        surrounding whitespace ignored), a bare string number of minutes, or an
        int/float number of minutes (truncated to int).

        Raises:
            ValueError: if ``window`` cannot be parsed.
        """
        if isinstance(window, (int, float)) and not isinstance(window, bool):
            return int(window)

        text = str(window).strip().lower()
        unit = text[-1:]
        if unit in self._WINDOW_UNITS:
            number, multiplier = text[:-1], self._WINDOW_UNITS[unit]
        else:
            number, multiplier = text, 1

        try:
            return int(number) * multiplier
        except ValueError:
            raise ValueError(f"Invalid alert window: {window!r}") from None

    def _compare(self, value: float, threshold: float, operator: str) -> bool:
        """Evaluate ``value <operator> threshold``.

        Unknown operators log a warning and never trigger.
        """
        comparator = self._COMPARATORS.get(operator)
        if comparator is None:
            logger.warning(f"Unsupported alert operator: {operator!r}")
            return False
        return comparator(value, threshold)

    def _create_alert_record(
        self,
        rule: AlertRule,
        metric_value: str,
        threshold_value: str
    ) -> AlertRecord:
        """Persist an alert record for a fired rule and start its cooldown.

        The record is committed with ``notification_status='pending'``; its
        tenant_id/app_code is filled from the rule's scope.
        """
        title = f"[{rule.priority.upper()}] {rule.name}"
        message = f"规则 '{rule.name}' 触发告警\n当前值: {metric_value}\n阈值: {threshold_value}"

        if rule.scope_type == 'tenant':
            message += f"\n租户: {rule.scope_value}"
        elif rule.scope_type == 'app':
            message += f"\n应用: {rule.scope_value}"

        alert = AlertRecord(
            rule_id=rule.id,
            rule_name=rule.name,
            alert_type=rule.rule_type,
            severity=self._priority_to_severity(rule.priority),
            title=title,
            message=message,
            tenant_id=rule.scope_value if rule.scope_type == 'tenant' else None,
            app_code=rule.scope_value if rule.scope_type == 'app' else None,
            metric_value=metric_value,
            threshold_value=threshold_value,
            notification_status='pending'
        )

        self.db.add(alert)
        self.db.commit()
        self.db.refresh(alert)

        self._set_cooldown(rule)

        logger.info(f"Alert triggered: {title}")
        return alert

    def _priority_to_severity(self, priority: str) -> str:
        """Map a rule priority to an alert severity (unknown values -> 'warning')."""
        mapping = {
            'low': 'info',
            'medium': 'warning',
            'high': 'error',
            'critical': 'critical'
        }
        return mapping.get(priority, 'warning')

    async def send_notification(self, alert: AlertRecord, rule: AlertRule) -> bool:
        """Deliver ``alert`` to every channel configured on ``rule``.

        Each channel is attempted independently and the per-channel results are
        stored in ``alert.notification_result``. ``notification_status``
        becomes ``'sent'`` if every channel succeeded, ``'failed'`` otherwise,
        or ``'skipped'`` when the rule has no channels. The session is
        committed in every case.

        Args:
            alert: The alert record to deliver.
            rule: The rule that produced it (source of the channel list).

        Returns:
            True if every channel succeeded or there were none to notify.
        """
        if not rule.notification_channels:
            alert.notification_status = 'skipped'
            self.db.commit()
            return True

        results = []
        for channel_config in rule.notification_channels:
            try:
                result = await self._send_to_channel(channel_config, alert)
            except Exception as e:
                logger.error(f"Failed to send notification: {e}")
                result = {'success': False, 'error': str(e)}
            results.append(result)

        success = all(result.get('success') for result in results)

        alert.notification_status = 'sent' if success else 'failed'
        alert.notification_result = results
        alert.notified_at = datetime.now()
        self.db.commit()

        return success

    async def _send_to_channel(self, channel_config: dict, alert: AlertRecord) -> dict:
        """Dispatch to the sender for ``channel_config['type']``.

        Supported types are ``'wechat_bot'`` and ``'webhook'``; any other type
        yields a failure result.
        """
        channel_type = channel_config.get('type')

        if channel_type == 'wechat_bot':
            return await self._send_wechat_bot(channel_config, alert)
        if channel_type == 'webhook':
            return await self._send_webhook(channel_config, alert)
        return {'success': False, 'error': f'Unsupported channel type: {channel_type}'}

    async def _send_wechat_bot(self, config: dict, alert: AlertRecord) -> dict:
        """Post the alert as a markdown message to a WeCom (企业微信) group-bot webhook.

        Success is determined by the response JSON's ``errcode`` being 0
        (a missing errcode also counts as success). Errors are returned, not raised.
        """
        webhook = config.get('webhook')
        if not webhook:
            return {'success': False, 'error': 'Missing webhook URL'}

        content = f"**{alert.title}**\n\n{alert.message}\n\n时间: {alert.created_at}"
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "content": content
            }
        }

        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.post(webhook, json=payload)
                result = response.json()

                if result.get('errcode', 0) == 0:
                    return {'success': True, 'channel': 'wechat_bot'}
                return {'success': False, 'error': result.get('errmsg')}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    async def _send_webhook(self, config: dict, alert: AlertRecord) -> dict:
        """Send the alert to a generic HTTP webhook.

        ``config['method']`` selects POST (JSON body, default) or any other
        value -> GET (fields as query params); ``config['headers']`` is passed
        through. Any status below 400 counts as success. Errors are returned,
        not raised.
        """
        url = config.get('url')
        if not url:
            return {'success': False, 'error': 'Missing webhook URL'}

        payload = {
            "alert_id": alert.id,
            "title": alert.title,
            "message": alert.message,
            "severity": alert.severity,
            "alert_type": alert.alert_type,
            "metric_value": alert.metric_value,
            "threshold_value": alert.threshold_value,
            # Guard against a record whose created_at was never populated.
            "created_at": alert.created_at.isoformat() if alert.created_at else None
        }

        headers = config.get('headers', {})
        method = config.get('method', 'POST')

        try:
            async with httpx.AsyncClient(timeout=10) as client:
                if method.upper() == 'POST':
                    response = await client.post(url, json=payload, headers=headers)
                else:
                    response = await client.get(url, params=payload, headers=headers)

                if response.status_code < 400:
                    return {'success': True, 'channel': 'webhook', 'status': response.status_code}
                return {'success': False, 'error': f'HTTP {response.status_code}'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _get_alert(self, alert_id: int) -> Optional[AlertRecord]:
        """Load an alert record by id, or None if it does not exist."""
        return self.db.query(AlertRecord).filter(AlertRecord.id == alert_id).first()

    def acknowledge_alert(self, alert_id: int, acknowledged_by: str) -> Optional[AlertRecord]:
        """Mark an alert as acknowledged by ``acknowledged_by`` and commit.

        Returns:
            The updated alert, or None if no alert has ``alert_id``.
        """
        alert = self._get_alert(alert_id)
        if not alert:
            return None

        alert.status = 'acknowledged'
        alert.acknowledged_by = acknowledged_by
        alert.acknowledged_at = datetime.now()
        self.db.commit()

        return alert

    def resolve_alert(self, alert_id: int) -> Optional[AlertRecord]:
        """Mark an alert as resolved and commit.

        Returns:
            The updated alert, or None if no alert has ``alert_id``.
        """
        alert = self._get_alert(alert_id)
        if not alert:
            return None

        alert.status = 'resolved'
        alert.resolved_at = datetime.now()
        self.db.commit()

        return alert
|