All checks were successful
continuous-integration/drone/push Build is passing
- 新增storage_service.py封装MinIO操作 - 修改upload.py使用storage_service上传文件 - 修改course_service.py使用storage_service删除文件 - 适配preview.py支持从MinIO获取文件 - 适配knowledge_analysis_v2.py支持MinIO存储 - 在config.py添加MinIO配置项 - 添加minio依赖到requirements.txt 支持特性: - 自动降级到本地存储(MinIO不可用时) - 保持URL格式兼容(/static/uploads/) - 文件自动缓存到本地(用于预览和分析) Co-authored-by: Cursor <cursoragent@cursor.com>
277 lines
8.9 KiB
Python
277 lines
8.9 KiB
Python
"""
|
||
钉钉开放平台 API 服务
|
||
用于通过钉钉 API 获取组织架构和员工信息
|
||
"""
|
||
|
||
import httpx
|
||
from typing import List, Dict, Any, Optional
|
||
from datetime import datetime, timedelta
|
||
from app.core.logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class DingTalkService:
|
||
"""钉钉 API 服务"""
|
||
|
||
BASE_URL = "https://api.dingtalk.com"
|
||
OAPI_URL = "https://oapi.dingtalk.com"
|
||
|
||
def __init__(
|
||
self,
|
||
corp_id: str,
|
||
client_id: str,
|
||
client_secret: str
|
||
):
|
||
"""
|
||
初始化钉钉服务
|
||
|
||
Args:
|
||
corp_id: 企业 CorpId
|
||
client_id: 应用 ClientId (AppKey)
|
||
client_secret: 应用 ClientSecret (AppSecret)
|
||
"""
|
||
self.corp_id = corp_id
|
||
self.client_id = client_id
|
||
self.client_secret = client_secret
|
||
self._access_token: Optional[str] = None
|
||
self._token_expires_at: Optional[datetime] = None
|
||
|
||
async def get_access_token(self) -> str:
|
||
"""
|
||
获取钉钉 Access Token
|
||
|
||
使用新版 OAuth2 接口获取
|
||
|
||
Returns:
|
||
access_token
|
||
"""
|
||
# 检查缓存的 token 是否有效
|
||
if self._access_token and self._token_expires_at:
|
||
if datetime.now() < self._token_expires_at - timedelta(minutes=5):
|
||
return self._access_token
|
||
|
||
url = f"{self.BASE_URL}/v1.0/oauth2/{self.corp_id}/token"
|
||
|
||
payload = {
|
||
"client_id": self.client_id,
|
||
"client_secret": self.client_secret,
|
||
"grant_type": "client_credentials"
|
||
}
|
||
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
response = await client.post(url, json=payload)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
self._access_token = data["access_token"]
|
||
expires_in = data.get("expires_in", 7200)
|
||
self._token_expires_at = datetime.now() + timedelta(seconds=expires_in)
|
||
|
||
logger.info(f"获取钉钉 Access Token 成功,有效期 {expires_in} 秒")
|
||
return self._access_token
|
||
|
||
async def get_department_list(self, dept_id: int = 1) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取部门列表
|
||
|
||
Args:
|
||
dept_id: 父部门ID,根部门为1
|
||
|
||
Returns:
|
||
部门列表
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = f"{self.OAPI_URL}/topapi/v2/department/listsub"
|
||
|
||
params = {"access_token": access_token}
|
||
payload = {"dept_id": dept_id}
|
||
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
response = await client.post(url, params=params, json=payload)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
if data.get("errcode") != 0:
|
||
raise Exception(f"获取部门列表失败: {data.get('errmsg')}")
|
||
|
||
return data.get("result", [])
|
||
|
||
async def get_all_departments(self) -> List[Dict[str, Any]]:
|
||
"""
|
||
递归获取所有部门
|
||
|
||
Returns:
|
||
所有部门列表(扁平化)
|
||
"""
|
||
all_departments = []
|
||
|
||
async def fetch_recursive(parent_id: int):
|
||
departments = await self.get_department_list(parent_id)
|
||
for dept in departments:
|
||
all_departments.append(dept)
|
||
# 递归获取子部门
|
||
await fetch_recursive(dept["dept_id"])
|
||
|
||
await fetch_recursive(1) # 从根部门开始
|
||
logger.info(f"获取到 {len(all_departments)} 个部门")
|
||
return all_departments
|
||
|
||
async def get_department_users(
|
||
self,
|
||
dept_id: int,
|
||
cursor: int = 0,
|
||
size: int = 100
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
获取部门用户列表
|
||
|
||
Args:
|
||
dept_id: 部门ID
|
||
cursor: 分页游标
|
||
size: 每页大小,最大100
|
||
|
||
Returns:
|
||
用户列表和分页信息
|
||
"""
|
||
access_token = await self.get_access_token()
|
||
url = f"{self.OAPI_URL}/topapi/v2/user/list"
|
||
|
||
params = {"access_token": access_token}
|
||
payload = {
|
||
"dept_id": dept_id,
|
||
"cursor": cursor,
|
||
"size": size
|
||
}
|
||
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
response = await client.post(url, params=params, json=payload)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
if data.get("errcode") != 0:
|
||
raise Exception(f"获取部门用户失败: {data.get('errmsg')}")
|
||
|
||
return data.get("result", {})
|
||
|
||
async def get_all_employees(self) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取所有在职员工
|
||
|
||
遍历所有部门获取员工列表
|
||
|
||
Returns:
|
||
员工列表
|
||
"""
|
||
logger.info("开始从钉钉 API 获取所有员工...")
|
||
|
||
# 1. 获取所有部门
|
||
departments = await self.get_all_departments()
|
||
|
||
# 创建部门ID到名称的映射
|
||
dept_map = {dept["dept_id"]: dept["name"] for dept in departments}
|
||
dept_map[1] = "根部门" # 添加根部门
|
||
|
||
# 2. 遍历所有部门获取员工
|
||
all_employees = {} # 使用字典去重(按 userid)
|
||
|
||
for dept in [{"dept_id": 1, "name": "根部门"}] + departments:
|
||
dept_id = dept["dept_id"]
|
||
dept_name = dept["name"]
|
||
|
||
cursor = 0
|
||
while True:
|
||
result = await self.get_department_users(dept_id, cursor)
|
||
users = result.get("list", [])
|
||
|
||
for user in users:
|
||
userid = user.get("userid")
|
||
if userid and userid not in all_employees:
|
||
# 转换为统一格式
|
||
employee = self._convert_user_to_employee(user, dept_name)
|
||
all_employees[userid] = employee
|
||
|
||
# 检查是否还有更多数据
|
||
if not result.get("has_more", False):
|
||
break
|
||
cursor = result.get("next_cursor", 0)
|
||
|
||
employees = list(all_employees.values())
|
||
logger.info(f"获取到 {len(employees)} 位在职员工")
|
||
return employees
|
||
|
||
def _convert_user_to_employee(
|
||
self,
|
||
user: Dict[str, Any],
|
||
dept_name: str
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
将钉钉用户数据转换为员工数据格式
|
||
|
||
Args:
|
||
user: 钉钉用户数据
|
||
dept_name: 部门名称
|
||
|
||
Returns:
|
||
标准员工数据格式
|
||
"""
|
||
return {
|
||
'full_name': user.get('name', ''),
|
||
'phone': user.get('mobile', ''),
|
||
'email': user.get('email', ''),
|
||
'department': dept_name,
|
||
'position': user.get('title', ''),
|
||
'employee_no': user.get('job_number', ''),
|
||
'is_leader': user.get('leader', False),
|
||
'is_active': user.get('active', True),
|
||
'dingtalk_id': user.get('userid', ''),
|
||
'join_date': user.get('hired_date'),
|
||
'work_location': user.get('work_place', ''),
|
||
'avatar': user.get('avatar', ''),
|
||
}
|
||
|
||
async def test_connection(self) -> Dict[str, Any]:
|
||
"""
|
||
测试钉钉 API 连接
|
||
|
||
Returns:
|
||
测试结果
|
||
"""
|
||
try:
|
||
# 1. 测试获取 token
|
||
token = await self.get_access_token()
|
||
|
||
# 2. 测试获取根部门信息
|
||
departments = await self.get_department_list(1)
|
||
|
||
# 3. 获取根部门员工数量
|
||
result = await self.get_department_users(1, size=1)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "连接成功",
|
||
"corp_id": self.corp_id,
|
||
"department_count": len(departments) + 1, # +1 是根部门
|
||
"has_employees": result.get("has_more", False) or len(result.get("list", [])) > 0
|
||
}
|
||
|
||
except httpx.HTTPStatusError as e:
|
||
error_detail = "HTTP错误"
|
||
if e.response.status_code == 400:
|
||
try:
|
||
error_data = e.response.json()
|
||
error_detail = error_data.get("message", str(e))
|
||
except:
|
||
pass
|
||
return {
|
||
"success": False,
|
||
"message": f"连接失败: {error_detail}",
|
||
"error": str(e)
|
||
}
|
||
except Exception as e:
|
||
return {
|
||
"success": False,
|
||
"message": f"连接失败: {str(e)}",
|
||
"error": str(e)
|
||
}
|