Files
012-kaopeilian/backend/app/api/v1/auth.py
yuliang_guo d59a4355a5
All checks were successful
continuous-integration/drone/push Build is passing
fix: 修复安全问题 - 登录失败返回401 + XSS过滤
- 登录失败返回 HTTP 401 而非 200
- 添加 XSS 输入过滤工具函数
- 课程名称和描述字段添加 XSS 过滤验证器
2026-01-31 10:39:07 +08:00

265 lines
8.0 KiB
Python

"""
认证 API
"""
from fastapi import APIRouter, Depends, status, Request, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.deps import get_current_active_user, get_db
from app.core.logger import logger
from app.models.user import User
from app.schemas.auth import LoginRequest, RefreshTokenRequest, Token, DingtalkLoginRequest
from app.schemas.base import ResponseModel
from app.schemas.user import User as UserSchema
from app.services.auth_service import AuthService
from app.services.dingtalk_auth_service import DingtalkAuthService
from app.services.system_log_service import system_log_service
from app.schemas.system_log import SystemLogCreate
from app.core.exceptions import UnauthorizedError
router = APIRouter()
@router.post("/login", response_model=ResponseModel)
async def login(
login_data: LoginRequest,
request: Request,
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
用户登录
支持使用用户名、邮箱或手机号登录
"""
auth_service = AuthService(db)
try:
user, token = await auth_service.login(
username=login_data.username,
password=login_data.password,
)
# 记录登录成功日志
await system_log_service.create_log(
db,
SystemLogCreate(
level="INFO",
type="security",
message=f"用户 {user.username} 登录成功",
user_id=user.id,
user=user.username,
ip=request.client.host if request.client else None,
path="/api/v1/auth/login",
method="POST",
user_agent=request.headers.get("user-agent")
)
)
return ResponseModel(
message="登录成功",
data={
"user": UserSchema.model_validate(user).model_dump(),
"token": token.model_dump(),
},
)
except UnauthorizedError as e:
# 记录登录失败日志
await system_log_service.create_log(
db,
SystemLogCreate(
level="WARNING",
type="security",
message=f"用户 {login_data.username} 登录失败:用户名或密码错误",
user=login_data.username,
ip=request.client.host if request.client else None,
path="/api/v1/auth/login",
method="POST",
user_agent=request.headers.get("user-agent")
)
)
logger.warning("login_failed_wrong_credentials", username=login_data.username)
# 返回 HTTP 401 + 统一错误消息(避免用户枚举)
from fastapi.responses import JSONResponse
return JSONResponse(
status_code=401,
content={
"code": 401,
"message": "用户名或密码错误",
"data": None,
}
)
except Exception as e:
logger.error("login_failed_unexpected", error=str(e))
from fastapi.responses import JSONResponse
return JSONResponse(
status_code=500,
content={
"code": 500,
"message": "登录失败,请稍后重试",
"data": None,
}
)
@router.post("/refresh", response_model=ResponseModel)
async def refresh_token(
refresh_data: RefreshTokenRequest,
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
刷新访问令牌
使用刷新令牌获取新的访问令牌
"""
auth_service = AuthService(db)
token = await auth_service.refresh_token(refresh_data.refresh_token)
return ResponseModel(message="令牌刷新成功", data=token.model_dump())
@router.post("/logout", response_model=ResponseModel)
async def logout(
request: Request,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
用户登出
注意:客户端需要删除本地存储的令牌
"""
auth_service = AuthService(db)
await auth_service.logout(current_user.id)
# 记录登出日志
await system_log_service.create_log(
db,
SystemLogCreate(
level="INFO",
type="security",
message=f"用户 {current_user.username} 登出",
user_id=current_user.id,
user=current_user.username,
ip=request.client.host if request.client else None,
path="/api/v1/auth/logout",
method="POST",
user_agent=request.headers.get("user-agent")
)
)
return ResponseModel(message="登出成功")
@router.get("/verify", response_model=ResponseModel)
async def verify_token(
current_user: User = Depends(get_current_active_user),
) -> ResponseModel:
"""
验证令牌
用于检查当前令牌是否有效
"""
return ResponseModel(
message="令牌有效",
data={
"user": UserSchema.model_validate(current_user).model_dump(),
},
)
# ============================================
# 钉钉免密登录 API
# ============================================
@router.post("/dingtalk/login", response_model=ResponseModel)
async def dingtalk_login(
login_data: DingtalkLoginRequest,
request: Request,
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
钉钉免密登录
通过钉钉免登授权码登录系统
"""
dingtalk_service = DingtalkAuthService(db)
try:
user, token = await dingtalk_service.dingtalk_login(
tenant_id=login_data.tenant_id,
code=login_data.code,
)
# 记录登录成功日志
await system_log_service.create_log(
db,
SystemLogCreate(
level="INFO",
type="security",
message=f"用户 {user.username} 通过钉钉免密登录成功",
user_id=user.id,
user=user.username,
ip=request.client.host if request.client else None,
path="/api/v1/auth/dingtalk/login",
method="POST",
user_agent=request.headers.get("user-agent")
)
)
return ResponseModel(
message="钉钉登录成功",
data={
"user": UserSchema.model_validate(user).model_dump(),
"token": token.model_dump(),
},
)
except Exception as e:
error_msg = str(e)
logger.warning("dingtalk_login_failed", error=error_msg)
# 记录登录失败日志
await system_log_service.create_log(
db,
SystemLogCreate(
level="WARNING",
type="security",
message=f"钉钉免密登录失败:{error_msg}",
ip=request.client.host if request.client else None,
path="/api/v1/auth/dingtalk/login",
method="POST",
user_agent=request.headers.get("user-agent")
)
)
return ResponseModel(
code=400,
message=error_msg,
data=None,
)
@router.get("/dingtalk/config", response_model=ResponseModel)
async def get_dingtalk_config(
tenant_id: int = Query(default=1, description="租户ID"),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
获取钉钉公开配置
前端需要使用 corpId 和 agentId 初始化钉钉JSDK
仅返回非敏感信息
"""
dingtalk_service = DingtalkAuthService(db)
try:
config = await dingtalk_service.get_public_config(tenant_id)
return ResponseModel(
message="获取成功",
data=config,
)
except Exception as e:
logger.error("get_dingtalk_config_failed", error=str(e))
return ResponseModel(
code=500,
message="获取钉钉配置失败",
data={"enabled": False, "corp_id": None, "agent_id": None},
)