Files
012-kaopeilian/backend/app/api/v1/auth.py
yuliang_guo 662947cd06
Some checks failed
continuous-integration/drone/push Build is failing
feat: 添加钉钉扫码登录功能
- 后端:钉钉 OAuth 认证服务
- 后端:系统设置 API(钉钉配置)
- 前端:登录页钉钉扫码入口
- 前端:系统设置页面
- 数据库迁移脚本
2026-01-29 14:40:00 +08:00

257 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证 API
"""
from fastapi import APIRouter, Depends, status, Request, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.deps import get_current_active_user, get_db
from app.core.logger import logger
from app.models.user import User
from app.schemas.auth import LoginRequest, RefreshTokenRequest, Token, DingtalkLoginRequest
from app.schemas.base import ResponseModel
from app.schemas.user import User as UserSchema
from app.services.auth_service import AuthService
from app.services.dingtalk_auth_service import DingtalkAuthService
from app.services.system_log_service import system_log_service
from app.schemas.system_log import SystemLogCreate
from app.core.exceptions import UnauthorizedError
router = APIRouter()
@router.post("/login", response_model=ResponseModel)
async def login(
    login_data: LoginRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Authenticate a user with a login identifier and password.

    The credentials are delegated to ``AuthService.login``. The original API
    notes state that the identifier may be a username, e-mail address or
    phone number; that lookup happens inside the service and is not visible
    in this module.

    A ``security`` entry is written to the system log for both accepted and
    rejected attempts.

    Args:
        login_data: Request body carrying ``username`` and ``password``.
        request: Incoming request; supplies the client IP and User-Agent
            recorded in the audit entry.
        db: Database session injected by ``get_db``.

    Returns:
        ResponseModel: On success, ``data`` holds the serialized ``user`` and
        ``token``. When ``UnauthorizedError`` is raised the response carries
        ``code=400``; any other exception yields ``code=500`` with a generic
        message.
    """
    auth_service = AuthService(db)

    # Request metadata shared by every audit entry written below.
    client_ip = request.client.host if request.client else None
    user_agent = request.headers.get("user-agent")

    try:
        user, token = await auth_service.login(
            username=login_data.username,
            password=login_data.password,
        )

        # Audit trail: successful login.
        await system_log_service.create_log(
            db,
            SystemLogCreate(
                level="INFO",
                type="security",
                message=f"用户 {user.username} 登录成功",
                user_id=user.id,
                user=user.username,
                ip=client_ip,
                path="/api/v1/auth/login",
                method="POST",
                user_agent=user_agent,
            ),
        )

        return ResponseModel(
            message="登录成功",
            data={
                "user": UserSchema.model_validate(user).model_dump(),
                "token": token.model_dump(),
            },
        )
    except UnauthorizedError as e:
        # Audit trail: rejected login.
        # NOTE(review): the audit text always says "wrong password" even
        # though the response below forwards str(e); confirm whether
        # UnauthorizedError can carry other reasons (e.g. disabled account).
        await system_log_service.create_log(
            db,
            SystemLogCreate(
                level="WARNING",
                type="security",
                message=f"用户 {login_data.username} 登录失败:密码错误",
                user=login_data.username,
                ip=client_ip,
                path="/api/v1/auth/login",
                method="POST",
                user_agent=user_agent,
            ),
        )
        # Deliberately not an HTTP 401: the failure is reported as a business
        # code inside a normal response so the frontend can show a friendly
        # message.
        logger.warning("login_failed_wrong_credentials", username=login_data.username)
        return ResponseModel(
            code=400,
            message=str(e) or "用户名或密码错误",
            data=None,
        )
    except Exception as e:
        # The client only receives a generic message, so the log is the sole
        # place the root cause survives: attach the traceback, not just str(e).
        # NOTE(review): assumes the project logger honours ``exc_info`` — confirm.
        logger.error("login_failed_unexpected", error=str(e), exc_info=True)
        return ResponseModel(
            code=500,
            message="登录失败,请稍后重试",
            data=None,
        )
