diff --git a/backend/.env.example b/backend/.env.example index d24964b..8e32c11 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -6,3 +6,8 @@ SECRET_KEY=kaopeilian-secret-key-dev CORS_ORIGINS=["http://localhost:3001","http://localhost:3000"] HOST=0.0.0.0 PORT=8000 + +# 自动同步配置 +AUTO_SYNC_ENABLED=true +AUTO_SYNC_INTERVAL_MINUTES=30 +FULL_SYNC_HOUR=2 diff --git a/backend/app/api/v1/endpoints/employee_sync.py b/backend/app/api/v1/endpoints/employee_sync.py index 50146b5..e626cc6 100644 --- a/backend/app/api/v1/endpoints/employee_sync.py +++ b/backend/app/api/v1/endpoints/employee_sync.py @@ -234,3 +234,95 @@ async def get_sync_status( detail=f"查询统计信息失败: {str(e)}" ) + +@router.get("/scheduler/jobs", summary="查看定时同步任务") +async def get_scheduler_jobs( + *, + current_user: User = Depends(get_current_user) +) -> Dict[str, Any]: + """ + 查看定时同步任务列表 + + 权限要求: 仅管理员可查看 + + Returns: + 定时任务列表,包含下次执行时间等信息 + """ + if current_user.role != 'admin': + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="只有管理员可以查看定时任务" + ) + + try: + from app.core.scheduler import scheduler_manager + + jobs = scheduler_manager.get_jobs() + + return { + "success": True, + "message": "获取定时任务成功", + "data": { + "auto_sync_enabled": scheduler_manager.AUTO_SYNC_ENABLED, + "incremental_sync_interval_minutes": scheduler_manager.INCREMENTAL_SYNC_INTERVAL_MINUTES, + "full_sync_hour": scheduler_manager.FULL_SYNC_HOUR, + "jobs": jobs + } + } + + except Exception as e: + logger.error(f"获取定时任务失败: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取定时任务失败: {str(e)}" + ) + + +@router.post("/scheduler/trigger/{job_id}", summary="手动触发定时任务") +async def trigger_scheduler_job( + *, + job_id: str, + current_user: User = Depends(get_current_user) +) -> Dict[str, Any]: + """ + 手动触发指定的定时任务 + + 权限要求: 仅管理员可执行 + + Args: + job_id: 任务ID (employee_incremental_sync 或 employee_full_sync) + + Returns: + 任务执行结果 + """ + if current_user.role != 'admin': + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="只有管理员可以触发定时任务" + ) + + logger.info(f"管理员 {current_user.username} 手动触发定时任务: {job_id}") + + try: + from app.core.scheduler import scheduler_manager + + result = await scheduler_manager.trigger_job(job_id) + + return { + "success": True, + "message": f"任务 {job_id} 执行完成", + "data": result + } + + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(e) + ) + except Exception as e: + logger.error(f"触发定时任务失败: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"触发定时任务失败: {str(e)}" + ) + diff --git a/backend/app/core/database.py b/backend/app/core/database.py index 7ae09a6..48559dc 100644 --- a/backend/app/core/database.py +++ b/backend/app/core/database.py @@ -29,3 +29,6 @@ AsyncSessionLocal = sessionmaker( class_=AsyncSession, expire_on_commit=False, ) + +# 别名(用于调度器等模块) +async_session_factory = AsyncSessionLocal diff --git a/backend/app/core/scheduler.py b/backend/app/core/scheduler.py new file mode 100644 index 0000000..c5ce461 --- /dev/null +++ b/backend/app/core/scheduler.py @@ -0,0 +1,242 @@ +""" +定时任务调度模块 + +使用 APScheduler 实现定时任务: +- 通讯录增量同步(每30分钟) +- 通讯录完整同步(每天凌晨2点) +""" + +import os +import asyncio +from datetime import datetime +from typing import Optional + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.cron import CronTrigger +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED + +from app.core.logger import logger + + +class SchedulerManager: + """ + 定时任务调度管理器 + + 单例模式,统一管理所有定时任务 + """ + + _instance: Optional['SchedulerManager'] = None + _scheduler: Optional[AsyncIOScheduler] = None + _initialized: bool = False + + # 配置(可通过环境变量覆盖) + AUTO_SYNC_ENABLED: bool = True + INCREMENTAL_SYNC_INTERVAL_MINUTES: int = 30 # 增量同步间隔(分钟) + FULL_SYNC_HOUR: int = 2 # 完整同步执行时间(小时,24小时制) + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + @classmethod + def get_instance(cls) -> 'SchedulerManager': + """获取调度管理器实例""" + if cls._instance is None: + cls._instance = cls() + return cls._instance + + @classmethod + def _load_config(cls): + """从环境变量加载配置""" + cls.AUTO_SYNC_ENABLED = os.getenv('AUTO_SYNC_ENABLED', 'true').lower() == 'true' + cls.INCREMENTAL_SYNC_INTERVAL_MINUTES = int(os.getenv('AUTO_SYNC_INTERVAL_MINUTES', '30')) + cls.FULL_SYNC_HOUR = int(os.getenv('FULL_SYNC_HOUR', '2')) + + async def init(self, db_session_factory): + """ + 初始化调度器 + + Args: + db_session_factory: 数据库会话工厂(async_sessionmaker) + """ + if self._initialized: + logger.info("调度器已初始化,跳过") + return + + self._load_config() + + if not self.AUTO_SYNC_ENABLED: + logger.info("自动同步已禁用,调度器不启动") + return + + self._db_session_factory = db_session_factory + self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai') + + # 添加任务执行监听器 + self._scheduler.add_listener(self._job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + + # 注册定时任务 + self._register_jobs() + + self._initialized = True + logger.info("调度器初始化完成") + + def _register_jobs(self): + """注册所有定时任务""" + if not self._scheduler: + return + + # 1. 增量同步任务(每30分钟) + self._scheduler.add_job( + self._run_incremental_sync, + IntervalTrigger(minutes=self.INCREMENTAL_SYNC_INTERVAL_MINUTES), + id='employee_incremental_sync', + name='员工增量同步', + replace_existing=True, + max_instances=1, # 防止任务堆积 + ) + logger.info(f"已注册任务: 员工增量同步(每{self.INCREMENTAL_SYNC_INTERVAL_MINUTES}分钟)") + + # 2. 完整同步任务(每天凌晨2点) + self._scheduler.add_job( + self._run_full_sync, + CronTrigger(hour=self.FULL_SYNC_HOUR, minute=0), + id='employee_full_sync', + name='员工完整同步', + replace_existing=True, + max_instances=1, + ) + logger.info(f"已注册任务: 员工完整同步(每天{self.FULL_SYNC_HOUR}:00)") + + def _job_listener(self, event): + """任务执行监听器""" + job_id = event.job_id + + if event.exception: + logger.error( + f"定时任务执行失败", + job_id=job_id, + error=str(event.exception), + traceback=event.traceback + ) + else: + logger.info( + f"定时任务执行完成", + job_id=job_id, + return_value=str(event.retval) if event.retval else None + ) + + async def _run_incremental_sync(self): + """执行增量同步""" + from app.services.employee_sync_service import EmployeeSyncService + + logger.info("开始执行定时增量同步任务") + start_time = datetime.now() + + try: + async with self._db_session_factory() as db: + async with EmployeeSyncService(db) as sync_service: + stats = await sync_service.incremental_sync_employees() + + duration = (datetime.now() - start_time).total_seconds() + logger.info( + "定时增量同步完成", + duration_seconds=duration, + stats=stats + ) + return stats + + except Exception as e: + logger.error(f"定时增量同步失败: {str(e)}") + raise + + async def _run_full_sync(self): + """执行完整同步""" + from app.services.employee_sync_service import EmployeeSyncService + + logger.info("开始执行定时完整同步任务") + start_time = datetime.now() + + try: + async with self._db_session_factory() as db: + async with EmployeeSyncService(db) as sync_service: + stats = await sync_service.sync_employees() + + duration = (datetime.now() - start_time).total_seconds() + logger.info( + "定时完整同步完成", + duration_seconds=duration, + stats=stats + ) + return stats + + except Exception as e: + logger.error(f"定时完整同步失败: {str(e)}") + raise + + def start(self): + """启动调度器""" + if not self._scheduler: + logger.warning("调度器未初始化,无法启动") + return + + if self._scheduler.running: + logger.info("调度器已在运行") + return + + self._scheduler.start() + logger.info("调度器已启动") + + # 打印已注册的任务 + jobs = self._scheduler.get_jobs() + for job in jobs: + logger.info(f" - {job.name} (ID: {job.id}, 下次执行: {job.next_run_time})") + + def stop(self): + """停止调度器""" + if self._scheduler and self._scheduler.running: + self._scheduler.shutdown(wait=True) + logger.info("调度器已停止") + + def get_jobs(self): + """获取所有任务列表""" + if not self._scheduler: + return [] + + return [ + { + 'id': job.id, + 'name': job.name, + 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, + 'pending': job.pending, + } + for job in self._scheduler.get_jobs() + ] + + async def trigger_job(self, job_id: str): + """ + 手动触发任务 + + Args: + job_id: 任务ID + """ + if not self._scheduler: + raise RuntimeError("调度器未初始化") + + job = self._scheduler.get_job(job_id) + if not job: + raise ValueError(f"任务不存在: {job_id}") + + # 立即执行 + if job_id == 'employee_incremental_sync': + return await self._run_incremental_sync() + elif job_id == 'employee_full_sync': + return await self._run_full_sync() + else: + raise ValueError(f"未知任务: {job_id}") + + +# 全局调度管理器实例 +scheduler_manager = SchedulerManager.get_instance() diff --git a/backend/app/main.py b/backend/app/main.py index de2cb8b..2cc0e3c 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -34,10 +34,29 @@ async def lifespan(app: FastAPI): logger.info("Redis 初始化成功") except Exception as e: logger.warning(f"Redis 初始化失败(非致命): {e}") + + # 初始化定时任务调度器 + try: + from app.core.scheduler import scheduler_manager + from app.core.database import async_session_factory + + await scheduler_manager.init(async_session_factory) + scheduler_manager.start() + logger.info("定时任务调度器启动成功") + except Exception as e: + logger.warning(f"定时任务调度器启动失败(非致命): {e}") yield # 关闭时执行 + # 停止定时任务调度器 + try: + from app.core.scheduler import scheduler_manager + scheduler_manager.stop() + logger.info("定时任务调度器已停止") + except Exception as e: + logger.warning(f"停止定时任务调度器失败: {e}") + try: from app.core.redis import close_redis await close_redis() diff --git a/backend/requirements.txt b/backend/requirements.txt index 8b4b544..a5a5d8d 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -41,6 +41,9 @@ cozepy==0.19.0 python-dateutil==2.8.2 tenacity==8.2.3 +# 定时任务调度 +apscheduler==3.10.4 + # Excel文件处理(用于课程资料预览) openpyxl==3.1.2