All checks were successful
continuous-integration/drone/push Build is passing
- 登录失败返回 HTTP 401 而非 200 - 添加 XSS 输入过滤工具函数 - 课程名称和描述字段添加 XSS 过滤验证器
265 lines
8.0 KiB
Python
265 lines
8.0 KiB
Python
"""
|
|
认证 API
|
|
"""
|
|
from fastapi import APIRouter, Depends, status, Request, Query
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.deps import get_current_active_user, get_db
|
|
from app.core.logger import logger
|
|
from app.models.user import User
|
|
from app.schemas.auth import LoginRequest, RefreshTokenRequest, Token, DingtalkLoginRequest
|
|
from app.schemas.base import ResponseModel
|
|
from app.schemas.user import User as UserSchema
|
|
from app.services.auth_service import AuthService
|
|
from app.services.dingtalk_auth_service import DingtalkAuthService
|
|
from app.services.system_log_service import system_log_service
|
|
from app.schemas.system_log import SystemLogCreate
|
|
from app.core.exceptions import UnauthorizedError
|
|
|
|
# Router on which the authentication endpoints defined in this module are registered.
router = APIRouter()
|
|
|
|
|
|
def _login_error_response(http_status: int, message: str):
    """
    Build the JSON error envelope returned when a login attempt fails.

    The numeric ``code`` field of the body mirrors the HTTP status code, so a
    client reading either one sees the same value.

    Args:
        http_status: HTTP status code of the response; also written to ``code``.
        message: Error message placed in the ``message`` field of the body.

    Returns:
        A ``JSONResponse`` whose body has the keys ``code``, ``message`` and
        ``data`` (``data`` is always ``None``).
    """
    # Imported locally: this module has no top-level import of JSONResponse.
    from fastapi.responses import JSONResponse

    return JSONResponse(
        status_code=http_status,
        content={
            "code": http_status,
            "message": message,
            "data": None,
        },
    )