@router.post("/refresh", response_model=ResponseModel)
async def refresh_token(
    refresh_data: RefreshTokenRequest,
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Exchange a refresh token for a new token.

    Args:
        refresh_data: Request body carrying ``refresh_token``.
        db: Database session injected by ``get_db``.

    Returns:
        ResponseModel: ``data`` holds the serialized token returned by
        ``AuthService.refresh_token``.

    Any exception raised by the service is not handled here and propagates
    to the caller.
    """
    service = AuthService(db)
    renewed = await service.refresh_token(refresh_data.refresh_token)
    payload = renewed.model_dump()
    return ResponseModel(message="令牌刷新成功", data=payload)
@router.post("/logout", response_model=ResponseModel)
async def logout(
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Log the current user out and record the event in the system log.

    Note: the client is still responsible for deleting its locally stored
    tokens.

    Args:
        request: Incoming request; supplies the client IP and User-Agent
            recorded in the audit entry.
        current_user: Authenticated user resolved by
            ``get_current_active_user``.
        db: Database session injected by ``get_db``.

    Returns:
        ResponseModel: A success message with no data payload.
    """
    await AuthService(db).logout(current_user.id)

    # Audit trail: logout.
    peer = request.client
    audit_entry = SystemLogCreate(
        level="INFO",
        type="security",
        message=f"用户 {current_user.username} 登出",
        user_id=current_user.id,
        user=current_user.username,
        ip=peer.host if peer else None,
        path="/api/v1/auth/logout",
        method="POST",
        user_agent=request.headers.get("user-agent"),
    )
    await system_log_service.create_log(db, audit_entry)

    return ResponseModel(message="登出成功")
@router.get("/verify", response_model=ResponseModel)
async def verify_token(
    current_user: User = Depends(get_current_active_user),
) -> ResponseModel:
    """
    Confirm that the caller's token is still valid.

    Validation itself is performed by the ``get_current_active_user``
    dependency; reaching this body means it succeeded.

    Args:
        current_user: Authenticated user resolved by the dependency.

    Returns:
        ResponseModel: ``data`` holds the serialized ``user``.
    """
    user_payload = UserSchema.model_validate(current_user).model_dump()
    return ResponseModel(message="令牌有效", data={"user": user_payload})
# ============================================
# 钉钉免密登录 API
# ============================================
@router.post("/dingtalk/login", response_model=ResponseModel)
async def dingtalk_login(
    login_data: DingtalkLoginRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Log a user in with a DingTalk password-free authorization code.

    The code is delegated to ``DingtalkAuthService.dingtalk_login``. A
    ``security`` entry is written to the system log for both successful and
    failed attempts.

    Args:
        login_data: Request body carrying ``tenant_id`` and ``code``.
        request: Incoming request; supplies the client IP and User-Agent
            recorded in the audit entry.
        db: Database session injected by ``get_db``.

    Returns:
        ResponseModel: On success, ``data`` holds the serialized ``user`` and
        ``token``. On any exception the response carries ``code=400`` and the
        exception text as ``message``.
    """
    service = DingtalkAuthService(db)
    try:
        user, token = await service.dingtalk_login(
            tenant_id=login_data.tenant_id,
            code=login_data.code,
        )

        # Audit trail: successful DingTalk login.
        peer = request.client
        success_entry = SystemLogCreate(
            level="INFO",
            type="security",
            message=f"用户 {user.username} 通过钉钉免密登录成功",
            user_id=user.id,
            user=user.username,
            ip=peer.host if peer else None,
            path="/api/v1/auth/dingtalk/login",
            method="POST",
            user_agent=request.headers.get("user-agent"),
        )
        await system_log_service.create_log(db, success_entry)

        user_payload = UserSchema.model_validate(user).model_dump()
        token_payload = token.model_dump()
        return ResponseModel(
            message="钉钉登录成功",
            data={"user": user_payload, "token": token_payload},
        )
    except Exception as exc:
        # NOTE(review): every exception type lands here and its text is
        # returned to the caller verbatim; confirm the service only raises
        # messages that are safe to show to unauthenticated clients.
        reason = str(exc)
        logger.warning("dingtalk_login_failed", error=reason)

        # Audit trail: failed DingTalk login (no user is known at this point).
        peer = request.client
        failure_entry = SystemLogCreate(
            level="WARNING",
            type="security",
            message=f"钉钉免密登录失败:{reason}",
            ip=peer.host if peer else None,
            path="/api/v1/auth/dingtalk/login",
            method="POST",
            user_agent=request.headers.get("user-agent"),
        )
        await system_log_service.create_log(db, failure_entry)

        return ResponseModel(code=400, message=reason, data=None)
@router.get("/dingtalk/config", response_model=ResponseModel)
async def get_dingtalk_config(
    tenant_id: int = Query(default=1, description="租户ID"),
    db: AsyncSession = Depends(get_db),
) -> ResponseModel:
    """
    Return the public DingTalk configuration for a tenant.

    The original API notes state that the frontend needs ``corpId`` and
    ``agentId`` to initialise the DingTalk JS SDK and that only non-sensitive
    values are returned; the actual selection of fields is done by
    ``DingtalkAuthService.get_public_config`` and is not visible here.

    Args:
        tenant_id: Tenant identifier taken from the query string
            (defaults to 1).
        db: Database session injected by ``get_db``.

    Returns:
        ResponseModel: ``data`` holds the configuration returned by the
        service. On any exception the response carries ``code=500`` and a
        disabled placeholder configuration.
    """
    dingtalk_service = DingtalkAuthService(db)
    try:
        config = await dingtalk_service.get_public_config(tenant_id)
        return ResponseModel(
            message="获取成功",
            data=config,
        )
    except Exception as e:
        # The caller only sees a generic message, so keep the traceback in
        # the log rather than just str(e).
        # NOTE(review): assumes the project logger honours ``exc_info`` — confirm.
        logger.error("get_dingtalk_config_failed", error=str(e), exc_info=True)
        return ResponseModel(
            code=500,
            message="获取钉钉配置失败",
            # Placeholder with the feature switched off.
            data={"enabled": False, "corp_id": None, "agent_id": None},
        )