Files
012-kaopeilian/backend/app/services/employee_sync_service.py
yuliang_guo 18d6d5aff3
Some checks failed
continuous-integration/drone/push Build is failing
refactor: 员工同步复用钉钉免密登录配置
- 移除员工同步独立的 API 凭证配置
- 复用 dingtalk 配置组的 CorpId、AppKey、AppSecret
- 简化前端界面,只保留开关和测试连接
2026-01-31 17:29:10 +08:00

763 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
员工同步服务
从钉钉开放 API 同步员工数据到考培练系统
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import selectinload
import asyncio
from app.core.logger import get_logger
from app.core.security import get_password_hash
from app.models.user import User, Team
from app.models.position import Position
from app.models.position_member import PositionMember
from app.schemas.user import UserCreate
logger = get_logger(__name__)
class EmployeeSyncService:
"""员工同步服务"""
def __init__(self, db: AsyncSession, tenant_id: int = 1):
self.db = db
self.tenant_id = tenant_id
self._dingtalk_config = None
async def _get_dingtalk_config(self) -> Dict[str, str]:
"""从数据库获取钉钉 API 配置(复用免密登录配置)"""
if self._dingtalk_config:
return self._dingtalk_config
try:
# 从 dingtalk 配置组读取(与免密登录共用)
result = await self.db.execute(
text("""
SELECT config_key, config_value
FROM tenant_configs
WHERE tenant_id = :tenant_id
AND config_group = 'dingtalk'
"""),
{"tenant_id": self.tenant_id}
)
rows = result.fetchall()
config = {}
for key, value in rows:
# 转换 key 名称以匹配 DingTalkService 需要的格式
if key == 'DINGTALK_CORP_ID':
config['CORP_ID'] = value
elif key == 'DINGTALK_APP_KEY':
config['CLIENT_ID'] = value
elif key == 'DINGTALK_APP_SECRET':
config['CLIENT_SECRET'] = value
self._dingtalk_config = config
return config
except Exception as e:
logger.error(f"获取钉钉配置失败: {e}")
return {}
async def __aenter__(self):
"""异步上下文管理器入口"""
# 预加载钉钉配置
await self._get_dingtalk_config()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
pass
async def fetch_employees_from_dingtalk(self) -> List[Dict[str, Any]]:
"""
从钉钉 API 获取在职员工数据
Returns:
员工数据列表
"""
config = await self._get_dingtalk_config()
corp_id = config.get('CORP_ID')
client_id = config.get('CLIENT_ID')
client_secret = config.get('CLIENT_SECRET')
if not all([corp_id, client_id, client_secret]):
raise Exception("钉钉 API 配置不完整,请先配置 CorpId、ClientId、ClientSecret")
from app.services.dingtalk_service import DingTalkService
dingtalk = DingTalkService(
corp_id=corp_id,
client_id=client_id,
client_secret=client_secret
)
employees = await dingtalk.get_all_employees()
# 过滤在职员工
active_employees = [emp for emp in employees if emp.get('is_active', True)]
logger.info(f"获取到 {len(active_employees)} 条在职员工数据")
return active_employees
def generate_email(self, phone: str, original_email: Optional[str]) -> Optional[str]:
"""
生成邮箱地址
如果原始邮箱为空或格式无效,生成 {手机号}@rxm.com
Args:
phone: 手机号
original_email: 原始邮箱
Returns:
邮箱地址
"""
if original_email and original_email.strip():
email = original_email.strip()
# 验证邮箱格式:检查是否有@后直接跟点号等无效格式
if '@' in email and not email.startswith('@') and not email.endswith('@'):
# 检查@后面是否直接是点号
at_index = email.index('@')
if at_index + 1 < len(email) and email[at_index + 1] != '.':
# 检查是否有域名部分
domain = email[at_index + 1:]
if '.' in domain and len(domain) > 2:
return email
# 如果邮箱无效或为空,使用手机号生成
if phone:
return f"{phone}@rxm.com"
return None
def determine_role(self, is_leader: bool) -> str:
"""
确定用户角色
Args:
is_leader: 是否领导
Returns:
角色: manager 或 trainee
"""
return 'manager' if is_leader else 'trainee'
async def create_or_get_team(self, department_name: str, leader_id: Optional[int] = None) -> Team:
"""
创建或获取部门团队
Args:
department_name: 部门名称
leader_id: 负责人ID
Returns:
团队对象
"""
if not department_name or department_name.strip() == '':
return None
department_name = department_name.strip()
# 检查团队是否已存在使用limit(1)避免重复数据报错)
stmt = select(Team).where(
Team.name == department_name,
Team.is_deleted == False
).limit(1)
result = await self.db.execute(stmt)
team = result.scalar_one_or_none()
if team:
# 更新负责人
if leader_id and not team.leader_id:
team.leader_id = leader_id
logger.info(f"更新团队 {department_name} 的负责人")
return team
# 创建新团队
# 生成团队代码:使用部门名称的拼音首字母或简化处理
team_code = f"DEPT_{hash(department_name) % 100000:05d}"
team = Team(
name=department_name,
code=team_code,
description=f"{department_name}",
team_type='department',
is_active=True,
leader_id=leader_id
)
self.db.add(team)
await self.db.flush() # 获取ID但不提交
logger.info(f"创建团队: {department_name} (ID: {team.id})")
return team
async def create_or_get_position(self, position_name: str) -> Optional[Position]:
"""
创建或获取岗位
Args:
position_name: 岗位名称
Returns:
岗位对象
"""
if not position_name or position_name.strip() == '':
return None
position_name = position_name.strip()
# 检查岗位是否已存在使用first()避免重复数据报错)
stmt = select(Position).where(
Position.name == position_name,
Position.is_deleted == False
).limit(1)
result = await self.db.execute(stmt)
position = result.scalar_one_or_none()
if position:
return position
# 创建新岗位
position_code = f"POS_{hash(position_name) % 100000:05d}"
position = Position(
name=position_name,
code=position_code,
description=f"{position_name}",
status='active'
)
self.db.add(position)
await self.db.flush()
logger.info(f"创建岗位: {position_name} (ID: {position.id})")
return position
async def create_user(self, employee_data: Dict[str, Any]) -> Optional[User]:
"""
创建用户
Args:
employee_data: 员工数据
Returns:
用户对象或None如果创建失败
"""
phone = employee_data.get('phone')
full_name = employee_data.get('full_name')
if not phone:
logger.warning(f"员工 {full_name} 没有手机号,跳过")
return None
# 检查用户是否已存在(通过手机号,包括已软删除的)
stmt = select(User).where(User.phone == phone)
result = await self.db.execute(stmt)
existing_user = result.scalar_one_or_none()
if existing_user:
# 如果是软删除用户,恢复它
if existing_user.is_deleted:
existing_user.is_deleted = False
existing_user.full_name = full_name or existing_user.full_name
existing_user.is_active = True
dingtalk_id = employee_data.get('dingtalk_id')
if dingtalk_id:
existing_user.dingtalk_id = dingtalk_id
logger.info(f"恢复软删除用户: {phone} ({full_name})")
return existing_user
# 如果用户已存在但没有dingtalk_id则更新
dingtalk_id = employee_data.get('dingtalk_id')
if dingtalk_id and not existing_user.dingtalk_id:
existing_user.dingtalk_id = dingtalk_id
logger.info(f"更新用户 {phone} 的钉钉ID: {dingtalk_id}")
logger.info(f"用户已存在: {phone} ({full_name})")
return existing_user
# 生成邮箱
email = self.generate_email(phone, employee_data.get('email'))
# 检查邮箱是否已被其他用户使用(避免唯一索引冲突)
if email:
email_check_stmt = select(User).where(
User.email == email,
User.is_deleted == False
)
email_result = await self.db.execute(email_check_stmt)
if email_result.scalar_one_or_none():
# 邮箱已存在,使用手机号生成唯一邮箱
email = f"{phone}@rxm.com"
logger.warning(f"邮箱 {employee_data.get('email')} 已被使用,为员工 {full_name} 生成新邮箱: {email}")
# 确定角色
role = self.determine_role(employee_data.get('is_leader', False))
# 创建用户
user = User(
username=phone, # 使用手机号作为用户名
email=email,
phone=phone,
hashed_password=get_password_hash('123456'), # 初始密码
full_name=full_name,
role=role,
is_active=True,
is_verified=True,
dingtalk_id=employee_data.get('dingtalk_id'), # 钉钉用户ID
)
self.db.add(user)
await self.db.flush()
logger.info(f"创建用户: {phone} ({full_name}) - 角色: {role}")
return user
async def sync_employees(self) -> Dict[str, Any]:
"""
执行完整的员工同步流程
Returns:
同步结果统计
"""
logger.info("=" * 60)
logger.info("开始员工同步")
logger.info("=" * 60)
stats = {
'total_employees': 0,
'users_created': 0,
'users_skipped': 0,
'teams_created': 0,
'positions_created': 0,
'errors': [],
'start_time': datetime.now()
}
try:
# 1. 获取员工数据
employees = await self.fetch_employees_from_dingtalk()
stats['total_employees'] = len(employees)
if not employees:
logger.warning("没有获取到员工数据")
return stats
# 2. 创建用户和相关数据
for employee in employees:
try:
# 创建用户
user = await self.create_user(employee)
if not user:
stats['users_skipped'] += 1
continue
stats['users_created'] += 1
# 创建部门团队
department = employee.get('department')
if department:
team = await self.create_or_get_team(
department,
leader_id=user.id if employee.get('is_leader') else None
)
if team:
# 用SQL直接插入user_teams关联表避免懒加载问题
await self._add_user_to_team(user.id, team.id)
logger.info(f"关联用户 {user.full_name} 到团队 {team.name}")
# 创建岗位
position_name = employee.get('position')
if position_name:
position = await self.create_or_get_position(position_name)
if position:
# 检查是否已经关联
stmt = select(PositionMember).where(
PositionMember.position_id == position.id,
PositionMember.user_id == user.id,
PositionMember.is_deleted == False
)
result = await self.db.execute(stmt)
existing_member = result.scalar_one_or_none()
if not existing_member:
# 创建岗位成员关联
position_member = PositionMember(
position_id=position.id,
user_id=user.id,
role='member'
)
self.db.add(position_member)
logger.info(f"关联用户 {user.full_name} 到岗位 {position.name}")
except Exception as e:
error_msg = f"处理员工 {employee.get('full_name')} 时出错: {str(e)}"
logger.error(error_msg)
stats['errors'].append(error_msg)
continue
# 3. 提交所有更改
await self.db.commit()
logger.info("✅ 数据库事务已提交")
except Exception as e:
logger.error(f"员工同步失败: {str(e)}")
await self.db.rollback()
stats['errors'].append(str(e))
raise
finally:
stats['end_time'] = datetime.now()
stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()
# 4. 输出统计信息
logger.info("=" * 60)
logger.info("同步完成统计")
logger.info("=" * 60)
logger.info(f"总员工数: {stats['total_employees']}")
logger.info(f"创建用户: {stats['users_created']}")
logger.info(f"跳过用户: {stats['users_skipped']}")
logger.info(f"耗时: {stats['duration']:.2f}")
if stats['errors']:
logger.warning(f"错误数量: {len(stats['errors'])}")
for error in stats['errors']:
logger.warning(f" - {error}")
return stats
async def preview_sync_data(self) -> Dict[str, Any]:
"""
预览待同步的员工数据(不执行实际同步)
Returns:
预览数据
"""
logger.info("预览待同步员工数据...")
employees = await self.fetch_employees_from_dingtalk()
preview = {
'total_count': len(employees),
'employees': [],
'departments': set(),
'positions': set(),
'leaders_count': 0,
'trainees_count': 0
}
for emp in employees:
role = self.determine_role(emp.get('is_leader', False))
email = self.generate_email(emp.get('phone'), emp.get('email'))
preview['employees'].append({
'full_name': emp.get('full_name'),
'phone': emp.get('phone'),
'email': email,
'department': emp.get('department'),
'position': emp.get('position'),
'role': role,
'is_leader': emp.get('is_leader')
})
if emp.get('department'):
preview['departments'].add(emp.get('department'))
if emp.get('position'):
preview['positions'].add(emp.get('position'))
if role == 'manager':
preview['leaders_count'] += 1
else:
preview['trainees_count'] += 1
preview['departments'] = list(preview['departments'])
preview['positions'] = list(preview['positions'])
return preview
async def _add_user_to_team(self, user_id: int, team_id: int) -> None:
"""
将用户添加到团队直接SQL操作避免懒加载问题
Args:
user_id: 用户ID
team_id: 团队ID
"""
# 先检查是否已存在关联
check_result = await self.db.execute(
text("SELECT 1 FROM user_teams WHERE user_id = :user_id AND team_id = :team_id"),
{"user_id": user_id, "team_id": team_id}
)
if check_result.scalar() is None:
# 不存在则插入
await self.db.execute(
text("INSERT INTO user_teams (user_id, team_id, role) VALUES (:user_id, :team_id, 'member')"),
{"user_id": user_id, "team_id": team_id}
)
async def _cleanup_user_related_data(self, user_id: int) -> None:
"""
清理用户关联数据(用于删除用户前)
Args:
user_id: 要清理的用户ID
"""
logger.info(f"清理用户 {user_id} 的关联数据...")
# 删除用户的考试记录
await self.db.execute(
text("DELETE FROM exam_results WHERE exam_id IN (SELECT id FROM exams WHERE user_id = :user_id)"),
{"user_id": user_id}
)
await self.db.execute(
text("DELETE FROM exams WHERE user_id = :user_id"),
{"user_id": user_id}
)
# 删除用户的错题记录
await self.db.execute(
text("DELETE FROM exam_mistakes WHERE user_id = :user_id"),
{"user_id": user_id}
)
# 删除用户的能力评估记录
await self.db.execute(
text("DELETE FROM ability_assessments WHERE user_id = :user_id"),
{"user_id": user_id}
)
# 删除用户的岗位关联
await self.db.execute(
text("DELETE FROM position_members WHERE user_id = :user_id"),
{"user_id": user_id}
)
# 删除用户的团队关联
await self.db.execute(
text("DELETE FROM user_teams WHERE user_id = :user_id"),
{"user_id": user_id}
)
# 删除用户的陪练会话
await self.db.execute(
text("DELETE FROM practice_sessions WHERE user_id = :user_id"),
{"user_id": user_id}
)
# 删除用户的任务分配
await self.db.execute(
text("DELETE FROM task_assignments WHERE user_id = :user_id"),
{"user_id": user_id}
)
# 删除用户创建的任务的分配记录
await self.db.execute(
text("DELETE FROM task_assignments WHERE task_id IN (SELECT id FROM tasks WHERE creator_id = :user_id)"),
{"user_id": user_id}
)
# 删除用户创建的任务
await self.db.execute(
text("DELETE FROM tasks WHERE creator_id = :user_id"),
{"user_id": user_id}
)
# 将用户作为负责人的团队的leader_id设为NULL
await self.db.execute(
text("UPDATE teams SET leader_id = NULL WHERE leader_id = :user_id"),
{"user_id": user_id}
)
logger.info(f"用户 {user_id} 的关联数据清理完成")
async def incremental_sync_employees(self) -> Dict[str, Any]:
"""
增量同步员工数据
- 新增钉钉有但系统没有的员工
- 删除系统有但钉钉没有的员工(物理删除)
- 跳过两边都存在的员工(不做任何修改)
Returns:
同步结果统计
"""
logger.info("=" * 60)
logger.info("开始增量员工同步")
logger.info("=" * 60)
stats = {
'added_count': 0,
'deleted_count': 0,
'skipped_count': 0,
'added_users': [],
'deleted_users': [],
'errors': [],
'start_time': datetime.now()
}
try:
# 1. 获取钉钉在职员工数据
dingtalk_employees = await self.fetch_employees_from_dingtalk()
dingtalk_phones = {emp.get('phone') for emp in dingtalk_employees if emp.get('phone')}
logger.info(f"钉钉在职员工数量: {len(dingtalk_phones)}")
# 2. 获取系统现有用户排除admin和已软删除的
stmt = select(User).where(
User.is_deleted == False,
User.username != 'admin'
)
result = await self.db.execute(stmt)
system_users = result.scalars().all()
system_phones = {user.phone for user in system_users if user.phone}
logger.info(f"系统现有员工数量排除admin: {len(system_phones)}")
# 3. 计算需要新增、删除、跳过的员工
phones_to_add = dingtalk_phones - system_phones
phones_to_delete = system_phones - dingtalk_phones
phones_to_skip = dingtalk_phones & system_phones
logger.info(f"待新增: {len(phones_to_add)}, 待删除: {len(phones_to_delete)}, 跳过: {len(phones_to_skip)}")
stats['skipped_count'] = len(phones_to_skip)
# 4. 新增员工
for employee in dingtalk_employees:
phone = employee.get('phone')
if not phone or phone not in phones_to_add:
continue
try:
# 创建用户
user = await self.create_user(employee)
if not user:
continue
stats['added_count'] += 1
stats['added_users'].append({
'full_name': user.full_name,
'phone': user.phone,
'role': user.role
})
# 创建部门团队
department = employee.get('department')
if department:
team = await self.create_or_get_team(
department,
leader_id=user.id if employee.get('is_leader') else None
)
if team:
# 用SQL直接插入user_teams关联表避免懒加载问题
await self._add_user_to_team(user.id, team.id)
logger.info(f"关联用户 {user.full_name} 到团队 {team.name}")
# 创建岗位
position_name = employee.get('position')
if position_name:
position = await self.create_or_get_position(position_name)
if position:
# 检查是否已经关联
stmt = select(PositionMember).where(
PositionMember.position_id == position.id,
PositionMember.user_id == user.id,
PositionMember.is_deleted == False
)
result = await self.db.execute(stmt)
existing_member = result.scalar_one_or_none()
if not existing_member:
position_member = PositionMember(
position_id=position.id,
user_id=user.id,
role='member'
)
self.db.add(position_member)
logger.info(f"关联用户 {user.full_name} 到岗位 {position.name}")
logger.info(f"✅ 新增员工: {user.full_name} ({user.phone})")
except Exception as e:
error_msg = f"新增员工 {employee.get('full_name')} 失败: {str(e)}"
logger.error(error_msg)
stats['errors'].append(error_msg)
continue
# 5. 删除离职员工(物理删除)
# 先flush之前的新增操作避免与删除操作冲突
await self.db.flush()
# 收集需要删除的用户ID
users_to_delete = []
for user in system_users:
if user.phone and user.phone in phones_to_delete:
# 双重保护确保不删除admin
if user.username == 'admin' or user.role == 'admin':
logger.warning(f"⚠️ 跳过删除管理员账户: {user.username}")
continue
users_to_delete.append({
'id': user.id,
'full_name': user.full_name,
'phone': user.phone,
'username': user.username
})
# 批量删除用户及其关联数据
for user_info in users_to_delete:
try:
user_id = user_info['id']
# 先清理关联数据(外键约束)
await self._cleanup_user_related_data(user_id)
# 用SQL直接删除用户避免ORM的级联操作冲突
await self.db.execute(
text("DELETE FROM users WHERE id = :user_id"),
{"user_id": user_id}
)
stats['deleted_users'].append({
'full_name': user_info['full_name'],
'phone': user_info['phone'],
'username': user_info['username']
})
stats['deleted_count'] += 1
logger.info(f"🗑️ 删除离职员工: {user_info['full_name']} ({user_info['phone']})")
except Exception as e:
error_msg = f"删除员工 {user_info['full_name']} 失败: {str(e)}"
logger.error(error_msg)
stats['errors'].append(error_msg)
continue
# 6. 提交所有更改
await self.db.commit()
logger.info("✅ 数据库事务已提交")
except Exception as e:
logger.error(f"增量同步失败: {str(e)}")
await self.db.rollback()
stats['errors'].append(str(e))
raise
finally:
stats['end_time'] = datetime.now()
stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()
# 7. 输出统计信息
logger.info("=" * 60)
logger.info("增量同步完成统计")
logger.info("=" * 60)
logger.info(f"新增员工: {stats['added_count']}")
logger.info(f"删除员工: {stats['deleted_count']}")
logger.info(f"跳过员工: {stats['skipped_count']}")
logger.info(f"耗时: {stats['duration']:.2f}")
if stats['errors']:
logger.warning(f"错误数量: {len(stats['errors'])}")
return stats