Files
012-kaopeilian/backend/app/api/v1/notifications.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

256 lines
7.8 KiB
Python

"""
站内消息通知 API
提供通知的查询、标记已读、删除等功能
"""
import logging
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.deps import get_db, get_current_user
from app.models.user import User
from app.schemas.base import ResponseModel
from app.schemas.notification import (
NotificationCreate,
NotificationBatchCreate,
NotificationResponse,
NotificationListResponse,
NotificationCountResponse,
MarkReadRequest,
)
from app.services.notification_service import notification_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/notifications")
@router.get("", response_model=ResponseModel[NotificationListResponse])
async def get_notifications(
is_read: Optional[bool] = Query(None, description="是否已读筛选"),
type: Optional[str] = Query(None, description="通知类型筛选"),
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
获取当前用户的通知列表
支持按已读状态和通知类型筛选
"""
try:
skip = (page - 1) * page_size
notifications, total, unread_count = await notification_service.get_user_notifications(
db=db,
user_id=current_user.id,
skip=skip,
limit=page_size,
is_read=is_read,
notification_type=type
)
response_data = NotificationListResponse(
items=notifications,
total=total,
unread_count=unread_count
)
return ResponseModel(
code=200,
message="获取通知列表成功",
data=response_data
)
except Exception as e:
logger.error(f"获取通知列表失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取通知列表失败: {str(e)}")
@router.get("/unread-count", response_model=ResponseModel[NotificationCountResponse])
async def get_unread_count(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
获取当前用户的未读通知数量
用于顶部导航栏显示未读消息数
"""
try:
unread_count, total = await notification_service.get_unread_count(
db=db,
user_id=current_user.id
)
return ResponseModel(
code=200,
message="获取未读数量成功",
data=NotificationCountResponse(
unread_count=unread_count,
total=total
)
)
except Exception as e:
logger.error(f"获取未读数量失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取未读数量失败: {str(e)}")
@router.post("/mark-read", response_model=ResponseModel)
async def mark_notifications_read(
request: MarkReadRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
标记通知为已读
- 传入 notification_ids 则标记指定通知
- 不传则标记全部未读通知为已读
"""
try:
updated_count = await notification_service.mark_as_read(
db=db,
user_id=current_user.id,
notification_ids=request.notification_ids
)
return ResponseModel(
code=200,
message=f"成功标记 {updated_count} 条通知为已读",
data={"updated_count": updated_count}
)
except Exception as e:
logger.error(f"标记已读失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"标记已读失败: {str(e)}")
@router.delete("/{notification_id}", response_model=ResponseModel)
async def delete_notification(
notification_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
删除单条通知
只能删除自己的通知
"""
try:
success = await notification_service.delete_notification(
db=db,
user_id=current_user.id,
notification_id=notification_id
)
if not success:
raise HTTPException(status_code=404, detail="通知不存在或无权删除")
return ResponseModel(
code=200,
message="删除通知成功",
data={"deleted": True}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"删除通知失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"删除通知失败: {str(e)}")
# ==================== 管理员接口 ====================
@router.post("/send", response_model=ResponseModel[NotificationResponse])
async def send_notification(
notification_in: NotificationCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
发送单条通知(管理员接口)
向指定用户发送通知
"""
try:
# 权限检查:仅管理员和管理者可发送通知
if current_user.role not in ["admin", "manager"]:
raise HTTPException(status_code=403, detail="无权限发送通知")
# 设置发送者
notification_in.sender_id = current_user.id
notification = await notification_service.create_notification(
db=db,
notification_in=notification_in
)
# 构建响应
response = NotificationResponse(
id=notification.id,
user_id=notification.user_id,
title=notification.title,
content=notification.content,
type=notification.type,
is_read=notification.is_read,
related_id=notification.related_id,
related_type=notification.related_type,
sender_id=notification.sender_id,
sender_name=current_user.full_name,
created_at=notification.created_at,
updated_at=notification.updated_at
)
return ResponseModel(
code=200,
message="发送通知成功",
data=response
)
except HTTPException:
raise
except Exception as e:
logger.error(f"发送通知失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"发送通知失败: {str(e)}")
@router.post("/send-batch", response_model=ResponseModel)
async def send_batch_notifications(
batch_in: NotificationBatchCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
批量发送通知(管理员接口)
向多个用户发送相同的通知
"""
try:
# 权限检查:仅管理员和管理者可发送通知
if current_user.role not in ["admin", "manager"]:
raise HTTPException(status_code=403, detail="无权限发送通知")
# 设置发送者
batch_in.sender_id = current_user.id
notifications = await notification_service.batch_create_notifications(
db=db,
batch_in=batch_in
)
return ResponseModel(
code=200,
message=f"成功发送 {len(notifications)} 条通知",
data={"sent_count": len(notifications)}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"批量发送通知失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"批量发送通知失败: {str(e)}")