"""
Employee sync service.

Synchronizes employee records from the external DingTalk employee table into
the training / exam system (users, department teams and positions).
"""

import asyncio
import hashlib
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import selectinload

from app.core.logger import get_logger
from app.core.security import get_password_hash
from app.models.user import User, Team
from app.models.position import Position
from app.models.position_member import PositionMember
from app.schemas.user import UserCreate

logger = get_logger(__name__)


class EmployeeSyncService:
    """Sync employees from the external DingTalk employee table.

    The service must be used as an async context manager: ``__aenter__``
    resolves the external database URL and the table name and creates the
    external engine, ``__aexit__`` disposes of it.

    All writes go through the ``AsyncSession`` passed to the constructor and
    are committed by ``sync_employees`` / ``incremental_sync_employees``.
    """

    # Default name of the external employee table/view.
    DEFAULT_TABLE_NAME = "v_钉钉员工表"

    # SECURITY: weak, well-known initial password kept for backward
    # compatibility. Users created by the sync should be forced to change it.
    DEFAULT_INITIAL_PASSWORD = '123456'

    # Whitelist for the configurable table name: one or more dot-separated
    # identifiers, each optionally wrapped in backticks. The name is
    # interpolated into SQL text, so anything else is rejected.
    _TABLE_NAME_PATTERN = re.compile(
        r"(?:[\w$]+|`[\w$]+`)(?:\.(?:[\w$]+|`[\w$]+`))*"
    )

    def __init__(self, db: AsyncSession, tenant_id: int = 1):
        """Store the session and tenant; the external engine is created lazily.

        Args:
            db: Session bound to the system's own database.
            tenant_id: Tenant whose ``tenant_configs`` rows are consulted.
        """
        self.db = db
        self.tenant_id = tenant_id
        self.external_engine = None
        self.table_name = self.DEFAULT_TABLE_NAME
        self._db_url = None

    async def _get_table_name_from_db(self) -> str:
        """Read the employee table name from ``tenant_configs``.

        Returns:
            The configured table name, or ``DEFAULT_TABLE_NAME`` when no
            non-empty value is configured or the lookup fails.
        """
        try:
            result = await self.db.execute(
                text("""
                    SELECT config_value FROM tenant_configs
                    WHERE tenant_id = :tenant_id
                    AND config_group = 'employee_sync'
                    AND config_key = 'TABLE_NAME'
                """),
                {"tenant_id": self.tenant_id}
            )
            row = result.fetchone()
        except Exception as exc:
            # Best effort: a missing config table must not block the sync,
            # but the failure should be visible in the logs.
            logger.warning(
                f"读取员工表名配置失败,使用默认表名 {self.DEFAULT_TABLE_NAME}: {exc}"
            )
            return self.DEFAULT_TABLE_NAME

        if row and row[0]:
            return row[0]
        return self.DEFAULT_TABLE_NAME

    def _get_db_url_from_env(self) -> str:
        """Return the external database URL.

        ``EMPLOYEE_SYNC_DB_URL`` takes precedence; when it is unset or empty a
        built-in default is returned and a warning is logged.
        """
        db_url = os.environ.get('EMPLOYEE_SYNC_DB_URL', '')
        if db_url:
            return db_url

        logger.warning("EMPLOYEE_SYNC_DB_URL 环境变量未配置,使用默认数据源")
        # SECURITY(review): hard-coded credentials kept only for backward
        # compatibility. They should be rotated and removed once every
        # deployment sets EMPLOYEE_SYNC_DB_URL.
        return "mysql+aiomysql://neuron_new:NWxGM6CQoMLKyEszXhfuLBIIo1QbeK@120.77.144.233:29613/neuron_new?charset=utf8mb4"

    async def __aenter__(self):
        """Resolve configuration and create the external async engine."""
        self._db_url = self._get_db_url_from_env()
        self.table_name = await self._get_table_name_from_db()

        self.external_engine = create_async_engine(
            self._db_url,
            echo=False,
            pool_pre_ping=True,
            pool_recycle=3600
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Dispose of the external engine, if one was created."""
        if self.external_engine:
            await self.external_engine.dispose()
            self.external_engine = None

    def _validated_table_name(self) -> str:
        """Return ``self.table_name`` after checking it is a plain identifier.

        Raises:
            ValueError: If the name contains anything other than
                dot-separated (optionally backtick-quoted) identifiers.
        """
        table_name = (self.table_name or "").strip()
        if not self._TABLE_NAME_PATTERN.fullmatch(table_name):
            raise ValueError(f"非法的员工表名配置: {table_name!r}")
        return table_name

    async def fetch_employees_from_dingtalk(self) -> List[Dict[str, Any]]:
        """Fetch active employees from the external employee table.

        Returns:
            One dict per row with the keys ``full_name``, ``phone``, ``email``,
            ``department``, ``position``, ``employee_no``, ``is_leader``,
            ``is_active``, ``dingtalk_id``, ``join_date``, ``work_location``.

        Raises:
            RuntimeError: If the service is used outside ``async with``.
            ValueError: If the configured table name is not a valid identifier.
        """
        if self.external_engine is None:
            raise RuntimeError(
                "外部数据库引擎未初始化,请通过 async with 使用 EmployeeSyncService"
            )

        # The table name comes from configuration and cannot be a bind
        # parameter, so it is validated before being interpolated.
        table_name = self._validated_table_name()

        logger.info(f"开始从员工表 {self.table_name} 获取数据...")

        query = f"""
            SELECT
                员工姓名,
                手机号,
                邮箱,
                所属部门,
                职位,
                工号,
                是否领导,
                是否在职,
                钉钉用户ID,
                入职日期,
                工作地点
            FROM {table_name}
            WHERE 是否在职 = 1
            ORDER BY 员工姓名
        """

        async with self.external_engine.connect() as conn:
            result = await conn.execute(text(query))
            rows = result.fetchall()

        employees = [
            {
                'full_name': row[0],
                'phone': row[1],
                'email': row[2],
                'department': row[3],
                'position': row[4],
                'employee_no': row[5],
                'is_leader': bool(row[6]),
                'is_active': bool(row[7]),
                'dingtalk_id': row[8],
                'join_date': row[9],
                'work_location': row[10],
            }
            for row in rows
        ]

        logger.info(f"获取到 {len(employees)} 条在职员工数据")
        return employees

    def generate_email(self, phone: str, original_email: Optional[str]) -> Optional[str]:
        """Return a usable e-mail address for an employee.

        The original address is kept when it passes a light format check
        (non-empty local part, a domain that contains a dot, does not start
        with a dot, is longer than two characters, and the address does not
        end with ``@``). Otherwise ``{phone}@rxm.com`` is generated.

        Args:
            phone: Employee phone number.
            original_email: Address from the source table; may be empty.

        Returns:
            The chosen address, or ``None`` when there is neither a valid
            original address nor a phone number.
        """
        if original_email and original_email.strip():
            email = original_email.strip()
            # Split on the first '@', matching the check on the character
            # that immediately follows it.
            local, _, domain = email.partition('@')
            if (
                local
                and domain
                and not domain.endswith('@')
                and not domain.startswith('.')
                and '.' in domain
                and len(domain) > 2
            ):
                return email

        if phone:
            return f"{phone}@rxm.com"

        return None

    def determine_role(self, is_leader: bool) -> str:
        """Map the leader flag to a role.

        Args:
            is_leader: Whether the employee is flagged as a leader.

        Returns:
            ``'manager'`` for leaders, ``'trainee'`` otherwise.
        """
        return 'manager' if is_leader else 'trainee'

    @staticmethod
    def _stable_code(prefix: str, name: str) -> str:
        """Build a ``PREFIX_NNNNN`` code that is stable across processes.

        The built-in ``hash()`` of a string is salted per interpreter run, so
        a digest is used to get the same code for the same name every time.
        """
        digest = hashlib.sha256(name.encode('utf-8')).hexdigest()
        return f"{prefix}_{int(digest, 16) % 100000:05d}"

    async def create_or_get_team(self, department_name: str, leader_id: Optional[int] = None) -> Optional[Team]:
        """Return the team for a department, creating it if necessary.

        Args:
            department_name: Department name; blank values are ignored.
            leader_id: User id to set as leader. On an existing team it is
                applied only when the team has no leader yet.

        Returns:
            The team, or ``None`` when ``department_name`` is blank.
        """
        if not department_name or department_name.strip() == '':
            return None

        department_name = department_name.strip()

        # limit(1) keeps duplicate rows from raising MultipleResultsFound.
        stmt = select(Team).where(
            Team.name == department_name,
            Team.is_deleted == False  # noqa: E712 - SQL expression
        ).limit(1)
        result = await self.db.execute(stmt)
        team = result.scalar_one_or_none()

        if team:
            if leader_id and not team.leader_id:
                team.leader_id = leader_id
                logger.info(f"更新团队 {department_name} 的负责人")
            return team

        team = Team(
            name=department_name,
            code=self._stable_code("DEPT", department_name),
            description=f"{department_name}",
            team_type='department',
            is_active=True,
            leader_id=leader_id
        )

        self.db.add(team)
        # Flush to obtain the primary key without committing.
        await self.db.flush()
        logger.info(f"创建团队: {department_name} (ID: {team.id})")

        return team

    async def create_or_get_position(self, position_name: str) -> Optional[Position]:
        """Return the position with the given name, creating it if necessary.

        Args:
            position_name: Position name; blank values are ignored.

        Returns:
            The position, or ``None`` when ``position_name`` is blank.
        """
        if not position_name or position_name.strip() == '':
            return None

        position_name = position_name.strip()

        # limit(1) keeps duplicate rows from raising MultipleResultsFound.
        stmt = select(Position).where(
            Position.name == position_name,
            Position.is_deleted == False  # noqa: E712 - SQL expression
        ).limit(1)
        result = await self.db.execute(stmt)
        position = result.scalar_one_or_none()

        if position:
            return position

        position = Position(
            name=position_name,
            code=self._stable_code("POS", position_name),
            description=f"{position_name}",
            status='active'
        )

        self.db.add(position)
        await self.db.flush()
        logger.info(f"创建岗位: {position_name} (ID: {position.id})")

        return position

    async def create_user(self, employee_data: Dict[str, Any]) -> Optional[User]:
        """Create a user for an employee, or return the existing one.

        Users are matched by phone number, including soft-deleted rows. A
        soft-deleted match is restored; an active match only gets its
        ``dingtalk_id`` filled in when it was empty.

        Args:
            employee_data: Employee dict as produced by
                ``fetch_employees_from_dingtalk``.

        Returns:
            The new or existing user, or ``None`` when the employee has no
            phone number.
        """
        phone = employee_data.get('phone')
        full_name = employee_data.get('full_name')
        dingtalk_id = employee_data.get('dingtalk_id')

        if not phone:
            logger.warning(f"员工 {full_name} 没有手机号,跳过")
            return None

        # Look up by phone, deliberately including soft-deleted users.
        stmt = select(User).where(User.phone == phone)
        result = await self.db.execute(stmt)
        existing_user = result.scalar_one_or_none()

        if existing_user:
            if existing_user.is_deleted:
                # Restore the soft-deleted account instead of duplicating it.
                existing_user.is_deleted = False
                existing_user.full_name = full_name or existing_user.full_name
                existing_user.is_active = True
                if dingtalk_id:
                    existing_user.dingtalk_id = dingtalk_id
                logger.info(f"恢复软删除用户: {phone} ({full_name})")
                return existing_user

            if dingtalk_id and not existing_user.dingtalk_id:
                existing_user.dingtalk_id = dingtalk_id
                logger.info(f"更新用户 {phone} 的钉钉ID: {dingtalk_id}")

            logger.info(f"用户已存在: {phone} ({full_name})")
            return existing_user

        email = self.generate_email(phone, employee_data.get('email'))

        # Avoid violating the unique e-mail index: fall back to the
        # phone-based address when the chosen one is already taken.
        if email:
            email_check_stmt = select(User).where(
                User.email == email,
                User.is_deleted == False  # noqa: E712 - SQL expression
            ).limit(1)
            email_result = await self.db.execute(email_check_stmt)
            if email_result.scalar_one_or_none():
                email = f"{phone}@rxm.com"
                logger.warning(f"邮箱 {employee_data.get('email')} 已被使用,为员工 {full_name} 生成新邮箱: {email}")

        role = self.determine_role(employee_data.get('is_leader', False))

        user = User(
            username=phone,  # the phone number doubles as the username
            email=email,
            phone=phone,
            hashed_password=get_password_hash(self.DEFAULT_INITIAL_PASSWORD),
            full_name=full_name,
            role=role,
            is_active=True,
            is_verified=True,
            dingtalk_id=dingtalk_id,
        )

        self.db.add(user)
        await self.db.flush()

        logger.info(f"创建用户: {phone} ({full_name}) - 角色: {role}")
        return user

    async def _link_employee_relations(self, user: User, employee: Dict[str, Any]) -> None:
        """Attach a user to the employee's department team and position.

        Teams and positions are created on demand; existing links are left
        untouched. Pending changes are flushed before returning so database
        errors surface inside the caller's savepoint.
        """
        department = employee.get('department')
        if department:
            team = await self.create_or_get_team(
                department,
                leader_id=user.id if employee.get('is_leader') else None
            )
            if team:
                # Insert into user_teams via SQL to avoid lazy-loading the
                # relationship on the async session.
                await self._add_user_to_team(user.id, team.id)
                logger.info(f"关联用户 {user.full_name} 到团队 {team.name}")

        position_name = employee.get('position')
        if position_name:
            position = await self.create_or_get_position(position_name)
            if position:
                stmt = select(PositionMember).where(
                    PositionMember.position_id == position.id,
                    PositionMember.user_id == user.id,
                    PositionMember.is_deleted == False  # noqa: E712 - SQL expression
                ).limit(1)
                result = await self.db.execute(stmt)
                if result.scalar_one_or_none() is None:
                    self.db.add(PositionMember(
                        position_id=position.id,
                        user_id=user.id,
                        role='member'
                    ))
                    logger.info(f"关联用户 {user.full_name} 到岗位 {position.name}")

        await self.db.flush()

    async def sync_employees(self) -> Dict[str, Any]:
        """Run a full sync: create users, teams and positions for all employees.

        Each employee is processed inside its own SAVEPOINT so that a failure
        rolls back only that employee and leaves the session usable for the
        rest of the run. Everything is committed once at the end.

        Returns:
            Statistics dict with ``total_employees``, ``users_created``,
            ``users_skipped``, ``teams_created``, ``positions_created``,
            ``errors``, ``start_time``, ``end_time`` and ``duration``.

        Raises:
            Exception: Any error outside the per-employee handling; the
                session is rolled back before re-raising.
        """
        logger.info("=" * 60)
        logger.info("开始员工同步")
        logger.info("=" * 60)

        stats = {
            'total_employees': 0,
            'users_created': 0,
            'users_skipped': 0,
            'teams_created': 0,
            'positions_created': 0,
            'errors': [],
            'start_time': datetime.now()
        }

        try:
            # 1. Fetch the employee data.
            employees = await self.fetch_employees_from_dingtalk()
            stats['total_employees'] = len(employees)

            if not employees:
                logger.warning("没有获取到员工数据")
                return stats

            # 2. Create users and their related records.
            for employee in employees:
                try:
                    async with self.db.begin_nested():
                        user = await self.create_user(employee)
                        if user:
                            await self._link_employee_relations(user, employee)
                except Exception as e:
                    error_msg = f"处理员工 {employee.get('full_name')} 时出错: {str(e)}"
                    logger.error(error_msg)
                    stats['errors'].append(error_msg)
                    continue

                # Counters are updated only after the savepoint succeeded.
                if user:
                    stats['users_created'] += 1
                else:
                    stats['users_skipped'] += 1

            # 3. Commit everything.
            await self.db.commit()
            logger.info("✅ 数据库事务已提交")

        except Exception as e:
            logger.error(f"员工同步失败: {str(e)}")
            await self.db.rollback()
            stats['errors'].append(str(e))
            raise

        finally:
            stats['end_time'] = datetime.now()
            stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()

        # 4. Log the summary.
        logger.info("=" * 60)
        logger.info("同步完成统计")
        logger.info("=" * 60)
        logger.info(f"总员工数: {stats['total_employees']}")
        logger.info(f"创建用户: {stats['users_created']}")
        logger.info(f"跳过用户: {stats['users_skipped']}")
        logger.info(f"耗时: {stats['duration']:.2f}秒")

        if stats['errors']:
            logger.warning(f"错误数量: {len(stats['errors'])}")
            for error in stats['errors']:
                logger.warning(f"  - {error}")

        return stats

    async def preview_sync_data(self) -> Dict[str, Any]:
        """Preview the employees that a sync would process, without writing.

        Returns:
            Dict with ``total_count``, ``employees`` (one summary dict each),
            ``departments`` and ``positions`` (distinct names, as lists),
            ``leaders_count`` and ``trainees_count``.
        """
        logger.info("预览待同步员工数据...")

        employees = await self.fetch_employees_from_dingtalk()

        preview = {
            'total_count': len(employees),
            'employees': [],
            'departments': set(),
            'positions': set(),
            'leaders_count': 0,
            'trainees_count': 0
        }

        for emp in employees:
            role = self.determine_role(emp.get('is_leader', False))
            email = self.generate_email(emp.get('phone'), emp.get('email'))

            preview['employees'].append({
                'full_name': emp.get('full_name'),
                'phone': emp.get('phone'),
                'email': email,
                'department': emp.get('department'),
                'position': emp.get('position'),
                'role': role,
                'is_leader': emp.get('is_leader')
            })

            if emp.get('department'):
                preview['departments'].add(emp.get('department'))

            if emp.get('position'):
                preview['positions'].add(emp.get('position'))

            if role == 'manager':
                preview['leaders_count'] += 1
            else:
                preview['trainees_count'] += 1

        # Sets are converted to lists for the returned structure.
        preview['departments'] = list(preview['departments'])
        preview['positions'] = list(preview['positions'])

        return preview

    async def _add_user_to_team(self, user_id: int, team_id: int) -> None:
        """Insert a ``user_teams`` row unless the link already exists.

        Uses raw SQL rather than the ORM relationship to avoid lazy loading.

        Args:
            user_id: User id.
            team_id: Team id.
        """
        params = {"user_id": user_id, "team_id": team_id}

        check_result = await self.db.execute(
            text("SELECT 1 FROM user_teams WHERE user_id = :user_id AND team_id = :team_id"),
            params
        )

        if check_result.scalar() is None:
            await self.db.execute(
                text("INSERT INTO user_teams (user_id, team_id, role) VALUES (:user_id, :team_id, 'member')"),
                params
            )

    async def _cleanup_user_related_data(self, user_id: int) -> None:
        """Delete or detach the rows that reference a user, before deleting it.

        Statements run in the listed order so child rows are removed before
        the rows they depend on.

        Args:
            user_id: Id of the user about to be deleted.
        """
        logger.info(f"清理用户 {user_id} 的关联数据...")

        statements = (
            # Exam results first, then the exams they belong to.
            "DELETE FROM exam_results WHERE exam_id IN (SELECT id FROM exams WHERE user_id = :user_id)",
            "DELETE FROM exams WHERE user_id = :user_id",
            # Mistake records.
            "DELETE FROM exam_mistakes WHERE user_id = :user_id",
            # Ability assessments.
            "DELETE FROM ability_assessments WHERE user_id = :user_id",
            # Position memberships.
            "DELETE FROM position_members WHERE user_id = :user_id",
            # Team memberships.
            "DELETE FROM user_teams WHERE user_id = :user_id",
            # Practice sessions.
            "DELETE FROM practice_sessions WHERE user_id = :user_id",
            # Task assignments given to the user.
            "DELETE FROM task_assignments WHERE user_id = :user_id",
            # Assignments of tasks the user created, then those tasks.
            "DELETE FROM task_assignments WHERE task_id IN (SELECT id FROM tasks WHERE creator_id = :user_id)",
            "DELETE FROM tasks WHERE creator_id = :user_id",
            # Teams led by the user lose their leader.
            "UPDATE teams SET leader_id = NULL WHERE leader_id = :user_id",
        )

        for statement in statements:
            await self.db.execute(text(statement), {"user_id": user_id})

        logger.info(f"用户 {user_id} 的关联数据清理完成")

    async def incremental_sync_employees(self) -> Dict[str, Any]:
        """Run an incremental sync keyed on phone numbers.

        - Employees present in DingTalk but not in the system are added.
        - Users present in the system but not in DingTalk are physically
          deleted (accounts with username or role ``admin`` are never deleted).
        - Users present on both sides are left untouched.

        Each addition and each deletion runs inside its own SAVEPOINT, so a
        failure affects only that employee. When the external table yields no
        phone numbers the run aborts before deleting anything, to avoid
        wiping every user because of an empty or broken source.

        Returns:
            Statistics dict with ``added_count``, ``deleted_count``,
            ``skipped_count``, ``added_users``, ``deleted_users``, ``errors``,
            ``start_time``, ``end_time`` and ``duration``.

        Raises:
            Exception: Any error outside the per-employee handling; the
                session is rolled back before re-raising.
        """
        logger.info("=" * 60)
        logger.info("开始增量员工同步")
        logger.info("=" * 60)

        stats = {
            'added_count': 0,
            'deleted_count': 0,
            'skipped_count': 0,
            'added_users': [],
            'deleted_users': [],
            'errors': [],
            'start_time': datetime.now()
        }

        try:
            # 1. Fetch active employees from DingTalk.
            dingtalk_employees = await self.fetch_employees_from_dingtalk()
            dingtalk_phones = {emp.get('phone') for emp in dingtalk_employees if emp.get('phone')}
            logger.info(f"钉钉在职员工数量: {len(dingtalk_phones)}")

            if not dingtalk_phones:
                # Safety guard: an empty source would mark every user for
                # physical deletion.
                error_msg = "钉钉员工表未返回任何在职员工,为避免误删全部用户,本次增量同步已中止"
                logger.warning(error_msg)
                stats['errors'].append(error_msg)
                return stats

            # 2. Load current users (excluding admin and soft-deleted rows).
            stmt = select(User).where(
                User.is_deleted == False,  # noqa: E712 - SQL expression
                User.username != 'admin'
            )
            result = await self.db.execute(stmt)
            system_users = result.scalars().all()
            system_phones = {user.phone for user in system_users if user.phone}
            logger.info(f"系统现有员工数量(排除admin): {len(system_phones)}")

            # 3. Work out who to add, delete and skip.
            phones_to_add = dingtalk_phones - system_phones
            phones_to_delete = system_phones - dingtalk_phones
            phones_to_skip = dingtalk_phones & system_phones

            logger.info(f"待新增: {len(phones_to_add)}, 待删除: {len(phones_to_delete)}, 跳过: {len(phones_to_skip)}")

            stats['skipped_count'] = len(phones_to_skip)

            # Snapshot deletion candidates as plain dicts now, so later
            # savepoint rollbacks cannot leave us reading expired ORM state.
            users_to_delete = []
            for user in system_users:
                if user.phone and user.phone in phones_to_delete:
                    # Double protection: never delete an admin account.
                    if user.username == 'admin' or user.role == 'admin':
                        logger.warning(f"⚠️ 跳过删除管理员账户: {user.username}")
                        continue

                    users_to_delete.append({
                        'id': user.id,
                        'full_name': user.full_name,
                        'phone': user.phone,
                        'username': user.username
                    })

            # 4. Add new employees.
            for employee in dingtalk_employees:
                phone = employee.get('phone')
                if not phone or phone not in phones_to_add:
                    continue

                try:
                    added_info = None
                    async with self.db.begin_nested():
                        user = await self.create_user(employee)
                        if user:
                            # Capture plain values while the object is live.
                            added_info = {
                                'full_name': user.full_name,
                                'phone': user.phone,
                                'role': user.role
                            }
                            await self._link_employee_relations(user, employee)
                except Exception as e:
                    error_msg = f"新增员工 {employee.get('full_name')} 失败: {str(e)}"
                    logger.error(error_msg)
                    stats['errors'].append(error_msg)
                    continue

                if added_info is None:
                    continue

                stats['added_count'] += 1
                stats['added_users'].append(added_info)
                logger.info(f"✅ 新增员工: {added_info['full_name']} ({added_info['phone']})")

            # 5. Physically delete employees who have left.
            # Flush the additions first so they cannot conflict with deletes.
            await self.db.flush()

            for user_info in users_to_delete:
                try:
                    user_id = user_info['id']

                    async with self.db.begin_nested():
                        # Remove dependent rows first (foreign keys).
                        await self._cleanup_user_related_data(user_id)

                        # Delete via SQL to bypass ORM cascade handling.
                        await self.db.execute(
                            text("DELETE FROM users WHERE id = :user_id"),
                            {"user_id": user_id}
                        )
                except Exception as e:
                    error_msg = f"删除员工 {user_info['full_name']} 失败: {str(e)}"
                    logger.error(error_msg)
                    stats['errors'].append(error_msg)
                    continue

                stats['deleted_users'].append({
                    'full_name': user_info['full_name'],
                    'phone': user_info['phone'],
                    'username': user_info['username']
                })
                stats['deleted_count'] += 1
                logger.info(f"🗑️ 删除离职员工: {user_info['full_name']} ({user_info['phone']})")

            # 6. Commit everything.
            await self.db.commit()
            logger.info("✅ 数据库事务已提交")

        except Exception as e:
            logger.error(f"增量同步失败: {str(e)}")
            await self.db.rollback()
            stats['errors'].append(str(e))
            raise

        finally:
            stats['end_time'] = datetime.now()
            stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()

        # 7. Log the summary.
        logger.info("=" * 60)
        logger.info("增量同步完成统计")
        logger.info("=" * 60)
        logger.info(f"新增员工: {stats['added_count']}")
        logger.info(f"删除员工: {stats['deleted_count']}")
        logger.info(f"跳过员工: {stats['skipped_count']}")
        logger.info(f"耗时: {stats['duration']:.2f}秒")

        if stats['errors']:
            logger.warning(f"错误数量: {len(stats['errors'])}")

        return stats