From 7be1ac1787e99c1803242b3c29e14f2f1f674ab7 Mon Sep 17 00:00:00 2001 From: yuliang_guo Date: Sat, 31 Jan 2026 17:25:44 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=91=98=E5=B7=A5=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E9=92=89=E9=92=89=E5=BC=80=E6=94=BEAPI?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 dingtalk_service.py 调用钉钉开放API - 支持获取 Access Token、部门列表、员工列表 - employee_sync_service 改为从钉钉API获取员工 - 前端配置界面支持配置 CorpId、ClientId、ClientSecret - 移除外部数据库表依赖 --- backend/app/api/v1/system_settings.py | 101 ++++--- backend/app/services/dingtalk_service.py | 276 ++++++++++++++++++ backend/app/services/employee_sync_service.py | 128 +++----- frontend/src/views/admin/system-settings.vue | 81 +++-- 4 files changed, 441 insertions(+), 145 deletions(-) create mode 100644 backend/app/services/dingtalk_service.py diff --git a/backend/app/api/v1/system_settings.py b/backend/app/api/v1/system_settings.py index e41a396..7abd302 100644 --- a/backend/app/api/v1/system_settings.py +++ b/backend/app/api/v1/system_settings.py @@ -41,13 +41,10 @@ class DingtalkConfigResponse(BaseModel): class EmployeeSyncConfigUpdate(BaseModel): - """员工同步配置更新请求""" - db_host: Optional[str] = Field(None, description="数据库主机") - db_port: Optional[int] = Field(None, description="数据库端口") - db_name: Optional[str] = Field(None, description="数据库名") - db_user: Optional[str] = Field(None, description="数据库用户名") - db_password: Optional[str] = Field(None, description="数据库密码") - table_name: Optional[str] = Field(None, description="员工表/视图名称") + """员工同步配置更新请求(钉钉 API 方式)""" + corp_id: Optional[str] = Field(None, description="钉钉企业 CorpId") + client_id: Optional[str] = Field(None, description="应用 ClientId (AppKey)") + client_secret: Optional[str] = Field(None, description="应用 ClientSecret (AppSecret)") enabled: Optional[bool] = Field(None, description="是否启用自动同步") @@ -294,30 +291,39 @@ async def get_employee_sync_config( db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ - 获取员工同步配置 + 获取员工同步配置(钉钉 API 方式) 仅限管理员访问 - 数据库连接从环境变量读取,前端只能配置表名和开关 """ check_admin_permission(current_user) - import os tenant_id = await get_or_create_tenant_id(db) - # 从数据库获取表名和开关配置 - table_name = await get_system_config(db, tenant_id, 'employee_sync', 'TABLE_NAME') + # 从数据库获取钉钉 API 配置 + corp_id = await get_system_config(db, tenant_id, 'employee_sync', 'CORP_ID') + client_id = await get_system_config(db, tenant_id, 'employee_sync', 'CLIENT_ID') + client_secret = await get_system_config(db, tenant_id, 'employee_sync', 'CLIENT_SECRET') enabled = await get_feature_switch(db, tenant_id, 'employee_sync') - # 从环境变量检查数据源是否已配置 - sync_db_url = os.environ.get('EMPLOYEE_SYNC_DB_URL', '') - configured = bool(sync_db_url) + # 脱敏处理 client_secret + client_secret_masked = None + if client_secret: + if len(client_secret) > 8: + client_secret_masked = client_secret[:4] + '****' + client_secret[-4:] + else: + client_secret_masked = '****' + + # 检查配置是否完整 + configured = bool(corp_id and client_id and client_secret) return ResponseModel( message="获取成功", data={ - "table_name": table_name or "v_钉钉员工表", + "corp_id": corp_id, + "client_id": client_id, + "client_secret_masked": client_secret_masked, "enabled": enabled, - "configured": configured, # 数据源是否已在环境变量配置 + "configured": configured, } ) @@ -329,18 +335,23 @@ async def update_employee_sync_config( db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ - 更新员工同步配置 + 更新员工同步配置(钉钉 API 方式) 仅限管理员访问 - 只能配置表名和开关,数据库连接从环境变量读取 """ check_admin_permission(current_user) tenant_id = await get_or_create_tenant_id(db) try: - if config.table_name is not None: - await set_system_config(db, tenant_id, 'employee_sync', 'TABLE_NAME', config.table_name) + if config.corp_id is not None: + await set_system_config(db, tenant_id, 'employee_sync', 'CORP_ID', config.corp_id) + + if config.client_id is not None: + await set_system_config(db, tenant_id, 'employee_sync', 'CLIENT_ID', config.client_id) + + if config.client_secret is not None: + await set_system_config(db, tenant_id, 'employee_sync', 'CLIENT_SECRET', config.client_secret) if config.enabled is not None: await set_feature_switch(db, tenant_id, 'employee_sync', config.enabled) @@ -370,44 +381,46 @@ async def test_employee_sync_connection( db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ - 测试员工同步数据库连接 + 测试钉钉 API 连接 仅限管理员访问 - 使用环境变量中的数据库连接配置 """ check_admin_permission(current_user) - import os tenant_id = await get_or_create_tenant_id(db) - # 从环境变量获取数据库连接 - sync_db_url = os.environ.get('EMPLOYEE_SYNC_DB_URL', '') + # 获取钉钉配置 + corp_id = await get_system_config(db, tenant_id, 'employee_sync', 'CORP_ID') + client_id = await get_system_config(db, tenant_id, 'employee_sync', 'CLIENT_ID') + client_secret = await get_system_config(db, tenant_id, 'employee_sync', 'CLIENT_SECRET') - if not sync_db_url: + if not all([corp_id, client_id, client_secret]): return ResponseModel( code=400, - message="数据源未配置,请联系系统管理员配置 EMPLOYEE_SYNC_DB_URL 环境变量" + message="钉钉 API 配置不完整,请先填写 CorpId、ClientId、ClientSecret" ) - # 获取表名配置 - table_name = await get_system_config(db, tenant_id, 'employee_sync', 'TABLE_NAME') or "v_钉钉员工表" - try: - from sqlalchemy.ext.asyncio import create_async_engine + from app.services.dingtalk_service import DingTalkService - engine = create_async_engine(sync_db_url, echo=False, pool_pre_ping=True) - - async with engine.connect() as conn: - # 测试查询员工表 - result = await conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")) - count = result.scalar() - - await engine.dispose() - - return ResponseModel( - message=f"连接成功!员工表共有 {count} 条在职员工", - data={"employee_count": count} + dingtalk = DingTalkService( + corp_id=corp_id, + client_id=client_id, + client_secret=client_secret ) + + result = await dingtalk.test_connection() + + if result["success"]: + return ResponseModel( + message=f"连接成功!已获取到组织架构", + data=result + ) + else: + return ResponseModel( + code=500, + message=result["message"] + ) except Exception as e: logger.error(f"测试连接失败: {str(e)}") diff --git a/backend/app/services/dingtalk_service.py b/backend/app/services/dingtalk_service.py new file mode 100644 index 0000000..0381961 --- /dev/null +++ b/backend/app/services/dingtalk_service.py @@ -0,0 +1,276 @@ +""" +钉钉开放平台 API 服务 +用于通过钉钉 API 获取组织架构和员工信息 +""" + +import httpx +from typing import List, Dict, Any, Optional +from datetime import datetime, timedelta +from app.core.logger import get_logger + +logger = get_logger(__name__) + + +class DingTalkService: + """钉钉 API 服务""" + + BASE_URL = "https://api.dingtalk.com" + OAPI_URL = "https://oapi.dingtalk.com" + + def __init__( + self, + corp_id: str, + client_id: str, + client_secret: str + ): + """ + 初始化钉钉服务 + + Args: + corp_id: 企业 CorpId + client_id: 应用 ClientId (AppKey) + client_secret: 应用 ClientSecret (AppSecret) + """ + self.corp_id = corp_id + self.client_id = client_id + self.client_secret = client_secret + self._access_token: Optional[str] = None + self._token_expires_at: Optional[datetime] = None + + async def get_access_token(self) -> str: + """ + 获取钉钉 Access Token + + 使用新版 OAuth2 接口获取 + + Returns: + access_token + """ + # 检查缓存的 token 是否有效 + if self._access_token and self._token_expires_at: + if datetime.now() < self._token_expires_at - timedelta(minutes=5): + return self._access_token + + url = f"{self.BASE_URL}/v1.0/oauth2/{self.corp_id}/token" + + payload = { + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "client_credentials" + } + + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(url, json=payload) + response.raise_for_status() + data = response.json() + + self._access_token = data["access_token"] + expires_in = data.get("expires_in", 7200) + self._token_expires_at = datetime.now() + timedelta(seconds=expires_in) + + logger.info(f"获取钉钉 Access Token 成功,有效期 {expires_in} 秒") + return self._access_token + + async def get_department_list(self, dept_id: int = 1) -> List[Dict[str, Any]]: + """ + 获取部门列表 + + Args: + dept_id: 父部门ID,根部门为1 + + Returns: + 部门列表 + """ + access_token = await self.get_access_token() + url = f"{self.OAPI_URL}/topapi/v2/department/listsub" + + params = {"access_token": access_token} + payload = {"dept_id": dept_id} + + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(url, params=params, json=payload) + response.raise_for_status() + data = response.json() + + if data.get("errcode") != 0: + raise Exception(f"获取部门列表失败: {data.get('errmsg')}") + + return data.get("result", []) + + async def get_all_departments(self) -> List[Dict[str, Any]]: + """ + 递归获取所有部门 + + Returns: + 所有部门列表(扁平化) + """ + all_departments = [] + + async def fetch_recursive(parent_id: int): + departments = await self.get_department_list(parent_id) + for dept in departments: + all_departments.append(dept) + # 递归获取子部门 + await fetch_recursive(dept["dept_id"]) + + await fetch_recursive(1) # 从根部门开始 + logger.info(f"获取到 {len(all_departments)} 个部门") + return all_departments + + async def get_department_users( + self, + dept_id: int, + cursor: int = 0, + size: int = 100 + ) -> Dict[str, Any]: + """ + 获取部门用户列表 + + Args: + dept_id: 部门ID + cursor: 分页游标 + size: 每页大小,最大100 + + Returns: + 用户列表和分页信息 + """ + access_token = await self.get_access_token() + url = f"{self.OAPI_URL}/topapi/v2/user/list" + + params = {"access_token": access_token} + payload = { + "dept_id": dept_id, + "cursor": cursor, + "size": size + } + + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(url, params=params, json=payload) + response.raise_for_status() + data = response.json() + + if data.get("errcode") != 0: + raise Exception(f"获取部门用户失败: {data.get('errmsg')}") + + return data.get("result", {}) + + async def get_all_employees(self) -> List[Dict[str, Any]]: + """ + 获取所有在职员工 + + 遍历所有部门获取员工列表 + + Returns: + 员工列表 + """ + logger.info("开始从钉钉 API 获取所有员工...") + + # 1. 获取所有部门 + departments = await self.get_all_departments() + + # 创建部门ID到名称的映射 + dept_map = {dept["dept_id"]: dept["name"] for dept in departments} + dept_map[1] = "根部门" # 添加根部门 + + # 2. 遍历所有部门获取员工 + all_employees = {} # 使用字典去重(按 userid) + + for dept in [{"dept_id": 1, "name": "根部门"}] + departments: + dept_id = dept["dept_id"] + dept_name = dept["name"] + + cursor = 0 + while True: + result = await self.get_department_users(dept_id, cursor) + users = result.get("list", []) + + for user in users: + userid = user.get("userid") + if userid and userid not in all_employees: + # 转换为统一格式 + employee = self._convert_user_to_employee(user, dept_name) + all_employees[userid] = employee + + # 检查是否还有更多数据 + if not result.get("has_more", False): + break + cursor = result.get("next_cursor", 0) + + employees = list(all_employees.values()) + logger.info(f"获取到 {len(employees)} 位在职员工") + return employees + + def _convert_user_to_employee( + self, + user: Dict[str, Any], + dept_name: str + ) -> Dict[str, Any]: + """ + 将钉钉用户数据转换为员工数据格式 + + Args: + user: 钉钉用户数据 + dept_name: 部门名称 + + Returns: + 标准员工数据格式 + """ + return { + 'full_name': user.get('name', ''), + 'phone': user.get('mobile', ''), + 'email': user.get('email', ''), + 'department': dept_name, + 'position': user.get('title', ''), + 'employee_no': user.get('job_number', ''), + 'is_leader': user.get('leader', False), + 'is_active': user.get('active', True), + 'dingtalk_id': user.get('userid', ''), + 'join_date': user.get('hired_date'), + 'work_location': user.get('work_place', ''), + 'avatar': user.get('avatar', ''), + } + + async def test_connection(self) -> Dict[str, Any]: + """ + 测试钉钉 API 连接 + + Returns: + 测试结果 + """ + try: + # 1. 测试获取 token + token = await self.get_access_token() + + # 2. 测试获取根部门信息 + departments = await self.get_department_list(1) + + # 3. 获取根部门员工数量 + result = await self.get_department_users(1, size=1) + + return { + "success": True, + "message": "连接成功", + "corp_id": self.corp_id, + "department_count": len(departments) + 1, # +1 是根部门 + "has_employees": result.get("has_more", False) or len(result.get("list", [])) > 0 + } + + except httpx.HTTPStatusError as e: + error_detail = "HTTP错误" + if e.response.status_code == 400: + try: + error_data = e.response.json() + error_detail = error_data.get("message", str(e)) + except: + pass + return { + "success": False, + "message": f"连接失败: {error_detail}", + "error": str(e) + } + except Exception as e: + return { + "success": False, + "message": f"连接失败: {str(e)}", + "error": str(e) + } diff --git a/backend/app/services/employee_sync_service.py b/backend/app/services/employee_sync_service.py index 77831a3..c72c5a5 100644 --- a/backend/app/services/employee_sync_service.py +++ b/backend/app/services/employee_sync_service.py @@ -1,6 +1,6 @@ """ 员工同步服务 -从外部钉钉员工表同步员工数据到考培练系统 +从钉钉开放 API 同步员工数据到考培练系统 """ from typing import List, Dict, Any, Optional, Tuple @@ -23,115 +23,79 @@ logger = get_logger(__name__) class EmployeeSyncService: """员工同步服务""" - # 默认外部数据库连接配置(向后兼容,从环境变量读取) - DEFAULT_TABLE_NAME = "v_钉钉员工表" - def __init__(self, db: AsyncSession, tenant_id: int = 1): self.db = db self.tenant_id = tenant_id - self.external_engine = None - self.table_name = self.DEFAULT_TABLE_NAME - self._db_url = None + self._dingtalk_config = None - async def _get_table_name_from_db(self) -> str: - """从数据库获取员工表名配置""" + async def _get_dingtalk_config(self) -> Dict[str, str]: + """从数据库获取钉钉 API 配置""" + if self._dingtalk_config: + return self._dingtalk_config + try: result = await self.db.execute( text(""" - SELECT config_value + SELECT config_key, config_value FROM tenant_configs WHERE tenant_id = :tenant_id - AND config_group = 'employee_sync' - AND config_key = 'TABLE_NAME' + AND config_group = 'employee_sync' """), {"tenant_id": self.tenant_id} ) - row = result.fetchone() - return row[0] if row else self.DEFAULT_TABLE_NAME - except Exception: - return self.DEFAULT_TABLE_NAME + rows = result.fetchall() + + config = {} + for key, value in rows: + config[key] = value + + self._dingtalk_config = config + return config + except Exception as e: + logger.error(f"获取钉钉配置失败: {e}") + return {} - def _get_db_url_from_env(self) -> str: - """从环境变量获取数据库连接URL""" - import os - - # 优先使用环境变量中的完整URL - db_url = os.environ.get('EMPLOYEE_SYNC_DB_URL', '') - - if db_url: - return db_url - - # 向后兼容:如果没有配置环境变量,使用默认值 - logger.warning("EMPLOYEE_SYNC_DB_URL 环境变量未配置,使用默认数据源") - return "mysql+aiomysql://neuron_new:NWxGM6CQoMLKyEszXhfuLBIIo1QbeK@120.77.144.233:29613/neuron_new?charset=utf8mb4" - async def __aenter__(self): """异步上下文管理器入口""" - self._db_url = self._get_db_url_from_env() - self.table_name = await self._get_table_name_from_db() - - self.external_engine = create_async_engine( - self._db_url, - echo=False, - pool_pre_ping=True, - pool_recycle=3600 - ) + # 预加载钉钉配置 + await self._get_dingtalk_config() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" - if self.external_engine: - await self.external_engine.dispose() + pass async def fetch_employees_from_dingtalk(self) -> List[Dict[str, Any]]: """ - 从钉钉员工表获取在职员工数据 + 从钉钉 API 获取在职员工数据 Returns: 员工数据列表 """ - logger.info(f"开始从员工表 {self.table_name} 获取数据...") + config = await self._get_dingtalk_config() - query = f""" - SELECT - 员工姓名, - 手机号, - 邮箱, - 所属部门, - 职位, - 工号, - 是否领导, - 是否在职, - 钉钉用户ID, - 入职日期, - 工作地点 - FROM {self.table_name} - WHERE 是否在职 = 1 - ORDER BY 员工姓名 - """ + corp_id = config.get('CORP_ID') + client_id = config.get('CLIENT_ID') + client_secret = config.get('CLIENT_SECRET') - async with self.external_engine.connect() as conn: - result = await conn.execute(text(query)) - rows = result.fetchall() - - employees = [] - for row in rows: - employees.append({ - 'full_name': row[0], - 'phone': row[1], - 'email': row[2], - 'department': row[3], - 'position': row[4], - 'employee_no': row[5], - 'is_leader': bool(row[6]), - 'is_active': bool(row[7]), - 'dingtalk_id': row[8], - 'join_date': row[9], - 'work_location': row[10] - }) - - logger.info(f"获取到 {len(employees)} 条在职员工数据") - return employees + if not all([corp_id, client_id, client_secret]): + raise Exception("钉钉 API 配置不完整,请先配置 CorpId、ClientId、ClientSecret") + + from app.services.dingtalk_service import DingTalkService + + dingtalk = DingTalkService( + corp_id=corp_id, + client_id=client_id, + client_secret=client_secret + ) + + employees = await dingtalk.get_all_employees() + + # 过滤在职员工 + active_employees = [emp for emp in employees if emp.get('is_active', True)] + logger.info(f"获取到 {len(active_employees)} 条在职员工数据") + + return active_employees def generate_email(self, phone: str, original_email: Optional[str]) -> Optional[str]: """ diff --git a/frontend/src/views/admin/system-settings.vue b/frontend/src/views/admin/system-settings.vue index ac96c60..ff0d65b 100644 --- a/frontend/src/views/admin/system-settings.vue +++ b/frontend/src/views/admin/system-settings.vue @@ -103,7 +103,7 @@ style="margin-bottom: 20px;" > @@ -111,6 +111,7 @@ @@ -120,30 +121,53 @@ active-text="已启用" inactive-text="已禁用" /> - 启用后将每日自动同步员工数据 + 启用后将每日自动从钉钉同步员工数据 - + 钉钉应用配置 + + - 视图需包含:员工姓名、手机号、所属部门、职位、钉钉用户ID等字段 + 在钉钉管理后台-设置中查看 - + + + 钉钉开发者后台-应用凭证 + + + + + + 当前值: {{ syncForm.client_secret_masked }} + + + + {{ syncForm.configured ? '已配置' : '未配置' }} - 数据源由系统管理员在后台配置 保存配置 - + 测试连接 @@ -182,11 +206,14 @@ const dingtalkForm = reactive({ corp_id: '', }) -// 员工同步配置表单 +// 员工同步配置表单(钉钉 API 方式) const syncForm = reactive({ enabled: false, - table_name: 'v_钉钉员工表', - configured: false, // 数据源是否已配置 + corp_id: '', + client_id: '', + client_secret: '', + client_secret_masked: '', + configured: false, }) // 表单验证规则 @@ -202,7 +229,14 @@ const dingtalkRules = reactive({ ] }) -const syncRules = reactive({}) +const syncRules = reactive({ + corp_id: [ + { required: false, message: '请输入企业 CorpId', trigger: 'blur' } + ], + client_id: [ + { required: false, message: '请输入应用 ClientId', trigger: 'blur' } + ] +}) /** * 加载钉钉配置 @@ -274,7 +308,7 @@ const saveDingtalkConfig = async () => { } /** - * 加载员工同步配置 + * 加载员工同步配置(钉钉 API 方式) */ const loadSyncConfig = async () => { syncLoading.value = true @@ -282,7 +316,10 @@ const loadSyncConfig = async () => { const response = await request.get('/api/v1/settings/employee-sync') if (response.code === 200 && response.data) { syncForm.enabled = response.data.enabled || false - syncForm.table_name = response.data.table_name || 'v_钉钉员工表' + syncForm.corp_id = response.data.corp_id || '' + syncForm.client_id = response.data.client_id || '' + syncForm.client_secret = '' + syncForm.client_secret_masked = response.data.client_secret_masked || '' syncForm.configured = response.data.configured || false } } catch (error: any) { @@ -293,14 +330,20 @@ const loadSyncConfig = async () => { } /** - * 保存员工同步配置 + * 保存员工同步配置(钉钉 API 方式) */ const saveSyncConfig = async () => { syncSaving.value = true try { - const updateData = { + const updateData: any = { enabled: syncForm.enabled, - table_name: syncForm.table_name, + corp_id: syncForm.corp_id, + client_id: syncForm.client_id, + } + + // 只有输入了新密码才传递 + if (syncForm.client_secret) { + updateData.client_secret = syncForm.client_secret } const response = await request.put('/api/v1/settings/employee-sync', updateData)