Some checks failed
continuous-integration/drone/push Build is failing
- 后端:钉钉 OAuth 认证服务 - 后端:系统设置 API(钉钉配置) - 前端:登录页钉钉扫码入口 - 前端:系统设置页面 - 数据库迁移脚本
257 lines
7.8 KiB
Python
257 lines
7.8 KiB
Python
"""
|
||
认证 API
|
||
"""
|
||
from fastapi import APIRouter, Depends, status, Request, Query
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.deps import get_current_active_user, get_db
|
||
from app.core.logger import logger
|
||
from app.models.user import User
|
||
from app.schemas.auth import LoginRequest, RefreshTokenRequest, Token, DingtalkLoginRequest
|
||
from app.schemas.base import ResponseModel
|
||
from app.schemas.user import User as UserSchema
|
||
from app.services.auth_service import AuthService
|
||
from app.services.dingtalk_auth_service import DingtalkAuthService
|
||
from app.services.system_log_service import system_log_service
|
||
from app.schemas.system_log import SystemLogCreate
|
||
from app.core.exceptions import UnauthorizedError
|
||
|
||
# Router that collects the authentication endpoints declared in this module.
# NOTE(review): the audit-log paths below suggest it is mounted under
# /api/v1/auth - confirm against the application's router registration.
router = APIRouter()
|
||
|
||
|
||
@router.post("/login", response_model=ResponseModel)
|
||
async def login(
|
||
login_data: LoginRequest,
|
||
request: Request,
|
||
db: AsyncSession = Depends(get_db),
|
||
) -> ResponseModel:
|
||
"""
|
||
用户登录
|
||
|
||
支持使用用户名、邮箱或手机号登录
|
||
"""
|
||
auth_service = AuthService(db)
|
||
try:
|
||
user, token = await auth_service.login(
|
||
username=login_data.username,
|
||
password=login_data.password,
|
||
)
|
||
|
||
# 记录登录成功日志
|
||
await system_log_service.create_log(
|
||
db,
|
||
SystemLogCreate(
|
||
level="INFO",
|
||
type="security",
|
||
message=f"用户 {user.username} 登录成功",
|
||
user_id=user.id,
|
||
user=user.username,
|
||
ip=request.client.host if request.client else None,
|
||
path="/api/v1/auth/login",
|
||
method="POST",
|
||
user_agent=request.headers.get("user-agent")
|
||
)
|
||
)
|
||
|
||
return ResponseModel(
|
||
message="登录成功",
|
||
data={
|
||
"user": UserSchema.model_validate(user).model_dump(),
|
||
"token": token.model_dump(),
|
||
},
|
||
)
|
||
except UnauthorizedError as e:
|
||
# 记录登录失败日志
|
||
await system_log_service.create_log(
|
||
db,
|
||
SystemLogCreate(
|
||
level="WARNING",
|
||
type="security",
|
||
message=f"用户 {login_data.username} 登录失败:密码错误",
|
||
user=login_data.username,
|
||
ip=request.client.host if request.client else None,
|
||
path="/api/v1/auth/login",
|
||
method="POST",
|
||
user_agent=request.headers.get("user-agent")
|
||
)
|
||
)
|
||
# 不返回 401,统一返回 HTTP 200 + 业务失败码,便于前端友好提示
|
||
logger.warning("login_failed_wrong_credentials", username=login_data.username)
|
||
return ResponseModel(
|
||
code=400,
|
||
message=str(e) or "用户名或密码错误",
|
||
data=None,
|
||
)
|
||
except Exception as e:
|
||
logger.error("login_failed_unexpected", error=str(e))
|
||
return ResponseModel(
|
||
code=500,
|
||
message="登录失败,请稍后重试",
|
||
data=None,
|
||
)
|
||
|
||
|
||
@router.post("/refresh", response_model=ResponseModel)
|
||
async def refresh_token(
|
||
refresh_data: RefreshTokenRequest,
|
||
db: AsyncSession = Depends(get_db),
|
||
) -> ResponseModel:
|
||
"""
|
||
刷新访问令牌
|
||
|
||
使用刷新令牌获取新的访问令牌
|
||
"""
|
||
auth_service = AuthService(db)
|
||
token = await auth_service.refresh_token(refresh_data.refresh_token)
|
||
|
||
return ResponseModel(message="令牌刷新成功", data=token.model_dump())
|
||
|
||
|
||
@router.post("/logout", response_model=ResponseModel)
|
||
async def logout(
|
||
request: Request,
|
||
current_user: User = Depends(get_current_active_user),
|
||
db: AsyncSession = Depends(get_db),
|
||
) -> ResponseModel:
|
||
"""
|
||
用户登出
|
||
|
||
注意:客户端需要删除本地存储的令牌
|
||
"""
|
||
auth_service = AuthService(db)
|
||
await auth_service.logout(current_user.id)
|
||
|
||
# 记录登出日志
|
||
await system_log_service.create_log(
|
||
db,
|
||
SystemLogCreate(
|
||
level="INFO",
|
||
type="security",
|
||
message=f"用户 {current_user.username} 登出",
|
||
user_id=current_user.id,
|
||
user=current_user.username,
|
||
ip=request.client.host if request.client else None,
|
||
path="/api/v1/auth/logout",
|
||
method="POST",
|
||
user_agent=request.headers.get("user-agent")
|
||
)
|
||
)
|
||
|
||
return ResponseModel(message="登出成功")
|
||
|
||
|
||
@router.get("/verify", response_model=ResponseModel)
|
||
async def verify_token(
|
||
current_user: User = Depends(get_current_active_user),
|
||
) -> ResponseModel:
|
||
"""
|
||
验证令牌
|
||
|
||
用于检查当前令牌是否有效
|
||
"""
|
||
return ResponseModel(
|
||
message="令牌有效",
|
||
data={
|
||
"user": UserSchema.model_validate(current_user).model_dump(),
|
||
},
|
||
)
|
||
|
||
|
||
# ============================================
# DingTalk passwordless login API
# ============================================
|
||
|
||
@router.post("/dingtalk/login", response_model=ResponseModel)
|
||
async def dingtalk_login(
|
||
login_data: DingtalkLoginRequest,
|
||
request: Request,
|
||
db: AsyncSession = Depends(get_db),
|
||
) -> ResponseModel:
|
||
"""
|
||
钉钉免密登录
|
||
|
||
通过钉钉免登授权码登录系统
|
||
"""
|
||
dingtalk_service = DingtalkAuthService(db)
|
||
|
||
try:
|
||
user, token = await dingtalk_service.dingtalk_login(
|
||
tenant_id=login_data.tenant_id,
|
||
code=login_data.code,
|
||
)
|
||
|
||
# 记录登录成功日志
|
||
await system_log_service.create_log(
|
||
db,
|
||
SystemLogCreate(
|
||
level="INFO",
|
||
type="security",
|
||
message=f"用户 {user.username} 通过钉钉免密登录成功",
|
||
user_id=user.id,
|
||
user=user.username,
|
||
ip=request.client.host if request.client else None,
|
||
path="/api/v1/auth/dingtalk/login",
|
||
method="POST",
|
||
user_agent=request.headers.get("user-agent")
|
||
)
|
||
)
|
||
|
||
return ResponseModel(
|
||
message="钉钉登录成功",
|
||
data={
|
||
"user": UserSchema.model_validate(user).model_dump(),
|
||
"token": token.model_dump(),
|
||
},
|
||
)
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
logger.warning("dingtalk_login_failed", error=error_msg)
|
||
|
||
# 记录登录失败日志
|
||
await system_log_service.create_log(
|
||
db,
|
||
SystemLogCreate(
|
||
level="WARNING",
|
||
type="security",
|
||
message=f"钉钉免密登录失败:{error_msg}",
|
||
ip=request.client.host if request.client else None,
|
||
path="/api/v1/auth/dingtalk/login",
|
||
method="POST",
|
||
user_agent=request.headers.get("user-agent")
|
||
)
|
||
)
|
||
|
||
return ResponseModel(
|
||
code=400,
|
||
message=error_msg,
|
||
data=None,
|
||
)
|
||
|
||
|
||
@router.get("/dingtalk/config", response_model=ResponseModel)
|
||
async def get_dingtalk_config(
|
||
tenant_id: int = Query(default=1, description="租户ID"),
|
||
db: AsyncSession = Depends(get_db),
|
||
) -> ResponseModel:
|
||
"""
|
||
获取钉钉公开配置
|
||
|
||
前端需要使用 corpId 和 agentId 初始化钉钉JSDK
|
||
仅返回非敏感信息
|
||
"""
|
||
dingtalk_service = DingtalkAuthService(db)
|
||
|
||
try:
|
||
config = await dingtalk_service.get_public_config(tenant_id)
|
||
return ResponseModel(
|
||
message="获取成功",
|
||
data=config,
|
||
)
|
||
except Exception as e:
|
||
logger.error("get_dingtalk_config_failed", error=str(e))
|
||
return ResponseModel(
|
||
code=500,
|
||
message="获取钉钉配置失败",
|
||
data={"enabled": False, "corp_id": None, "agent_id": None},
|
||
)
|