""" 定时任务调度模块 使用 APScheduler 实现定时任务: - 通讯录增量同步(每30分钟) - 通讯录完整同步(每天凌晨2点) """ import os import asyncio from datetime import datetime from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from app.core.logger import logger class SchedulerManager: """ 定时任务调度管理器 单例模式,统一管理所有定时任务 """ _instance: Optional['SchedulerManager'] = None _scheduler: Optional[AsyncIOScheduler] = None _initialized: bool = False # 配置(可通过环境变量覆盖) AUTO_SYNC_ENABLED: bool = True INCREMENTAL_SYNC_INTERVAL_MINUTES: int = 30 # 增量同步间隔(分钟) FULL_SYNC_HOUR: int = 2 # 完整同步执行时间(小时,24小时制) def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @classmethod def get_instance(cls) -> 'SchedulerManager': """获取调度管理器实例""" if cls._instance is None: cls._instance = cls() return cls._instance @classmethod def _load_config(cls): """从环境变量加载配置""" cls.AUTO_SYNC_ENABLED = os.getenv('AUTO_SYNC_ENABLED', 'true').lower() == 'true' cls.INCREMENTAL_SYNC_INTERVAL_MINUTES = int(os.getenv('AUTO_SYNC_INTERVAL_MINUTES', '30')) cls.FULL_SYNC_HOUR = int(os.getenv('FULL_SYNC_HOUR', '2')) async def init(self, db_session_factory): """ 初始化调度器 Args: db_session_factory: 数据库会话工厂(async_sessionmaker) """ if self._initialized: logger.info("调度器已初始化,跳过") return self._load_config() if not self.AUTO_SYNC_ENABLED: logger.info("自动同步已禁用,调度器不启动") return self._db_session_factory = db_session_factory self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai') # 添加任务执行监听器 self._scheduler.add_listener(self._job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 注册定时任务 self._register_jobs() self._initialized = True logger.info("调度器初始化完成") def _register_jobs(self): """注册所有定时任务""" if not self._scheduler: return # 1. 增量同步任务(每30分钟) self._scheduler.add_job( self._run_incremental_sync, IntervalTrigger(minutes=self.INCREMENTAL_SYNC_INTERVAL_MINUTES), id='employee_incremental_sync', name='员工增量同步', replace_existing=True, max_instances=1, # 防止任务堆积 ) logger.info(f"已注册任务: 员工增量同步(每{self.INCREMENTAL_SYNC_INTERVAL_MINUTES}分钟)") # 2. 完整同步任务(每天凌晨2点) self._scheduler.add_job( self._run_full_sync, CronTrigger(hour=self.FULL_SYNC_HOUR, minute=0), id='employee_full_sync', name='员工完整同步', replace_existing=True, max_instances=1, ) logger.info(f"已注册任务: 员工完整同步(每天{self.FULL_SYNC_HOUR}:00)") def _job_listener(self, event): """任务执行监听器""" job_id = event.job_id if event.exception: logger.error( f"定时任务执行失败", job_id=job_id, error=str(event.exception), traceback=event.traceback ) else: logger.info( f"定时任务执行完成", job_id=job_id, return_value=str(event.retval) if event.retval else None ) async def _run_incremental_sync(self): """执行增量同步""" from app.services.employee_sync_service import EmployeeSyncService logger.info("开始执行定时增量同步任务") start_time = datetime.now() try: async with self._db_session_factory() as db: async with EmployeeSyncService(db) as sync_service: stats = await sync_service.incremental_sync_employees() duration = (datetime.now() - start_time).total_seconds() logger.info( "定时增量同步完成", duration_seconds=duration, stats=stats ) return stats except Exception as e: logger.error(f"定时增量同步失败: {str(e)}") raise async def _run_full_sync(self): """执行完整同步""" from app.services.employee_sync_service import EmployeeSyncService logger.info("开始执行定时完整同步任务") start_time = datetime.now() try: async with self._db_session_factory() as db: async with EmployeeSyncService(db) as sync_service: stats = await sync_service.sync_employees() duration = (datetime.now() - start_time).total_seconds() logger.info( "定时完整同步完成", duration_seconds=duration, stats=stats ) return stats except Exception as e: logger.error(f"定时完整同步失败: {str(e)}") raise def start(self): """启动调度器""" if not self._scheduler: logger.warning("调度器未初始化,无法启动") return if self._scheduler.running: logger.info("调度器已在运行") return self._scheduler.start() logger.info("调度器已启动") # 打印已注册的任务 jobs = self._scheduler.get_jobs() for job in jobs: logger.info(f" - {job.name} (ID: {job.id}, 下次执行: {job.next_run_time})") def stop(self): """停止调度器""" if self._scheduler and self._scheduler.running: self._scheduler.shutdown(wait=True) logger.info("调度器已停止") def get_jobs(self): """获取所有任务列表""" if not self._scheduler: return [] return [ { 'id': job.id, 'name': job.name, 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, 'pending': job.pending, } for job in self._scheduler.get_jobs() ] async def trigger_job(self, job_id: str): """ 手动触发任务 Args: job_id: 任务ID """ if not self._scheduler: raise RuntimeError("调度器未初始化") job = self._scheduler.get_job(job_id) if not job: raise ValueError(f"任务不存在: {job_id}") # 立即执行 if job_id == 'employee_incremental_sync': return await self._run_incremental_sync() elif job_id == 'employee_full_sync': return await self._run_full_sync() else: raise ValueError(f"未知任务: {job_id}") # 全局调度管理器实例 scheduler_manager = SchedulerManager.get_instance()