Files
000-platform/backend/app/routers/logs.py
Admin 9ff89f4a7d
All checks were successful
continuous-integration/drone/push Build is passing
fix: 修复日志查询接口访问不存在字段的问题
- ip_address 从 context.ip 获取
- 移除不存在的 extra_data 字段
- 修复 CSV/Excel 导出中的同样问题
2026-01-24 17:25:17 +08:00

281 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""日志路由"""
import csv
import io
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import desc
from ..database import get_db
from ..config import get_settings
from ..models.logs import PlatformLog
from ..schemas.logs import LogCreate, LogResponse, BatchLogRequest
from ..services.auth import decode_token
# Router for every endpoint in this module; all paths are served under /logs.
router = APIRouter(prefix="/logs", tags=["logs"])
# Settings are loaded once at import time; verify_api_key reads API_KEY from them.
settings = get_settings()
# Excel export depends on openpyxl. Record whether it could be imported so
# export_excel can reject requests when the package is missing.
try:
    from openpyxl import Workbook
    from openpyxl.styles import Font, Alignment, PatternFill
    OPENPYXL_AVAILABLE = True
except ImportError:
    OPENPYXL_AVAILABLE = False
def get_current_user_optional(authorization: Optional[str] = Header(None)):
    """Optionally authenticate the caller from the Authorization header.

    Args:
        authorization: Raw value of the ``Authorization`` request header, or
            ``None`` when the header was not sent.

    Returns:
        Whatever ``decode_token`` returns for the text following the
        ``"Bearer "`` prefix, or ``None`` when the header is missing, empty,
        or does not start with that prefix.
    """
    bearer_prefix = "Bearer "
    if not authorization or not authorization.startswith(bearer_prefix):
        return None
    return decode_token(authorization[len(bearer_prefix):])
def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")):
    """Validate the ``X-API-Key`` request header against the configured key.

    Args:
        x_api_key: Value of the required ``X-API-Key`` header.

    Returns:
        The supplied key when it equals ``settings.API_KEY``.

    Raises:
        HTTPException: 401 when the supplied key does not match, or when the
            configured key is not a string (nothing can match it).
    """
    # Function-scope import so this block does not depend on file-level edits.
    import hmac

    expected = settings.API_KEY
    # compare_digest takes time independent of where the inputs differ, so the
    # response time does not reveal how much of a guessed key was correct.
    # Both sides are encoded because compare_digest rejects non-ASCII str.
    matches = isinstance(expected, str) and hmac.compare_digest(
        x_api_key.encode("utf-8"), expected.encode("utf-8")
    )
    if not matches:
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return x_api_key
@router.post("/write", response_model=LogResponse)
async def write_log(
    log: LogCreate,
    db: Session = Depends(get_db),
    _: str = Depends(verify_api_key)
):
    """Store a single log entry and return the stored row.

    Args:
        log: Validated log payload; its dumped fields become the
            ``PlatformLog`` constructor arguments.
        db: Database session supplied by ``get_db``.
        _: Unused; declared so that ``verify_api_key`` runs for the request.

    Returns:
        The ``PlatformLog`` instance after commit and refresh.
    """
    fields = log.model_dump()
    record = PlatformLog(**fields)
    db.add(record)
    db.commit()
    # Re-read the row from the database before handing it back.
    db.refresh(record)
    return record
@router.post("/write/batch")
async def batch_write_logs(
    request: BatchLogRequest,
    db: Session = Depends(get_db),
    _: str = Depends(verify_api_key)
):
    """Store several log entries in a single commit.

    Args:
        request: Batch payload; every element of ``request.logs`` becomes one
            ``PlatformLog`` row.
        db: Database session supplied by ``get_db``.
        _: Unused; declared so that ``verify_api_key`` runs for the request.

    Returns:
        A dict with ``success`` set to ``True`` and ``count`` set to the
        number of rows added.
    """
    records = []
    for entry in request.logs:
        records.append(PlatformLog(**entry.model_dump()))
    db.add_all(records)
    db.commit()
    return {"success": True, "count": len(records)}
def _log_context_ip(context):
    """Return the ``"ip"`` entry of a log context, or ``None`` if unavailable."""
    # The context may be missing or hold something other than a dict; only a
    # dict can be asked for a key.
    if isinstance(context, dict):
        return context.get("ip")
    return None


def _log_to_dict(item):
    """Convert one ``PlatformLog`` row into the JSON shape returned by query_logs."""
    return {
        "id": item.id,
        "log_type": item.log_type,
        "level": item.level,
        "app_code": item.app_code,
        "tenant_id": item.tenant_id,
        "trace_id": item.trace_id,
        "message": item.message,
        "path": item.path,
        "method": item.method,
        "status_code": item.status_code,
        "duration_ms": item.duration_ms,
        "ip_address": _log_context_ip(item.context),
        "context": item.context,
        "stack_trace": item.stack_trace,
        "log_time": str(item.log_time) if item.log_time else None,
    }


@router.get("")
async def query_logs(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    log_type: Optional[str] = None,
    level: Optional[str] = None,
    app_code: Optional[str] = None,
    tenant_id: Optional[str] = None,
    trace_id: Optional[str] = None,
    keyword: Optional[str] = None,
    db: Session = Depends(get_db),
    user = Depends(get_current_user_optional)
):
    """Return one page of logs, newest first.

    Args:
        page: 1-based page number.
        size: Page size (1-100).
        log_type, level, app_code, tenant_id, trace_id: Optional exact-match
            filters; empty or missing values are ignored.
        keyword: Optional literal substring to look for in the message.
        db: Database session supplied by ``get_db``.
        user: Result of the optional authentication dependency.
            NOTE(review): it is not used in this function, so no access
            check happens here - confirm that is intended.

    Returns:
        A dict with ``total`` (rows matching the filters), ``page``, ``size``
        and ``items`` (the serialized rows of the requested page).
    """
    query = db.query(PlatformLog)
    exact_filters = (
        (PlatformLog.log_type, log_type),
        (PlatformLog.level, level),
        (PlatformLog.app_code, app_code),
        (PlatformLog.tenant_id, tenant_id),
        (PlatformLog.trace_id, trace_id),
    )
    for column, wanted in exact_filters:
        if wanted:
            query = query.filter(column == wanted)
    if keyword:
        # autoescape makes "%" and "_" in the keyword match literally instead
        # of acting as LIKE wildcards.
        query = query.filter(PlatformLog.message.contains(keyword, autoescape=True))
    total = query.count()
    # log_time alone is not unique; the id tie-breaker keeps the order stable
    # so rows are neither repeated nor skipped between pages.
    items = (
        query.order_by(desc(PlatformLog.log_time), desc(PlatformLog.id))
        .offset((page - 1) * size)
        .limit(size)
        .all()
    )
    return {
        "total": total,
        "page": page,
        "size": size,
        "items": [_log_to_dict(item) for item in items],
    }
@router.get("/export")
async def export_logs(
    format: str = Query("csv", description="导出格式: csv 或 excel"),
    log_type: Optional[str] = None,
    level: Optional[str] = None,
    app_code: Optional[str] = None,
    tenant_id: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    limit: int = Query(10000, ge=1, le=100000, description="最大导出记录数"),
    db: Session = Depends(get_db),
    user = Depends(get_current_user_optional)
):
    """Export logs as a downloadable file.

    Supports CSV and Excel output; at most 100,000 records are exported.

    Args:
        format: ``"excel"`` (case-insensitive) selects Excel; any other value
            produces CSV.
        log_type, level, app_code, tenant_id: Optional exact-match filters;
            empty or missing values are ignored.
        start_date: Optional lower bound compared against ``log_time``.
        end_date: Optional upper bound; ``" 23:59:59"`` is appended before it
            is compared against ``log_time``.
        limit: Maximum number of rows to export (1-100000).
        db: Database session supplied by ``get_db``.
        user: Result of the optional authentication dependency; not used in
            this function.

    Returns:
        The response built by ``export_excel`` or ``export_csv``.
    """
    query = db.query(PlatformLog)
    equality_filters = [
        (PlatformLog.log_type, log_type),
        (PlatformLog.level, level),
        (PlatformLog.app_code, app_code),
        (PlatformLog.tenant_id, tenant_id),
    ]
    for column, wanted in equality_filters:
        if wanted:
            query = query.filter(column == wanted)
    # NOTE(review): both date strings reach the query without validation.
    if start_date:
        query = query.filter(PlatformLog.log_time >= start_date)
    if end_date:
        # Extend the bound to the last second of the given day.
        query = query.filter(PlatformLog.log_time <= end_date + " 23:59:59")
    newest_first = query.order_by(desc(PlatformLog.log_time))
    items = newest_first.limit(limit).all()
    exporter = export_excel if format.lower() == "excel" else export_csv
    return exporter(items)
def export_csv(logs: list) -> StreamingResponse:
    """Render log rows as a CSV file download.

    Args:
        logs: Log records exposing the attributes read below (``id``,
            ``log_type``, ``level``, ``app_code``, ``tenant_id``,
            ``trace_id``, ``message``, ``path``, ``method``, ``status_code``,
            ``duration_ms``, ``context``, ``log_time``).

    Returns:
        A ``StreamingResponse`` whose body is the whole CSV text sent as a
        single chunk, with an attachment filename based on the current time.
    """
    output = io.StringIO()
    # Start with a byte-order mark so spreadsheet applications detect UTF-8
    # and display the non-ASCII header labels correctly.
    output.write("\ufeff")
    writer = csv.writer(output)
    # Header row
    headers = [
        "ID", "类型", "级别", "应用", "租户", "Trace ID",
        "消息", "路径", "方法", "状态码", "耗时(ms)",
        "IP地址", "时间"
    ]
    writer.writerow(headers)
    # Data rows
    for log in logs:
        # Only a dict context can be asked for the "ip" key.
        context = log.context if isinstance(log.context, dict) else {}
        writer.writerow([
            log.id,
            log.log_type,
            log.level,
            log.app_code or "",
            log.tenant_id or "",
            log.trace_id or "",
            log.message or "",
            log.path or "",
            log.method or "",
            # Only None becomes blank; a real 0 must stay visible.
            "" if log.status_code is None else log.status_code,
            "" if log.duration_ms is None else log.duration_ms,
            context.get("ip") or "",
            str(log.log_time) if log.log_time else ""
        ])
    # Timestamped download name
    filename = f"logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={
            "Content-Disposition": f'attachment; filename="{filename}"',
            # "utf-8-sig" is a Python codec name, not an HTTP charset; the BOM
            # is written into the body above instead.
            "Content-Type": "text/csv; charset=utf-8"
        }
    )
def export_excel(logs: list) -> StreamingResponse:
    """Render log rows as an Excel (.xlsx) file download.

    Args:
        logs: Log records exposing the attributes read below (``id``,
            ``log_type``, ``level``, ``app_code``, ``tenant_id``,
            ``trace_id``, ``message``, ``path``, ``method``, ``status_code``,
            ``duration_ms``, ``context``, ``log_time``).

    Returns:
        A ``StreamingResponse`` carrying the workbook bytes as a single
        chunk, with an attachment filename based on the current time.

    Raises:
        HTTPException: 400 when openpyxl could not be imported.
    """
    if not OPENPYXL_AVAILABLE:
        raise HTTPException(status_code=400, detail="Excel导出功能不可用请安装openpyxl")
    # Imported only after the availability check because openpyxl is optional.
    from openpyxl.utils import get_column_letter

    wb = Workbook()
    ws = wb.active
    ws.title = "日志导出"
    # Header styling
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center")
    # Header row
    headers = [
        "ID", "类型", "级别", "应用", "租户", "Trace ID",
        "消息", "路径", "方法", "状态码", "耗时(ms)",
        "IP地址", "时间"
    ]
    for col, header in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_alignment
    # Data rows start on row 2, directly below the header.
    for row, log in enumerate(logs, 2):
        # Only a dict context can be asked for the "ip" key.
        context = log.context if isinstance(log.context, dict) else {}
        values = [
            log.id,
            log.log_type,
            log.level,
            log.app_code or "",
            log.tenant_id or "",
            log.trace_id or "",
            log.message or "",
            log.path or "",
            log.method or "",
            # Only None becomes blank; a real 0 must stay visible.
            "" if log.status_code is None else log.status_code,
            "" if log.duration_ms is None else log.duration_ms,
            context.get("ip") or "",
            str(log.log_time) if log.log_time else "",
        ]
        for col, value in enumerate(values, 1):
            ws.cell(row=row, column=col, value=value)
    # Column widths, in the same order as the headers
    column_widths = [8, 10, 10, 12, 12, 36, 50, 30, 8, 10, 10, 15, 20]
    for col, width in enumerate(column_widths, 1):
        # get_column_letter also handles columns past "Z", unlike chr(64 + col).
        ws.column_dimensions[get_column_letter(col)].width = width
    # Serialize the workbook into memory
    output = io.BytesIO()
    wb.save(output)
    # Timestamped download name
    filename = f"logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={
            "Content-Disposition": f'attachment; filename="{filename}"'
        }
    )