"""日志路由""" import csv import io from typing import Optional from datetime import datetime from fastapi import APIRouter, Depends, Header, HTTPException, Query from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from sqlalchemy import desc from ..database import get_db from ..config import get_settings from ..models.logs import PlatformLog from ..schemas.logs import LogCreate, LogResponse, BatchLogRequest from ..services.auth import decode_token router = APIRouter(prefix="/logs", tags=["logs"]) settings = get_settings() # 尝试导入openpyxl try: from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill OPENPYXL_AVAILABLE = True except ImportError: OPENPYXL_AVAILABLE = False def get_current_user_optional(authorization: Optional[str] = Header(None)): """可选的用户认证""" if authorization and authorization.startswith("Bearer "): token = authorization[7:] return decode_token(token) return None def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")): """验证API Key""" if x_api_key != settings.API_KEY: raise HTTPException(status_code=401, detail="Invalid API Key") return x_api_key @router.post("/write", response_model=LogResponse) async def write_log( log: LogCreate, db: Session = Depends(get_db), _: str = Depends(verify_api_key) ): """写入日志""" db_log = PlatformLog(**log.model_dump()) db.add(db_log) db.commit() db.refresh(db_log) return db_log @router.post("/write/batch") async def batch_write_logs( request: BatchLogRequest, db: Session = Depends(get_db), _: str = Depends(verify_api_key) ): """批量写入日志""" logs = [PlatformLog(**l.model_dump()) for l in request.logs] db.add_all(logs) db.commit() return {"success": True, "count": len(logs)} @router.get("") async def query_logs( page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), log_type: Optional[str] = None, level: Optional[str] = None, app_code: Optional[str] = None, tenant_id: Optional[str] = None, trace_id: Optional[str] = None, keyword: Optional[str] = None, db: Session = Depends(get_db), user = Depends(get_current_user_optional) ): """查询日志列表""" query = db.query(PlatformLog) if log_type: query = query.filter(PlatformLog.log_type == log_type) if level: query = query.filter(PlatformLog.level == level) if app_code: query = query.filter(PlatformLog.app_code == app_code) if tenant_id: query = query.filter(PlatformLog.tenant_id == tenant_id) if trace_id: query = query.filter(PlatformLog.trace_id == trace_id) if keyword: query = query.filter(PlatformLog.message.like(f"%{keyword}%")) total = query.count() items = query.order_by(desc(PlatformLog.log_time)).offset((page-1)*size).limit(size).all() return { "total": total, "page": page, "size": size, "items": [ { "id": item.id, "log_type": item.log_type, "level": item.level, "app_code": item.app_code, "tenant_id": item.tenant_id, "trace_id": item.trace_id, "message": item.message, "path": item.path, "method": item.method, "status_code": item.status_code, "duration_ms": item.duration_ms, "ip_address": item.context.get("ip") if item.context else None, "context": item.context, "stack_trace": item.stack_trace, "log_time": str(item.log_time) if item.log_time else None } for item in items ] } @router.get("/export") async def export_logs( format: str = Query("csv", description="导出格式: csv 或 excel"), log_type: Optional[str] = None, level: Optional[str] = None, app_code: Optional[str] = None, tenant_id: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = Query(10000, ge=1, le=100000, description="最大导出记录数"), db: Session = Depends(get_db), user = Depends(get_current_user_optional) ): """导出日志 支持CSV和Excel格式,最多导出10万条记录 """ query = db.query(PlatformLog) if log_type: query = query.filter(PlatformLog.log_type == log_type) if level: query = query.filter(PlatformLog.level == level) if app_code: query = query.filter(PlatformLog.app_code == app_code) if tenant_id: query = query.filter(PlatformLog.tenant_id == tenant_id) if start_date: query = query.filter(PlatformLog.log_time >= start_date) if end_date: query = query.filter(PlatformLog.log_time <= end_date + " 23:59:59") items = query.order_by(desc(PlatformLog.log_time)).limit(limit).all() if format.lower() == "excel": return export_excel(items) else: return export_csv(items) def export_csv(logs: list) -> StreamingResponse: """导出为CSV格式""" output = io.StringIO() writer = csv.writer(output) # 写入表头 headers = [ "ID", "类型", "级别", "应用", "租户", "Trace ID", "消息", "路径", "方法", "状态码", "耗时(ms)", "IP地址", "时间" ] writer.writerow(headers) # 写入数据 for log in logs: ip_address = log.context.get("ip") if log.context else "" writer.writerow([ log.id, log.log_type, log.level, log.app_code or "", log.tenant_id or "", log.trace_id or "", log.message or "", log.path or "", log.method or "", log.status_code or "", log.duration_ms or "", ip_address or "", str(log.log_time) if log.log_time else "" ]) output.seek(0) # 生成文件名 filename = f"logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={ "Content-Disposition": f'attachment; filename="{filename}"', "Content-Type": "text/csv; charset=utf-8-sig" } ) def export_excel(logs: list) -> StreamingResponse: """导出为Excel格式""" if not OPENPYXL_AVAILABLE: raise HTTPException(status_code=400, detail="Excel导出功能不可用,请安装openpyxl") wb = Workbook() ws = wb.active ws.title = "日志导出" # 表头样式 header_font = Font(bold=True, color="FFFFFF") header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") header_alignment = Alignment(horizontal="center", vertical="center") # 写入表头 headers = [ "ID", "类型", "级别", "应用", "租户", "Trace ID", "消息", "路径", "方法", "状态码", "耗时(ms)", "IP地址", "时间" ] for col, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col, value=header) cell.font = header_font cell.fill = header_fill cell.alignment = header_alignment # 写入数据 for row, log in enumerate(logs, 2): ip_address = log.context.get("ip") if log.context else "" ws.cell(row=row, column=1, value=log.id) ws.cell(row=row, column=2, value=log.log_type) ws.cell(row=row, column=3, value=log.level) ws.cell(row=row, column=4, value=log.app_code or "") ws.cell(row=row, column=5, value=log.tenant_id or "") ws.cell(row=row, column=6, value=log.trace_id or "") ws.cell(row=row, column=7, value=log.message or "") ws.cell(row=row, column=8, value=log.path or "") ws.cell(row=row, column=9, value=log.method or "") ws.cell(row=row, column=10, value=log.status_code or "") ws.cell(row=row, column=11, value=log.duration_ms or "") ws.cell(row=row, column=12, value=ip_address or "") ws.cell(row=row, column=13, value=str(log.log_time) if log.log_time else "") # 调整列宽 column_widths = [8, 10, 10, 12, 12, 36, 50, 30, 8, 10, 10, 15, 20] for col, width in enumerate(column_widths, 1): ws.column_dimensions[chr(64 + col)].width = width # 保存到内存 output = io.BytesIO() wb.save(output) output.seek(0) # 生成文件名 filename = f"logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" return StreamingResponse( iter([output.getvalue()]), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={ "Content-Disposition": f'attachment; filename="{filename}"' } )