@router.post("/login", response_model=ResponseModel)
async def login(
    login_data: LoginRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Log a user in.

    Supports logging in with a username, email address or phone number.

    Args:
        login_data: Request body carrying ``username`` and ``password``.
        request: Incoming request; the client IP and User-Agent are read from
            it for the security log.
        db: Database session used by the auth service and the system log.

    Returns:
        On success, a ``ResponseModel`` whose ``data`` holds the serialized
        ``user`` and ``token``. On ``UnauthorizedError`` an HTTP 401 JSON
        response, and on any other exception an HTTP 500 JSON response, both
        shaped as ``{"code", "message", "data"}``.
    """
    auth_service = AuthService(db)

    # Request metadata shared by the success and the failure audit entries.
    client_ip = request.client.host if request.client else None
    user_agent = request.headers.get("user-agent")

    try:
        user, token = await auth_service.login(
            username=login_data.username,
            password=login_data.password,
        )

        # Record the successful login in the security log.
        await system_log_service.create_log(
            db,
            SystemLogCreate(
                level="INFO",
                type="security",
                message=f"用户 {user.username} 登录成功",
                user_id=user.id,
                user=user.username,
                ip=client_ip,
                path="/api/v1/auth/login",
                method="POST",
                user_agent=user_agent,
            ),
        )

        return ResponseModel(
            message="登录成功",
            data={
                "user": UserSchema.model_validate(user).model_dump(),
                "token": token.model_dump(),
            },
        )
    except UnauthorizedError:
        # Record the failed attempt in the security log. No user_id is set:
        # only the submitted username is known at this point.
        await system_log_service.create_log(
            db,
            SystemLogCreate(
                level="WARNING",
                type="security",
                message=f"用户 {login_data.username} 登录失败:用户名或密码错误",
                user=login_data.username,
                ip=client_ip,
                path="/api/v1/auth/login",
                method="POST",
                user_agent=user_agent,
            ),
        )
        logger.warning("login_failed_wrong_credentials", username=login_data.username)
        # HTTP 401 with one uniform message, to avoid user enumeration.
        return _login_error_response(
            status.HTTP_401_UNAUTHORIZED,
            "用户名或密码错误",
        )
    except Exception as e:
        # Anything else (including a failure while writing the success audit
        # entry above) is reported to the client as a generic HTTP 500.
        logger.error("login_failed_unexpected", error=str(e))
        return _login_error_response(
            status.HTTP_500_INTERNAL_SERVER_ERROR,
            "登录失败,请稍后重试",
        )
|
|
|
|
|
|
@router.post("/refresh", response_model=ResponseModel)
async def refresh_token(
    refresh_data: RefreshTokenRequest,
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Refresh the access token.

    Uses the refresh token from the request body to obtain a new access token.

    Args:
        refresh_data: Request body carrying ``refresh_token``.
        db: Database session handed to the auth service.

    Returns:
        A ``ResponseModel`` whose ``data`` is the serialized new token.
    """
    new_token = await AuthService(db).refresh_token(refresh_data.refresh_token)
    token_payload = new_token.model_dump()
    return ResponseModel(message="令牌刷新成功", data=token_payload)
|
|
|
|
|
|
@router.post("/logout", response_model=ResponseModel)
async def logout(
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Log the current user out.

    Note: the client must delete its locally stored tokens.

    Args:
        request: Incoming request; the client IP and User-Agent are read from
            it for the security log.
        current_user: The authenticated, active user resolved by dependency.
        db: Database session used by the auth service and the system log.

    Returns:
        A ``ResponseModel`` carrying only the success message.
    """
    await AuthService(db).logout(current_user.id)

    # Record the logout in the security log.
    audit_entry = SystemLogCreate(
        level="INFO",
        type="security",
        message=f"用户 {current_user.username} 登出",
        user_id=current_user.id,
        user=current_user.username,
        ip=request.client.host if request.client else None,
        path="/api/v1/auth/logout",
        method="POST",
        user_agent=request.headers.get("user-agent"),
    )
    await system_log_service.create_log(db, audit_entry)

    return ResponseModel(message="登出成功")
|
|
|
|
|
|
@router.get("/verify", response_model=ResponseModel)
async def verify_token(
    current_user: User = Depends(get_current_active_user),
) -> ResponseModel:
    """
    Verify the token.

    Used to check whether the current token is valid; the work is done by the
    ``get_current_active_user`` dependency, and this handler only echoes the
    resolved user back.

    Args:
        current_user: The authenticated, active user resolved by dependency.

    Returns:
        A ``ResponseModel`` whose ``data`` holds the serialized ``user``.
    """
    user_payload = UserSchema.model_validate(current_user).model_dump()
    return ResponseModel(message="令牌有效", data={"user": user_payload})
|
|
|
|
|
|
# ============================================
# DingTalk password-free login API
# ============================================
|
|
|
|
@router.post("/dingtalk/login", response_model=ResponseModel)
async def dingtalk_login(
    login_data: DingtalkLoginRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    DingTalk password-free login.

    Logs into the system using a DingTalk login-free authorization code.

    Args:
        login_data: Request body carrying ``tenant_id`` and ``code``.
        request: Incoming request; the client IP and User-Agent are read from
            it for the security log.
        db: Database session used by the DingTalk service and the system log.

    Returns:
        On success, a ``ResponseModel`` whose ``data`` holds the serialized
        ``user`` and ``token``. On any exception, a ``ResponseModel`` with
        ``code=400`` and the exception text as ``message``.
    """
    dingtalk_service = DingtalkAuthService(db)

    # Request metadata shared by the success and the failure audit entries.
    request_meta = {
        "ip": request.client.host if request.client else None,
        "path": "/api/v1/auth/dingtalk/login",
        "method": "POST",
        "user_agent": request.headers.get("user-agent"),
    }

    try:
        user, token = await dingtalk_service.dingtalk_login(
            tenant_id=login_data.tenant_id,
            code=login_data.code,
        )

        # Record the successful login in the security log.
        success_entry = SystemLogCreate(
            level="INFO",
            type="security",
            message=f"用户 {user.username} 通过钉钉免密登录成功",
            user_id=user.id,
            user=user.username,
            **request_meta,
        )
        await system_log_service.create_log(db, success_entry)

        login_payload = {
            "user": UserSchema.model_validate(user).model_dump(),
            "token": token.model_dump(),
        }
        return ResponseModel(message="钉钉登录成功", data=login_payload)
    except Exception as exc:
        failure_reason = str(exc)
        logger.warning("dingtalk_login_failed", error=failure_reason)

        # Record the failed attempt in the security log.
        failure_entry = SystemLogCreate(
            level="WARNING",
            type="security",
            message=f"钉钉免密登录失败:{failure_reason}",
            **request_meta,
        )
        await system_log_service.create_log(db, failure_entry)

        # NOTE(review): the exception text is returned to the client for every
        # exception type, and the failure is signalled only through the body's
        # ``code`` field. Confirm the service raises user-safe messages only.
        return ResponseModel(code=400, message=failure_reason, data=None)
|
|
|
|
|
|
@router.get("/dingtalk/config", response_model=ResponseModel)
async def get_dingtalk_config(
    tenant_id: int = Query(default=1, description="租户ID"),
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Get the public DingTalk configuration.

    The frontend needs corpId and agentId to initialise the DingTalk JSDK.
    Only non-sensitive information is returned.

    Args:
        tenant_id: Tenant ID query parameter; defaults to 1.
        db: Database session handed to the DingTalk service.

    Returns:
        A ``ResponseModel`` whose ``data`` is the public configuration. If
        anything raises, a ``ResponseModel`` with ``code=500`` and a
        placeholder configuration marked as disabled.
    """
    service = DingtalkAuthService(db)

    try:
        public_config = await service.get_public_config(tenant_id)
        # Built inside the try so a failure here also yields the fallback.
        return ResponseModel(message="获取成功", data=public_config)
    except Exception as exc:
        logger.error("get_dingtalk_config_failed", error=str(exc))
        disabled_config = {"enabled": False, "corp_id": None, "agent_id": None}
        return ResponseModel(
            code=500,
            message="获取钉钉配置失败",
            data=disabled_config,
        )
|