All checks were successful
continuous-integration/drone/push Build is passing
问题原因:钉钉应用缺少手机号读取权限,导致返回的员工手机号全为空, 同步逻辑认为"钉钉没有这些员工"而错误删除了系统中的用户。 修复方案: 1. 增加安全检查:如果钉钉返回员工但手机号全为空,跳过删除操作 2. 使用双重匹配:同时考虑手机号和钉钉ID进行员工匹配 3. 增强日志:记录有手机号和钉钉ID的员工数量 4. 增加保护:只有手机号和钉钉ID都不匹配时才删除
823 lines
32 KiB
Python
823 lines
32 KiB
Python
"""
|
||
员工同步服务
|
||
从钉钉开放 API 同步员工数据到考培练系统
|
||
"""
|
||
|
||
from typing import List, Dict, Any, Optional, Tuple
|
||
from datetime import datetime
|
||
from sqlalchemy import select, text
|
||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
||
from sqlalchemy.orm import selectinload
|
||
import asyncio
|
||
|
||
from app.core.logger import get_logger
|
||
from app.core.security import get_password_hash
|
||
from app.models.user import User, Team
|
||
from app.models.position import Position
|
||
from app.models.position_member import PositionMember
|
||
from app.schemas.user import UserCreate
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class EmployeeSyncService:
|
||
"""员工同步服务"""
|
||
|
||
def __init__(self, db: AsyncSession, tenant_id: int = 1):
|
||
self.db = db
|
||
self.tenant_id = tenant_id
|
||
self._dingtalk_config = None
|
||
|
||
async def _get_dingtalk_config(self) -> Dict[str, str]:
|
||
"""从数据库获取钉钉 API 配置(复用免密登录配置)"""
|
||
if self._dingtalk_config:
|
||
return self._dingtalk_config
|
||
|
||
try:
|
||
# 从 dingtalk 配置组读取(与免密登录共用)
|
||
result = await self.db.execute(
|
||
text("""
|
||
SELECT config_key, config_value
|
||
FROM tenant_configs
|
||
WHERE tenant_id = :tenant_id
|
||
AND config_group = 'dingtalk'
|
||
"""),
|
||
{"tenant_id": self.tenant_id}
|
||
)
|
||
rows = result.fetchall()
|
||
|
||
config = {}
|
||
for key, value in rows:
|
||
# 转换 key 名称以匹配 DingTalkService 需要的格式
|
||
if key == 'DINGTALK_CORP_ID':
|
||
config['CORP_ID'] = value
|
||
elif key == 'DINGTALK_APP_KEY':
|
||
config['CLIENT_ID'] = value
|
||
elif key == 'DINGTALK_APP_SECRET':
|
||
config['CLIENT_SECRET'] = value
|
||
|
||
self._dingtalk_config = config
|
||
return config
|
||
except Exception as e:
|
||
logger.error(f"获取钉钉配置失败: {e}")
|
||
return {}
|
||
|
||
async def __aenter__(self):
|
||
"""异步上下文管理器入口"""
|
||
# 预加载钉钉配置
|
||
await self._get_dingtalk_config()
|
||
return self
|
||
|
||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||
"""异步上下文管理器出口"""
|
||
pass
|
||
|
||
async def fetch_employees_from_dingtalk(self) -> List[Dict[str, Any]]:
|
||
"""
|
||
从钉钉 API 获取在职员工数据
|
||
|
||
Returns:
|
||
员工数据列表
|
||
"""
|
||
config = await self._get_dingtalk_config()
|
||
|
||
corp_id = config.get('CORP_ID')
|
||
client_id = config.get('CLIENT_ID')
|
||
client_secret = config.get('CLIENT_SECRET')
|
||
|
||
if not all([corp_id, client_id, client_secret]):
|
||
raise Exception("钉钉 API 配置不完整,请先配置 CorpId、ClientId、ClientSecret")
|
||
|
||
from app.services.dingtalk_service import DingTalkService
|
||
|
||
dingtalk = DingTalkService(
|
||
corp_id=corp_id,
|
||
client_id=client_id,
|
||
client_secret=client_secret
|
||
)
|
||
|
||
employees = await dingtalk.get_all_employees()
|
||
|
||
# 过滤在职员工
|
||
active_employees = [emp for emp in employees if emp.get('is_active', True)]
|
||
logger.info(f"获取到 {len(active_employees)} 条在职员工数据")
|
||
|
||
return active_employees
|
||
|
||
def generate_email(self, phone: str, original_email: Optional[str]) -> Optional[str]:
|
||
"""
|
||
生成邮箱地址
|
||
如果原始邮箱为空或格式无效,生成 {手机号}@rxm.com
|
||
|
||
Args:
|
||
phone: 手机号
|
||
original_email: 原始邮箱
|
||
|
||
Returns:
|
||
邮箱地址
|
||
"""
|
||
if original_email and original_email.strip():
|
||
email = original_email.strip()
|
||
# 验证邮箱格式:检查是否有@后直接跟点号等无效格式
|
||
if '@' in email and not email.startswith('@') and not email.endswith('@'):
|
||
# 检查@后面是否直接是点号
|
||
at_index = email.index('@')
|
||
if at_index + 1 < len(email) and email[at_index + 1] != '.':
|
||
# 检查是否有域名部分
|
||
domain = email[at_index + 1:]
|
||
if '.' in domain and len(domain) > 2:
|
||
return email
|
||
|
||
# 如果邮箱无效或为空,使用手机号生成
|
||
if phone:
|
||
return f"{phone}@rxm.com"
|
||
|
||
return None
|
||
|
||
def determine_role(self, is_leader: bool) -> str:
|
||
"""
|
||
确定用户角色
|
||
|
||
Args:
|
||
is_leader: 是否领导
|
||
|
||
Returns:
|
||
角色: manager 或 trainee
|
||
"""
|
||
return 'manager' if is_leader else 'trainee'
|
||
|
||
async def create_or_get_team(self, department_name: str, leader_id: Optional[int] = None) -> Team:
|
||
"""
|
||
创建或获取部门团队
|
||
|
||
Args:
|
||
department_name: 部门名称
|
||
leader_id: 负责人ID
|
||
|
||
Returns:
|
||
团队对象
|
||
"""
|
||
if not department_name or department_name.strip() == '':
|
||
return None
|
||
|
||
department_name = department_name.strip()
|
||
|
||
# 检查团队是否已存在(使用limit(1)避免重复数据报错)
|
||
stmt = select(Team).where(
|
||
Team.name == department_name,
|
||
Team.is_deleted == False
|
||
).limit(1)
|
||
result = await self.db.execute(stmt)
|
||
team = result.scalar_one_or_none()
|
||
|
||
if team:
|
||
# 更新负责人
|
||
if leader_id and not team.leader_id:
|
||
team.leader_id = leader_id
|
||
logger.info(f"更新团队 {department_name} 的负责人")
|
||
return team
|
||
|
||
# 创建新团队
|
||
# 生成团队代码:使用部门名称的拼音首字母或简化处理
|
||
team_code = f"DEPT_{hash(department_name) % 100000:05d}"
|
||
|
||
team = Team(
|
||
name=department_name,
|
||
code=team_code,
|
||
description=f"{department_name}",
|
||
team_type='department',
|
||
is_active=True,
|
||
leader_id=leader_id
|
||
)
|
||
|
||
self.db.add(team)
|
||
await self.db.flush() # 获取ID但不提交
|
||
|
||
logger.info(f"创建团队: {department_name} (ID: {team.id})")
|
||
return team
|
||
|
||
async def create_or_get_position(self, position_name: str) -> Optional[Position]:
|
||
"""
|
||
创建或获取岗位
|
||
|
||
Args:
|
||
position_name: 岗位名称
|
||
|
||
Returns:
|
||
岗位对象
|
||
"""
|
||
if not position_name or position_name.strip() == '':
|
||
return None
|
||
|
||
position_name = position_name.strip()
|
||
|
||
# 检查岗位是否已存在(使用first()避免重复数据报错)
|
||
stmt = select(Position).where(
|
||
Position.name == position_name,
|
||
Position.is_deleted == False
|
||
).limit(1)
|
||
result = await self.db.execute(stmt)
|
||
position = result.scalar_one_or_none()
|
||
|
||
if position:
|
||
return position
|
||
|
||
# 创建新岗位
|
||
position_code = f"POS_{hash(position_name) % 100000:05d}"
|
||
|
||
position = Position(
|
||
name=position_name,
|
||
code=position_code,
|
||
description=f"{position_name}",
|
||
status='active'
|
||
)
|
||
|
||
self.db.add(position)
|
||
await self.db.flush()
|
||
|
||
logger.info(f"创建岗位: {position_name} (ID: {position.id})")
|
||
return position
|
||
|
||
async def create_user(self, employee_data: Dict[str, Any]) -> Tuple[Optional[User], str]:
|
||
"""
|
||
创建用户
|
||
|
||
Args:
|
||
employee_data: 员工数据
|
||
|
||
Returns:
|
||
(用户对象, 状态): 状态为 'created'/'existing'/'restored'/'skipped'
|
||
"""
|
||
phone = employee_data.get('phone')
|
||
full_name = employee_data.get('full_name')
|
||
|
||
if not phone:
|
||
logger.warning(f"员工 {full_name} 没有手机号,跳过")
|
||
return None, 'skipped'
|
||
|
||
# 检查用户是否已存在(通过手机号,包括已软删除的)
|
||
stmt = select(User).where(User.phone == phone)
|
||
result = await self.db.execute(stmt)
|
||
existing_user = result.scalar_one_or_none()
|
||
|
||
if existing_user:
|
||
# 如果是软删除用户,恢复它
|
||
if existing_user.is_deleted:
|
||
existing_user.is_deleted = False
|
||
existing_user.full_name = full_name or existing_user.full_name
|
||
existing_user.is_active = True
|
||
dingtalk_id = employee_data.get('dingtalk_id')
|
||
if dingtalk_id:
|
||
existing_user.dingtalk_id = dingtalk_id
|
||
logger.info(f"恢复软删除用户: {phone} ({full_name})")
|
||
return existing_user, 'restored'
|
||
|
||
# 如果用户已存在但没有dingtalk_id,则更新
|
||
dingtalk_id = employee_data.get('dingtalk_id')
|
||
if dingtalk_id and not existing_user.dingtalk_id:
|
||
existing_user.dingtalk_id = dingtalk_id
|
||
logger.info(f"更新用户 {phone} 的钉钉ID: {dingtalk_id}")
|
||
logger.debug(f"用户已存在: {phone} ({full_name})")
|
||
return existing_user, 'existing'
|
||
|
||
# 生成邮箱
|
||
email = self.generate_email(phone, employee_data.get('email'))
|
||
|
||
# 检查邮箱是否已被其他用户使用(避免唯一索引冲突)
|
||
if email:
|
||
email_check_stmt = select(User).where(
|
||
User.email == email,
|
||
User.is_deleted == False
|
||
)
|
||
email_result = await self.db.execute(email_check_stmt)
|
||
if email_result.scalar_one_or_none():
|
||
# 邮箱已存在,使用手机号生成唯一邮箱
|
||
email = f"{phone}@rxm.com"
|
||
logger.warning(f"邮箱 {employee_data.get('email')} 已被使用,为员工 {full_name} 生成新邮箱: {email}")
|
||
|
||
# 确定角色
|
||
role = self.determine_role(employee_data.get('is_leader', False))
|
||
|
||
# 创建用户
|
||
user = User(
|
||
username=phone, # 使用手机号作为用户名
|
||
email=email,
|
||
phone=phone,
|
||
hashed_password=get_password_hash('123456'), # 初始密码
|
||
full_name=full_name,
|
||
role=role,
|
||
is_active=True,
|
||
is_verified=True,
|
||
dingtalk_id=employee_data.get('dingtalk_id'), # 钉钉用户ID
|
||
)
|
||
|
||
self.db.add(user)
|
||
await self.db.flush()
|
||
|
||
logger.info(f"创建用户: {phone} ({full_name}) - 角色: {role}")
|
||
return user, 'created'
|
||
|
||
async def sync_employees(self) -> Dict[str, Any]:
|
||
"""
|
||
执行完整的员工同步流程
|
||
|
||
Returns:
|
||
同步结果统计
|
||
"""
|
||
logger.info("=" * 60)
|
||
logger.info("开始员工同步")
|
||
logger.info("=" * 60)
|
||
|
||
stats = {
|
||
'total_employees': 0,
|
||
'users_created': 0,
|
||
'users_existing': 0,
|
||
'users_restored': 0,
|
||
'users_departed': 0,
|
||
'users_skipped': 0,
|
||
'teams_created': 0,
|
||
'positions_created': 0,
|
||
'errors': [],
|
||
'start_time': datetime.now()
|
||
}
|
||
|
||
try:
|
||
# 1. 获取员工数据
|
||
employees = await self.fetch_employees_from_dingtalk()
|
||
stats['total_employees'] = len(employees)
|
||
|
||
if not employees:
|
||
logger.warning("没有获取到员工数据")
|
||
return stats
|
||
|
||
# 2. 创建用户和相关数据
|
||
for employee in employees:
|
||
try:
|
||
# 创建用户
|
||
user, status = await self.create_user(employee)
|
||
if not user:
|
||
stats['users_skipped'] += 1
|
||
continue
|
||
|
||
# 根据状态统计
|
||
if status == 'created':
|
||
stats['users_created'] += 1
|
||
elif status == 'existing':
|
||
stats['users_existing'] += 1
|
||
elif status == 'restored':
|
||
stats['users_restored'] += 1
|
||
|
||
# 创建部门团队
|
||
department = employee.get('department')
|
||
if department:
|
||
team = await self.create_or_get_team(
|
||
department,
|
||
leader_id=user.id if employee.get('is_leader') else None
|
||
)
|
||
if team:
|
||
# 用SQL直接插入user_teams关联表(避免懒加载问题)
|
||
await self._add_user_to_team(user.id, team.id)
|
||
logger.info(f"关联用户 {user.full_name} 到团队 {team.name}")
|
||
|
||
# 创建岗位
|
||
position_name = employee.get('position')
|
||
if position_name:
|
||
position = await self.create_or_get_position(position_name)
|
||
if position:
|
||
# 检查是否已经关联
|
||
stmt = select(PositionMember).where(
|
||
PositionMember.position_id == position.id,
|
||
PositionMember.user_id == user.id,
|
||
PositionMember.is_deleted == False
|
||
)
|
||
result = await self.db.execute(stmt)
|
||
existing_member = result.scalar_one_or_none()
|
||
|
||
if not existing_member:
|
||
# 创建岗位成员关联
|
||
position_member = PositionMember(
|
||
position_id=position.id,
|
||
user_id=user.id,
|
||
role='member'
|
||
)
|
||
self.db.add(position_member)
|
||
logger.info(f"关联用户 {user.full_name} 到岗位 {position.name}")
|
||
|
||
except Exception as e:
|
||
error_msg = f"处理员工 {employee.get('full_name')} 时出错: {str(e)}"
|
||
logger.error(error_msg)
|
||
stats['errors'].append(error_msg)
|
||
continue
|
||
|
||
# 3. 处理离职员工(软删除)
|
||
dingtalk_phones = {emp.get('phone') for emp in employees if emp.get('phone')}
|
||
|
||
# 获取系统中所有活跃用户(排除 admin)
|
||
stmt = select(User).where(
|
||
User.is_deleted == False,
|
||
User.is_active == True,
|
||
User.username != 'admin',
|
||
User.role != 'admin'
|
||
)
|
||
result = await self.db.execute(stmt)
|
||
system_users = result.scalars().all()
|
||
|
||
# 找出离职员工(系统有但钉钉没有)
|
||
for user in system_users:
|
||
if user.phone and user.phone not in dingtalk_phones:
|
||
# 软删除:标记为离职
|
||
user.is_active = False
|
||
user.is_deleted = True
|
||
stats['users_departed'] += 1
|
||
logger.info(f"🚪 标记离职员工: {user.full_name} ({user.phone})")
|
||
|
||
# 4. 提交所有更改
|
||
await self.db.commit()
|
||
logger.info("✅ 数据库事务已提交")
|
||
|
||
except Exception as e:
|
||
logger.error(f"员工同步失败: {str(e)}")
|
||
await self.db.rollback()
|
||
stats['errors'].append(str(e))
|
||
raise
|
||
|
||
finally:
|
||
stats['end_time'] = datetime.now()
|
||
stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()
|
||
|
||
# 5. 输出统计信息
|
||
logger.info("=" * 60)
|
||
logger.info("同步完成统计")
|
||
logger.info("=" * 60)
|
||
logger.info(f"钉钉在职员工: {stats['total_employees']}")
|
||
logger.info(f"新增用户: {stats['users_created']}")
|
||
logger.info(f"已存在用户: {stats['users_existing']}")
|
||
logger.info(f"恢复用户: {stats['users_restored']}")
|
||
logger.info(f"离职处理: {stats['users_departed']}")
|
||
logger.info(f"跳过用户: {stats['users_skipped']}")
|
||
logger.info(f"耗时: {stats['duration']:.2f}秒")
|
||
|
||
if stats['errors']:
|
||
logger.warning(f"错误数量: {len(stats['errors'])}")
|
||
for error in stats['errors']:
|
||
logger.warning(f" - {error}")
|
||
|
||
return stats
|
||
|
||
async def preview_sync_data(self) -> Dict[str, Any]:
|
||
"""
|
||
预览待同步的员工数据(不执行实际同步)
|
||
|
||
Returns:
|
||
预览数据
|
||
"""
|
||
logger.info("预览待同步员工数据...")
|
||
|
||
employees = await self.fetch_employees_from_dingtalk()
|
||
|
||
preview = {
|
||
'total_count': len(employees),
|
||
'employees': [],
|
||
'departments': set(),
|
||
'positions': set(),
|
||
'leaders_count': 0,
|
||
'trainees_count': 0
|
||
}
|
||
|
||
for emp in employees:
|
||
role = self.determine_role(emp.get('is_leader', False))
|
||
email = self.generate_email(emp.get('phone'), emp.get('email'))
|
||
|
||
preview['employees'].append({
|
||
'full_name': emp.get('full_name'),
|
||
'phone': emp.get('phone'),
|
||
'email': email,
|
||
'department': emp.get('department'),
|
||
'position': emp.get('position'),
|
||
'role': role,
|
||
'is_leader': emp.get('is_leader')
|
||
})
|
||
|
||
if emp.get('department'):
|
||
preview['departments'].add(emp.get('department'))
|
||
if emp.get('position'):
|
||
preview['positions'].add(emp.get('position'))
|
||
|
||
if role == 'manager':
|
||
preview['leaders_count'] += 1
|
||
else:
|
||
preview['trainees_count'] += 1
|
||
|
||
preview['departments'] = list(preview['departments'])
|
||
preview['positions'] = list(preview['positions'])
|
||
|
||
return preview
|
||
|
||
async def _add_user_to_team(self, user_id: int, team_id: int) -> None:
|
||
"""
|
||
将用户添加到团队(直接SQL操作,避免懒加载问题)
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
team_id: 团队ID
|
||
"""
|
||
# 先检查是否已存在关联
|
||
check_result = await self.db.execute(
|
||
text("SELECT 1 FROM user_teams WHERE user_id = :user_id AND team_id = :team_id"),
|
||
{"user_id": user_id, "team_id": team_id}
|
||
)
|
||
if check_result.scalar() is None:
|
||
# 不存在则插入
|
||
await self.db.execute(
|
||
text("INSERT INTO user_teams (user_id, team_id, role) VALUES (:user_id, :team_id, 'member')"),
|
||
{"user_id": user_id, "team_id": team_id}
|
||
)
|
||
|
||
async def _cleanup_user_related_data(self, user_id: int) -> None:
|
||
"""
|
||
清理用户关联数据(用于删除用户前)
|
||
|
||
Args:
|
||
user_id: 要清理的用户ID
|
||
"""
|
||
logger.info(f"清理用户 {user_id} 的关联数据...")
|
||
|
||
# 删除用户的考试记录
|
||
await self.db.execute(
|
||
text("DELETE FROM exam_results WHERE exam_id IN (SELECT id FROM exams WHERE user_id = :user_id)"),
|
||
{"user_id": user_id}
|
||
)
|
||
await self.db.execute(
|
||
text("DELETE FROM exams WHERE user_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 删除用户的错题记录
|
||
await self.db.execute(
|
||
text("DELETE FROM exam_mistakes WHERE user_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 删除用户的能力评估记录
|
||
await self.db.execute(
|
||
text("DELETE FROM ability_assessments WHERE user_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 删除用户的岗位关联
|
||
await self.db.execute(
|
||
text("DELETE FROM position_members WHERE user_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 删除用户的团队关联
|
||
await self.db.execute(
|
||
text("DELETE FROM user_teams WHERE user_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 删除用户的陪练会话
|
||
await self.db.execute(
|
||
text("DELETE FROM practice_sessions WHERE user_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 删除用户的任务分配
|
||
await self.db.execute(
|
||
text("DELETE FROM task_assignments WHERE user_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 删除用户创建的任务的分配记录
|
||
await self.db.execute(
|
||
text("DELETE FROM task_assignments WHERE task_id IN (SELECT id FROM tasks WHERE creator_id = :user_id)"),
|
||
{"user_id": user_id}
|
||
)
|
||
# 删除用户创建的任务
|
||
await self.db.execute(
|
||
text("DELETE FROM tasks WHERE creator_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
# 将用户作为负责人的团队的leader_id设为NULL
|
||
await self.db.execute(
|
||
text("UPDATE teams SET leader_id = NULL WHERE leader_id = :user_id"),
|
||
{"user_id": user_id}
|
||
)
|
||
|
||
logger.info(f"用户 {user_id} 的关联数据清理完成")
|
||
|
||
async def incremental_sync_employees(self) -> Dict[str, Any]:
|
||
"""
|
||
增量同步员工数据
|
||
- 新增钉钉有但系统没有的员工
|
||
- 删除系统有但钉钉没有的员工(软删除)
|
||
- 跳过两边都存在的员工(不做任何修改)
|
||
|
||
Returns:
|
||
同步结果统计
|
||
"""
|
||
logger.info("=" * 60)
|
||
logger.info("开始增量员工同步")
|
||
logger.info("=" * 60)
|
||
|
||
stats = {
|
||
'added_count': 0,
|
||
'deleted_count': 0,
|
||
'skipped_count': 0,
|
||
'added_users': [],
|
||
'deleted_users': [],
|
||
'errors': [],
|
||
'start_time': datetime.now()
|
||
}
|
||
|
||
try:
|
||
# 1. 获取钉钉在职员工数据
|
||
dingtalk_employees = await self.fetch_employees_from_dingtalk()
|
||
|
||
# 使用手机号和钉钉ID双重匹配
|
||
dingtalk_phones = {emp.get('phone') for emp in dingtalk_employees if emp.get('phone')}
|
||
dingtalk_ids = {emp.get('dingtalk_id') for emp in dingtalk_employees if emp.get('dingtalk_id')}
|
||
|
||
logger.info(f"钉钉在职员工数量: {len(dingtalk_employees)}")
|
||
logger.info(f"有手机号的员工: {len(dingtalk_phones)}")
|
||
logger.info(f"有钉钉ID的员工: {len(dingtalk_ids)}")
|
||
|
||
# 安全检查:如果钉钉返回了员工但手机号全为空,可能是权限问题,跳过删除操作
|
||
skip_delete = False
|
||
if len(dingtalk_employees) > 0 and len(dingtalk_phones) == 0:
|
||
logger.warning("⚠️ 钉钉返回员工数据但手机号全为空,可能是钉钉应用缺少手机号读取权限!")
|
||
logger.warning("⚠️ 跳过离职员工处理,避免误删")
|
||
skip_delete = True
|
||
stats['errors'].append("钉钉应用可能缺少手机号读取权限,跳过删除操作")
|
||
|
||
# 2. 获取系统现有用户(排除admin和已软删除的)
|
||
stmt = select(User).where(
|
||
User.is_deleted == False,
|
||
User.username != 'admin'
|
||
)
|
||
result = await self.db.execute(stmt)
|
||
system_users = result.scalars().all()
|
||
system_phones = {user.phone for user in system_users if user.phone}
|
||
system_dingtalk_ids = {user.dingtalk_id for user in system_users if user.dingtalk_id}
|
||
logger.info(f"系统现有员工数量(排除admin): {len(system_users)}")
|
||
logger.info(f"系统有手机号的员工: {len(system_phones)}")
|
||
logger.info(f"系统有钉钉ID的员工: {len(system_dingtalk_ids)}")
|
||
|
||
# 3. 计算需要新增、删除、跳过的员工(同时考虑手机号和钉钉ID)
|
||
# 新增: 钉钉有但系统没有(手机号或钉钉ID都不存在)
|
||
phones_to_add = dingtalk_phones - system_phones
|
||
ids_to_add = dingtalk_ids - system_dingtalk_ids
|
||
|
||
# 删除: 系统有但钉钉没有(手机号和钉钉ID都不在钉钉列表中)
|
||
phones_to_delete = system_phones - dingtalk_phones if not skip_delete else set()
|
||
|
||
# 跳过: 两边都存在
|
||
phones_to_skip = dingtalk_phones & system_phones
|
||
ids_to_skip = dingtalk_ids & system_dingtalk_ids
|
||
|
||
logger.info(f"待新增(手机号): {len(phones_to_add)}, 待删除(手机号): {len(phones_to_delete)}, 跳过(手机号): {len(phones_to_skip)}")
|
||
|
||
stats['skipped_count'] = len(phones_to_skip) + len(ids_to_skip)
|
||
|
||
# 4. 新增员工
|
||
for employee in dingtalk_employees:
|
||
phone = employee.get('phone')
|
||
if not phone or phone not in phones_to_add:
|
||
continue
|
||
|
||
try:
|
||
# 创建用户
|
||
user, status = await self.create_user(employee)
|
||
if not user:
|
||
continue
|
||
|
||
# 只有真正创建的才计入新增
|
||
if status == 'created':
|
||
stats['added_count'] += 1
|
||
stats['added_users'].append({
|
||
'full_name': user.full_name,
|
||
'phone': user.phone,
|
||
'role': user.role
|
||
})
|
||
|
||
# 创建部门团队
|
||
department = employee.get('department')
|
||
if department:
|
||
team = await self.create_or_get_team(
|
||
department,
|
||
leader_id=user.id if employee.get('is_leader') else None
|
||
)
|
||
if team:
|
||
# 用SQL直接插入user_teams关联表(避免懒加载问题)
|
||
await self._add_user_to_team(user.id, team.id)
|
||
logger.info(f"关联用户 {user.full_name} 到团队 {team.name}")
|
||
|
||
# 创建岗位
|
||
position_name = employee.get('position')
|
||
if position_name:
|
||
position = await self.create_or_get_position(position_name)
|
||
if position:
|
||
# 检查是否已经关联
|
||
stmt = select(PositionMember).where(
|
||
PositionMember.position_id == position.id,
|
||
PositionMember.user_id == user.id,
|
||
PositionMember.is_deleted == False
|
||
)
|
||
result = await self.db.execute(stmt)
|
||
existing_member = result.scalar_one_or_none()
|
||
|
||
if not existing_member:
|
||
position_member = PositionMember(
|
||
position_id=position.id,
|
||
user_id=user.id,
|
||
role='member'
|
||
)
|
||
self.db.add(position_member)
|
||
logger.info(f"关联用户 {user.full_name} 到岗位 {position.name}")
|
||
|
||
logger.info(f"✅ 新增员工: {user.full_name} ({user.phone})")
|
||
|
||
except Exception as e:
|
||
error_msg = f"新增员工 {employee.get('full_name')} 失败: {str(e)}"
|
||
logger.error(error_msg)
|
||
stats['errors'].append(error_msg)
|
||
continue
|
||
|
||
# 5. 处理离职员工(软删除)
|
||
# 先flush之前的新增操作
|
||
await self.db.flush()
|
||
|
||
# 如果跳过删除,则不处理
|
||
if skip_delete:
|
||
logger.info("⚠️ 由于安全检查未通过,跳过离职员工处理")
|
||
else:
|
||
# 标记离职员工(需要手机号和钉钉ID都不在钉钉列表中才删除)
|
||
for user in system_users:
|
||
# 双重保护:确保不删除admin
|
||
if user.username == 'admin' or user.role == 'admin':
|
||
continue
|
||
|
||
# 检查用户是否在钉钉列表中(手机号或钉钉ID匹配任一即视为在职)
|
||
in_dingtalk = False
|
||
if user.phone and user.phone in dingtalk_phones:
|
||
in_dingtalk = True
|
||
if user.dingtalk_id and user.dingtalk_id in dingtalk_ids:
|
||
in_dingtalk = True
|
||
|
||
if in_dingtalk:
|
||
continue # 在钉钉中,跳过
|
||
|
||
# 额外安全检查:如果钉钉没有返回有效数据,不删除
|
||
if len(dingtalk_phones) == 0 and len(dingtalk_ids) == 0:
|
||
logger.warning(f"⚠️ 钉钉数据为空,跳过删除用户: {user.full_name}")
|
||
continue
|
||
|
||
try:
|
||
# 软删除:标记为离职
|
||
user.is_active = False
|
||
user.is_deleted = True
|
||
|
||
stats['deleted_users'].append({
|
||
'full_name': user.full_name,
|
||
'phone': user.phone,
|
||
'username': user.username
|
||
})
|
||
stats['deleted_count'] += 1
|
||
logger.info(f"🚪 标记离职员工: {user.full_name} ({user.phone})")
|
||
|
||
except Exception as e:
|
||
error_msg = f"处理离职员工 {user.full_name} 失败: {str(e)}"
|
||
logger.error(error_msg)
|
||
stats['errors'].append(error_msg)
|
||
continue
|
||
|
||
# 6. 提交所有更改
|
||
await self.db.commit()
|
||
logger.info("✅ 数据库事务已提交")
|
||
|
||
except Exception as e:
|
||
logger.error(f"增量同步失败: {str(e)}")
|
||
await self.db.rollback()
|
||
stats['errors'].append(str(e))
|
||
raise
|
||
|
||
finally:
|
||
stats['end_time'] = datetime.now()
|
||
stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()
|
||
|
||
# 7. 输出统计信息
|
||
logger.info("=" * 60)
|
||
logger.info("增量同步完成统计")
|
||
logger.info("=" * 60)
|
||
logger.info(f"新增员工: {stats['added_count']}")
|
||
logger.info(f"离职处理: {stats['deleted_count']}")
|
||
logger.info(f"跳过员工: {stats['skipped_count']}")
|
||
logger.info(f"耗时: {stats['duration']:.2f}秒")
|
||
|
||
if stats['errors']:
|
||
logger.warning(f"错误数量: {len(stats['errors'])}")
|
||
|
||
return stats
|
||
|