Files
012-kaopeilian/backend/app/services/auth_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

142 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证服务
"""
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import UnauthorizedError
from app.core.logger import logger
from app.core.security import create_access_token, create_refresh_token, decode_token
from app.models.user import User
from app.schemas.auth import Token
from app.services.user_service import UserService
class AuthService:
"""认证服务"""
def __init__(self, db: AsyncSession):
self.db = db
self.user_service = UserService(db)
async def login(self, username: str, password: str) -> tuple[User, Token]:
"""
用户登录
Args:
username: 用户名/邮箱/手机号
password: 密码
Returns:
用户对象和令牌
"""
# 验证用户
user = await self.user_service.authenticate(
username=username, password=password
)
if not user:
logger.warning(
"登录失败:用户名或密码错误",
username=username,
)
raise UnauthorizedError("用户名或密码错误")
if not user.is_active:
logger.warning(
"登录失败:用户已被禁用",
user_id=user.id,
username=user.username,
)
raise UnauthorizedError("用户已被禁用")
# 生成令牌
access_token = create_access_token(subject=user.id)
refresh_token = create_refresh_token(subject=user.id)
# 更新最后登录时间
await self.user_service.update_last_login(user.id)
# 记录日志
logger.info(
"用户登录成功",
user_id=user.id,
username=user.username,
role=user.role,
)
return user, Token(
access_token=access_token,
refresh_token=refresh_token,
)
async def refresh_token(self, refresh_token: str) -> Token:
"""
刷新访问令牌
Args:
refresh_token: 刷新令牌
Returns:
新的令牌
"""
try:
# 解码刷新令牌
payload = decode_token(refresh_token)
# 验证令牌类型
if payload.get("type") != "refresh":
raise UnauthorizedError("无效的刷新令牌")
# 获取用户ID
user_id = int(payload.get("sub"))
# 获取用户
user = await self.user_service.get_by_id(user_id)
if not user:
raise UnauthorizedError("用户不存在")
if not user.is_active:
raise UnauthorizedError("用户已被禁用")
# 生成新的访问令牌
access_token = create_access_token(subject=user.id)
logger.info(
"令牌刷新成功",
user_id=user.id,
username=user.username,
)
return Token(
access_token=access_token,
refresh_token=refresh_token, # 保持原刷新令牌
)
except Exception as e:
logger.error(
"令牌刷新失败",
error=str(e),
)
raise UnauthorizedError("无效的刷新令牌")
async def logout(self, user_id: int) -> None:
"""
用户登出
注意JWT是无状态的实际的登出需要在客户端删除令牌
这里只是记录日志,如果需要可以将令牌加入黑名单
Args:
user_id: 用户ID
"""
user = await self.user_service.get_by_id(user_id)
if user:
logger.info(
"用户登出",
user_id=user.id,
username=user.username,
)