""" 系统设置 API 供企业管理员(admin角色)配置系统级别的设置,如钉钉免密登录等 """ from typing import Optional, Dict, Any from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from pydantic import BaseModel, Field from app.core.deps import get_current_active_user, get_db from app.core.logger import logger from app.models.user import User from app.schemas.base import ResponseModel router = APIRouter() # ============================================ # Schema 定义 # ============================================ class DingtalkConfigUpdate(BaseModel): """钉钉配置更新请求""" app_key: Optional[str] = Field(None, description="钉钉AppKey") app_secret: Optional[str] = Field(None, description="钉钉AppSecret") agent_id: Optional[str] = Field(None, description="钉钉AgentId") corp_id: Optional[str] = Field(None, description="钉钉CorpId") enabled: Optional[bool] = Field(None, description="是否启用钉钉免密登录") class DingtalkConfigResponse(BaseModel): """钉钉配置响应""" app_key: Optional[str] = None app_secret_masked: Optional[str] = None # 脱敏显示 agent_id: Optional[str] = None corp_id: Optional[str] = None enabled: bool = False class EmployeeSyncConfigUpdate(BaseModel): """员工同步配置更新请求""" db_host: Optional[str] = Field(None, description="数据库主机") db_port: Optional[int] = Field(None, description="数据库端口") db_name: Optional[str] = Field(None, description="数据库名") db_user: Optional[str] = Field(None, description="数据库用户名") db_password: Optional[str] = Field(None, description="数据库密码") table_name: Optional[str] = Field(None, description="员工表/视图名称") enabled: Optional[bool] = Field(None, description="是否启用自动同步") # ============================================ # 辅助函数 # ============================================ def check_admin_permission(user: User): """检查是否为管理员""" if user.role != 'admin': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="需要管理员权限" ) async def get_or_create_tenant_id(db: AsyncSession) -> int: """获取或创建默认租户ID(简化版,假设单租户)""" # 对于考培练系统,简化处理,使用固定的租户ID=1 return 1 async def get_system_config(db: AsyncSession, tenant_id: int, config_group: str, config_key: str) -> Optional[str]: """获取系统配置值""" result = await db.execute( text(""" SELECT config_value FROM tenant_configs WHERE tenant_id = :tenant_id AND config_group = :config_group AND config_key = :config_key """), {"tenant_id": tenant_id, "config_group": config_group, "config_key": config_key} ) row = result.fetchone() return row[0] if row else None async def set_system_config(db: AsyncSession, tenant_id: int, config_group: str, config_key: str, config_value: str): """设置系统配置值""" # 检查是否已存在 existing = await get_system_config(db, tenant_id, config_group, config_key) if existing is not None: # 更新 await db.execute( text(""" UPDATE tenant_configs SET config_value = :config_value WHERE tenant_id = :tenant_id AND config_group = :config_group AND config_key = :config_key """), {"tenant_id": tenant_id, "config_group": config_group, "config_key": config_key, "config_value": config_value} ) else: # 插入 await db.execute( text(""" INSERT INTO tenant_configs (tenant_id, config_group, config_key, config_value, value_type, is_encrypted) VALUES (:tenant_id, :config_group, :config_key, :config_value, 'string', :is_encrypted) """), { "tenant_id": tenant_id, "config_group": config_group, "config_key": config_key, "config_value": config_value, "is_encrypted": 1 if config_key == 'DINGTALK_APP_SECRET' else 0 } ) async def get_feature_switch(db: AsyncSession, tenant_id: int, feature_code: str) -> bool: """获取功能开关状态""" # 先查租户级别 result = await db.execute( text(""" SELECT is_enabled FROM feature_switches WHERE feature_code = :feature_code AND tenant_id = :tenant_id """), {"tenant_id": tenant_id, "feature_code": feature_code} ) row = result.fetchone() if row: return bool(row[0]) # 再查默认值 result = await db.execute( text(""" SELECT is_enabled FROM feature_switches WHERE feature_code = :feature_code AND tenant_id IS NULL """), {"feature_code": feature_code} ) row = result.fetchone() return bool(row[0]) if row else False async def set_feature_switch(db: AsyncSession, tenant_id: int, feature_code: str, is_enabled: bool): """设置功能开关状态""" # 检查是否已存在租户级配置 result = await db.execute( text(""" SELECT id FROM feature_switches WHERE feature_code = :feature_code AND tenant_id = :tenant_id """), {"tenant_id": tenant_id, "feature_code": feature_code} ) row = result.fetchone() if row: # 更新 await db.execute( text(""" UPDATE feature_switches SET is_enabled = :is_enabled WHERE tenant_id = :tenant_id AND feature_code = :feature_code """), {"tenant_id": tenant_id, "feature_code": feature_code, "is_enabled": 1 if is_enabled else 0} ) else: # 获取默认配置信息 result = await db.execute( text(""" SELECT feature_name, feature_group, description FROM feature_switches WHERE feature_code = :feature_code AND tenant_id IS NULL """), {"feature_code": feature_code} ) default_row = result.fetchone() if default_row: # 插入租户级配置 await db.execute( text(""" INSERT INTO feature_switches (tenant_id, feature_code, feature_name, feature_group, is_enabled, description) VALUES (:tenant_id, :feature_code, :feature_name, :feature_group, :is_enabled, :description) """), { "tenant_id": tenant_id, "feature_code": feature_code, "feature_name": default_row[0], "feature_group": default_row[1], "is_enabled": 1 if is_enabled else 0, "description": default_row[2] } ) # ============================================ # API 端点 # ============================================ @router.get("/dingtalk", response_model=ResponseModel) async def get_dingtalk_config( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 获取钉钉配置 仅限管理员访问 """ check_admin_permission(current_user) tenant_id = await get_or_create_tenant_id(db) # 获取配置 app_key = await get_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_APP_KEY') app_secret = await get_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_APP_SECRET') agent_id = await get_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_AGENT_ID') corp_id = await get_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_CORP_ID') enabled = await get_feature_switch(db, tenant_id, 'dingtalk_login') # 脱敏处理 app_secret app_secret_masked = None if app_secret: if len(app_secret) > 8: app_secret_masked = app_secret[:4] + '****' + app_secret[-4:] else: app_secret_masked = '****' return ResponseModel( message="获取成功", data={ "app_key": app_key, "app_secret_masked": app_secret_masked, "agent_id": agent_id, "corp_id": corp_id, "enabled": enabled, } ) @router.put("/dingtalk", response_model=ResponseModel) async def update_dingtalk_config( config: DingtalkConfigUpdate, current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 更新钉钉配置 仅限管理员访问 """ check_admin_permission(current_user) tenant_id = await get_or_create_tenant_id(db) try: # 更新配置 if config.app_key is not None: await set_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_APP_KEY', config.app_key) if config.app_secret is not None: await set_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_APP_SECRET', config.app_secret) if config.agent_id is not None: await set_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_AGENT_ID', config.agent_id) if config.corp_id is not None: await set_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_CORP_ID', config.corp_id) if config.enabled is not None: await set_feature_switch(db, tenant_id, 'dingtalk_login', config.enabled) await db.commit() logger.info( "钉钉配置已更新", user_id=current_user.id, username=current_user.username, ) return ResponseModel(message="配置已保存") except Exception as e: await db.rollback() logger.error(f"更新钉钉配置失败: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="保存配置失败" ) @router.get("/employee-sync", response_model=ResponseModel) async def get_employee_sync_config( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 获取员工同步配置 仅限管理员访问 数据库连接从环境变量读取,前端只能配置表名和开关 """ check_admin_permission(current_user) import os tenant_id = await get_or_create_tenant_id(db) # 从数据库获取表名和开关配置 table_name = await get_system_config(db, tenant_id, 'employee_sync', 'TABLE_NAME') enabled = await get_feature_switch(db, tenant_id, 'employee_sync') # 从环境变量检查数据源是否已配置 sync_db_url = os.environ.get('EMPLOYEE_SYNC_DB_URL', '') configured = bool(sync_db_url) return ResponseModel( message="获取成功", data={ "table_name": table_name or "v_钉钉员工表", "enabled": enabled, "configured": configured, # 数据源是否已在环境变量配置 } ) @router.put("/employee-sync", response_model=ResponseModel) async def update_employee_sync_config( config: EmployeeSyncConfigUpdate, current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 更新员工同步配置 仅限管理员访问 只能配置表名和开关,数据库连接从环境变量读取 """ check_admin_permission(current_user) tenant_id = await get_or_create_tenant_id(db) try: if config.table_name is not None: await set_system_config(db, tenant_id, 'employee_sync', 'TABLE_NAME', config.table_name) if config.enabled is not None: await set_feature_switch(db, tenant_id, 'employee_sync', config.enabled) await db.commit() logger.info( "员工同步配置已更新", user_id=current_user.id, username=current_user.username, ) return ResponseModel(message="配置已保存") except Exception as e: await db.rollback() logger.error(f"更新员工同步配置失败: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="保存配置失败" ) @router.post("/employee-sync/test", response_model=ResponseModel) async def test_employee_sync_connection( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 测试员工同步数据库连接 仅限管理员访问 使用环境变量中的数据库连接配置 """ check_admin_permission(current_user) import os tenant_id = await get_or_create_tenant_id(db) # 从环境变量获取数据库连接 sync_db_url = os.environ.get('EMPLOYEE_SYNC_DB_URL', '') if not sync_db_url: return ResponseModel( code=400, message="数据源未配置,请联系系统管理员配置 EMPLOYEE_SYNC_DB_URL 环境变量" ) # 获取表名配置 table_name = await get_system_config(db, tenant_id, 'employee_sync', 'TABLE_NAME') or "v_钉钉员工表" try: from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine(sync_db_url, echo=False, pool_pre_ping=True) async with engine.connect() as conn: # 测试查询员工表 result = await conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")) count = result.scalar() await engine.dispose() return ResponseModel( message=f"连接成功!员工表共有 {count} 条在职员工", data={"employee_count": count} ) except Exception as e: logger.error(f"测试连接失败: {str(e)}") return ResponseModel( code=500, message=f"连接失败: {str(e)}" ) @router.get("/all", response_model=ResponseModel) async def get_all_settings( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 获取所有系统设置概览 仅限管理员访问 """ check_admin_permission(current_user) tenant_id = await get_or_create_tenant_id(db) # 钉钉配置状态 dingtalk_enabled = await get_feature_switch(db, tenant_id, 'dingtalk_login') dingtalk_corp_id = await get_system_config(db, tenant_id, 'dingtalk', 'DINGTALK_CORP_ID') # 员工同步配置状态 employee_sync_enabled = await get_feature_switch(db, tenant_id, 'employee_sync') employee_sync_host = await get_system_config(db, tenant_id, 'employee_sync', 'DB_HOST') return ResponseModel( message="获取成功", data={ "dingtalk": { "enabled": dingtalk_enabled, "configured": bool(dingtalk_corp_id), }, "employee_sync": { "enabled": employee_sync_enabled, "configured": bool(employee_sync_host), } } )