Files
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

45 lines
1.1 KiB
Python

"""
Redis连接管理
"""
from typing import Optional
from redis import asyncio as aioredis
from app.core.config import settings
from app.core.logger import logger
# 全局Redis连接实例
redis_client: Optional[aioredis.Redis] = None
async def init_redis() -> aioredis.Redis:
"""初始化Redis连接"""
global redis_client
try:
redis_client = await aioredis.from_url(
settings.REDIS_URL, encoding="utf-8", decode_responses=True
)
# 测试连接
await redis_client.ping()
logger.info("Redis连接成功", url=settings.REDIS_URL)
return redis_client
except Exception as e:
logger.error("Redis连接失败", error=str(e), url=settings.REDIS_URL)
raise
async def close_redis():
"""关闭Redis连接"""
global redis_client
if redis_client:
await redis_client.close()
logger.info("Redis连接已关闭")
redis_client = None
def get_redis_client() -> aioredis.Redis:
"""获取Redis客户端实例"""
if not redis_client:
raise RuntimeError("Redis client not initialized")
return redis_client