Files
012-kaopeilian/backend/app/services/dingtalk_auth_service.py
yuliang_guo 662947cd06
Some checks failed
continuous-integration/drone/push Build is failing
feat: 添加钉钉扫码登录功能
- 后端:钉钉 OAuth 认证服务
- 后端:系统设置 API(钉钉配置)
- 前端:登录页钉钉扫码入口
- 前端:系统设置页面
- 数据库迁移脚本
2026-01-29 14:40:00 +08:00

295 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
钉钉认证服务
提供钉钉免密登录功能,从数据库读取配置
"""
import json
import time
from typing import Optional, Dict, Any, Tuple
import httpx
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logger import get_logger
from app.core.security import create_access_token, create_refresh_token
from app.models.user import User
from app.schemas.auth import Token
from app.services.user_service import UserService
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Base URL that every DingTalk API endpoint used in this module is appended to.
DINGTALK_API_BASE = "https://oapi.dingtalk.com"
class DingtalkAuthService:
    """DingTalk authentication service.

    Implements DingTalk password-free login for a tenant: reads the tenant's
    DingTalk settings and feature switch from the database, obtains a
    DingTalk access token, resolves the DingTalk user behind a login code and
    maps that user to a local account.
    """

    # Token cache shared by every instance in this process:
    # tenant_id -> (access_token, absolute expiry as a ``time.time()`` value).
    # It lives on the class because an instance is bound to a single database
    # session; a cache stored on the instance disappears with the instance,
    # so the token would be requested from DingTalk again each time.
    # NOTE(review): a token obtained with old credentials keeps being served
    # until it nears expiry - drop the tenant's entry when its DingTalk
    # credentials are changed.
    _access_token_cache: Dict[int, Tuple[str, float]] = {}

    # Seconds before the real expiry at which a cached token is refreshed.
    _TOKEN_REFRESH_MARGIN = 300
    # Token lifetime (seconds) assumed when the response has no "expires_in".
    _DEFAULT_EXPIRES_IN = 7200
    # Timeout (seconds) applied to every DingTalk HTTP request.
    _HTTP_TIMEOUT = 30.0

    # Maps ``tenant_configs.config_key`` values to the keys of the config dict.
    _CONFIG_KEY_MAPPING = {
        "DINGTALK_APP_KEY": "app_key",
        "DINGTALK_APP_SECRET": "app_secret",
        "DINGTALK_AGENT_ID": "agent_id",
        "DINGTALK_CORP_ID": "corp_id",
    }

    def __init__(self, db: AsyncSession):
        """Bind the service to a database session.

        Args:
            db: Async session used for every query issued by this service.
        """
        self.db = db
        self.user_service = UserService(db)

    async def _request_json(
        self,
        url: str,
        params: Dict[str, Any],
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Call a DingTalk endpoint and return the decoded JSON body.

        Args:
            url: Full endpoint URL.
            params: Query-string parameters.
            payload: JSON body. When given the request is a POST, otherwise
                it is a GET.

        Returns:
            The decoded JSON response.
        """
        async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
            if payload is None:
                response = await client.get(url, params=params)
            else:
                response = await client.post(url, params=params, json=payload)
            return response.json()

    async def get_dingtalk_config(self, tenant_id: int) -> Dict[str, str]:
        """Load the tenant's DingTalk configuration from the database.

        Args:
            tenant_id: Tenant ID.

        Returns:
            Dict with any of the keys ``app_key``, ``app_secret``,
            ``agent_id`` and ``corp_id``. Keys without a stored row are
            absent; rows with an unrecognised ``config_key`` are ignored.
        """
        result = await self.db.execute(
            text("""
                SELECT config_key, config_value
                FROM tenant_configs
                WHERE tenant_id = :tenant_id AND config_group = 'dingtalk'
            """),
            {"tenant_id": tenant_id},
        )
        mapping = self._CONFIG_KEY_MAPPING
        return {
            mapping[row[0]]: row[1]
            for row in result.fetchall()
            if row[0] in mapping
        }

    async def is_dingtalk_login_enabled(self, tenant_id: int) -> bool:
        """Check whether DingTalk password-free login is enabled.

        A row for the tenant itself takes precedence over the global row
        (``tenant_id IS NULL``).

        Args:
            tenant_id: Tenant ID.

        Returns:
            True when the applicable switch is enabled; False when it is
            disabled or no switch row exists.
        """
        # The ordering is spelled out with CASE instead of relying on how the
        # database sorts NULL (that differs between engines), so the tenant
        # row always comes before the global one.
        result = await self.db.execute(
            text("""
                SELECT is_enabled FROM feature_switches
                WHERE feature_code = 'dingtalk_login'
                AND (tenant_id = :tenant_id OR tenant_id IS NULL)
                ORDER BY CASE WHEN tenant_id IS NULL THEN 1 ELSE 0 END
                LIMIT 1
            """),
            {"tenant_id": tenant_id},
        )
        row = result.fetchone()
        return bool(row[0]) if row is not None else False

    async def get_access_token(self, tenant_id: int) -> str:
        """Return a DingTalk access token for the tenant, using the cache.

        Args:
            tenant_id: Tenant ID.

        Returns:
            The access token.

        Raises:
            ValueError: The tenant has no AppKey or AppSecret configured.
            Exception: DingTalk reported an error or returned no token.
        """
        cached = self._access_token_cache.get(tenant_id)
        if cached is not None:
            token, expire_time = cached
            # Refresh a little early so a token close to expiry is not used.
            if time.time() < expire_time - self._TOKEN_REFRESH_MARGIN:
                return token

        config = await self.get_dingtalk_config(tenant_id)
        if not config.get("app_key") or not config.get("app_secret"):
            raise ValueError("钉钉配置不完整请在管理后台配置AppKey和AppSecret")

        data = await self._request_json(
            f"{DINGTALK_API_BASE}/gettoken",
            params={
                "appkey": config["app_key"],
                "appsecret": config["app_secret"],
            },
        )
        if data.get("errcode") != 0:
            error_msg = data.get("errmsg", "未知错误")
            logger.error(f"获取钉钉access_token失败: {error_msg}")
            raise Exception(f"获取钉钉access_token失败: {error_msg}")

        access_token = data.get("access_token")
        if not access_token:
            # A success code without a token would otherwise surface as a
            # bare KeyError; report it like every other token failure.
            error_msg = "响应中缺少access_token"
            logger.error(f"获取钉钉access_token失败: {error_msg}")
            raise Exception(f"获取钉钉access_token失败: {error_msg}")

        expires_in = data.get("expires_in", self._DEFAULT_EXPIRES_IN)
        self._access_token_cache[tenant_id] = (
            access_token,
            time.time() + expires_in,
        )
        logger.info(f"获取钉钉access_token成功有效期: {expires_in}")
        return access_token

    async def get_user_info_by_code(self, tenant_id: int, code: str) -> Dict[str, Any]:
        """Resolve a login authorization code to DingTalk user info.

        Args:
            tenant_id: Tenant ID.
            code: Password-free login authorization code.

        Returns:
            The ``result`` object of the DingTalk response (read by this
            class for ``userid`` and ``name``); an empty dict when the
            response carries no result.

        Raises:
            Exception: DingTalk reported an error.
        """
        access_token = await self.get_access_token(tenant_id)
        data = await self._request_json(
            f"{DINGTALK_API_BASE}/topapi/v2/user/getuserinfo",
            params={"access_token": access_token},
            payload={"code": code},
        )
        if data.get("errcode") != 0:
            error_msg = data.get("errmsg", "未知错误")
            logger.error(f"通过code获取钉钉用户信息失败: {error_msg}")
            raise Exception(f"获取钉钉用户信息失败: {error_msg}")

        # "or {}" also covers an explicit null result in the response.
        result = data.get("result") or {}
        logger.info(f"获取钉钉用户信息成功: userid={result.get('userid')}, name={result.get('name')}")
        return result

    async def get_user_detail(self, tenant_id: int, userid: str) -> Dict[str, Any]:
        """Fetch the detailed DingTalk profile of a user.

        Args:
            tenant_id: Tenant ID.
            userid: DingTalk user ID.

        Returns:
            The ``result`` object of the DingTalk response, or an empty dict
            when DingTalk reports an error (logged as a warning) or the
            response carries no result.
        """
        access_token = await self.get_access_token(tenant_id)
        data = await self._request_json(
            f"{DINGTALK_API_BASE}/topapi/v2/user/get",
            params={"access_token": access_token},
            payload={"userid": userid},
        )
        if data.get("errcode") != 0:
            error_msg = data.get("errmsg", "未知错误")
            logger.warning(f"获取钉钉用户详情失败: {error_msg}")
            return {}
        return data.get("result") or {}

    async def dingtalk_login(self, tenant_id: int, code: str) -> Tuple[User, Token]:
        """Run the complete DingTalk password-free login flow.

        Args:
            tenant_id: Tenant ID.
            code: Password-free login authorization code.

        Returns:
            Tuple of (user, token pair).

        Raises:
            Exception: The feature is disabled, the DingTalk user cannot be
                resolved, no local user matches, or the user is inactive.
        """
        # 1. The feature must be enabled for this tenant.
        if not await self.is_dingtalk_login_enabled(tenant_id):
            raise Exception("钉钉免密登录功能未启用")

        # 2. Exchange the code for the DingTalk user.
        dingtalk_user = await self.get_user_info_by_code(tenant_id, code)
        dingtalk_userid = dingtalk_user.get("userid")
        if not dingtalk_userid:
            raise Exception("无法获取钉钉用户ID")

        # 3. Find the local user: first by the bound dingtalk_id, then by the
        #    mobile number from the DingTalk profile.
        user = await self.user_service.get_by_dingtalk_id(dingtalk_userid)
        if not user:
            user_detail = await self.get_user_detail(tenant_id, dingtalk_userid)
            mobile = user_detail.get("mobile")
            if mobile:
                user = await self.user_service.get_by_phone(mobile)
                if user:
                    # Remember the binding so later logins match directly.
                    # NOTE(review): user attributes are read after this
                    # commit; that relies on the session not expiring
                    # instances on commit - confirm expire_on_commit=False.
                    user.dingtalk_id = dingtalk_userid
                    await self.db.commit()
                    logger.info(f"通过手机号匹配成功已绑定dingtalk_id: {dingtalk_userid}")
            if not user:
                raise Exception("未找到对应的系统用户,请联系管理员")

        if not user.is_active:
            raise Exception("用户已被禁用")

        # 4. Issue the JWT pair.
        access_token = create_access_token(subject=user.id)
        refresh_token = create_refresh_token(subject=user.id)

        # 5. Record the login time.
        await self.user_service.update_last_login(user.id)
        logger.info(f"钉钉免密登录成功: user_id={user.id}, username={user.username}")
        return user, Token(
            access_token=access_token,
            refresh_token=refresh_token,
        )

    async def get_public_config(self, tenant_id: int) -> Dict[str, Any]:
        """Return the DingTalk settings that are exposed to the frontend.

        Args:
            tenant_id: Tenant ID.

        Returns:
            Dict with ``enabled``, ``corp_id`` and ``agent_id``. Both IDs are
            None when the feature is disabled; the configuration is not read
            in that case.
        """
        if not await self.is_dingtalk_login_enabled(tenant_id):
            return {
                "enabled": False,
                "corp_id": None,
                "agent_id": None,
            }
        config = await self.get_dingtalk_config(tenant_id)
        return {
            "enabled": True,
            "corp_id": config.get("corp_id"),
            "agent_id": config.get("agent_id"),
        }