Files
012-kaopeilian/backend/simple_main.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

226 lines
6.6 KiB
Python

"""
简化的主应用 - 修复前端集成问题
"""
import hashlib
import hmac
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
import mysql.connector
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# Application instance; the route decorators further down register their
# handlers on it. title/version/description are passed to FastAPI as the
# API's metadata.
app = FastAPI(
    title="考培练系统API",
    version="1.0.0",
    description="用户管理和认证系统"
)
# CORS configuration: every origin, method and header is accepted, and
# credentialed requests are allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# very permissive — confirm this is intended for development only and
# restrict allow_origins to the real front-end origin(s) before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# JWT settings.
# The signing key is taken from the SECRET_KEY environment variable so a real
# secret never has to live in source control. When the variable is unset or
# empty, the original development key is used, so local runs behave exactly
# as before; that fallback must not be relied on in production.
SECRET_KEY = os.environ.get("SECRET_KEY") or "dev-secret-key-change-in-production"
# Signing algorithm used for both encoding and decoding tokens.
ALGORITHM = "HS256"
# Lifetime of an access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Database settings; get_db_connection() unpacks this mapping as keyword
# arguments to mysql.connector.connect().
# NOTE(review): root with an empty password on 127.0.0.1 looks like a
# development-only setup — confirm before running this anywhere else.
DB_CONFIG = dict(
    host='127.0.0.1',
    user='root',
    password='',
    database='kaopeilian',
    charset='utf8mb4',
    autocommit=True,
)
# Pydantic模型
# Request body accepted by the login endpoint.
class LoginRequest(BaseModel):
    username: str  # matched against users.username
    password: str  # plain-text password; hashed before it is compared
# Request body accepted by the token refresh endpoint.
class RefreshTokenRequest(BaseModel):
    refresh_token: str  # encoded JWT; must carry a "type" claim of "refresh"
# Response envelope returned by the auth and user endpoints in this file.
class ResponseModel(BaseModel):
    code: int = 200  # application-level status code; defaults to 200
    message: str  # human-readable result message (required)
    data: Optional[dict] = None  # endpoint-specific payload, if any
# 辅助函数
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of *password* (UTF-8 encoded).

    NOTE(review): a single unsalted SHA-256 round is weak for password
    storage. It is kept as-is because the login check compares against
    hashes already stored in this format; switching schemes needs a
    migration of the stored values.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode("utf-8"))
    return hasher.hexdigest()
def create_access_token(user_id: int) -> str:
    """Create a signed access token for *user_id*.

    The token carries the user id as a string in ``sub``, an expiry
    ACCESS_TOKEN_EXPIRE_MINUTES minutes from now in ``exp``, and
    ``type == "access"``.

    Args:
        user_id: Primary key of the user the token is issued to.

    Returns:
        The encoded JWT.
    """
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12, and PyJWT converts either form to the same epoch value.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": str(user_id), "exp": expires_at, "type": "access"}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: int) -> str:
    """Create a signed refresh token for *user_id*, valid for seven days.

    The token carries the user id as a string in ``sub``, the expiry in
    ``exp``, and ``type == "refresh"`` so it can be told apart from an
    access token.

    Args:
        user_id: Primary key of the user the token is issued to.

    Returns:
        The encoded JWT.
    """
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12, and PyJWT converts either form to the same epoch value.
    expires_at = datetime.now(timezone.utc) + timedelta(days=7)
    claims = {"sub": str(user_id), "exp": expires_at, "type": "refresh"}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def get_db_connection():
    """Open a new MySQL connection using DB_CONFIG.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
        The caller is responsible for closing it.

    Raises:
        HTTPException: 500 when the connector reports an error.
    """
    try:
        return mysql.connector.connect(**DB_CONFIG)
    except mysql.connector.Error as e:
        # Chain the driver error so the original traceback stays available.
        # NOTE(review): the driver's message is sent to the client in
        # `detail`; consider logging it and returning a generic message.
        raise HTTPException(status_code=500, detail=f"数据库连接失败: {str(e)}") from e
# API路由
@app.get("/")
def read_root():
    """Return the service name, version and running status."""
    service_info = {
        "message": "考培练系统API",
        "version": "1.0.0",
        "status": "running",
    }
    return service_info
@app.get("/health")
def health_check():
    """Report that the service is up, with the current UTC time."""
    # Naive UTC timestamp in ISO-8601 form (no offset suffix).
    checked_at = datetime.utcnow().isoformat()
    return {
        "status": "healthy",
        "service": "kaopeilian-api",
        "timestamp": checked_at,
    }
@app.post("/api/v1/auth/login")
def login(login_data: LoginRequest):
    """Authenticate a user and issue an access/refresh token pair.

    Args:
        login_data: Username and plain-text password from the request body.

    Returns:
        A ResponseModel whose ``data`` holds the user's profile under
        ``"user"`` and the issued tokens under ``"token"``.

    Raises:
        HTTPException: 400 for an unknown user, a wrong password or a
            disabled account; 500 for database or other unexpected errors.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                # Look up the (non-deleted) user by name.
                cursor.execute(
                    "SELECT id, username, email, hashed_password, full_name, role, is_active, is_verified FROM users WHERE username = %s AND is_deleted = 0",
                    (login_data.username,)
                )
                user = cursor.fetchone()
                if not user:
                    raise HTTPException(status_code=400, detail="用户名或密码错误")

                # Verify the password with a constant-time comparison so the
                # response time does not reveal how much of the hash matched.
                # A stored value that is not a string can never match.
                stored_hash = user['hashed_password']
                supplied_hash = hash_password(login_data.password)
                if not isinstance(stored_hash, str) or not hmac.compare_digest(
                    stored_hash.encode("utf-8"), supplied_hash.encode("utf-8")
                ):
                    raise HTTPException(status_code=400, detail="用户名或密码错误")

                if not user['is_active']:
                    raise HTTPException(status_code=400, detail="用户已被禁用")

                # Issue the tokens.
                access_token = create_access_token(user['id'])
                new_refresh_token = create_refresh_token(user['id'])

                # Record the time of this login.
                cursor.execute(
                    "UPDATE users SET last_login_at = NOW() WHERE id = %s",
                    (user['id'],)
                )
            finally:
                # Release the cursor on every path, including the early
                # 400 exits above.
                cursor.close()
        finally:
            conn.close()

        return ResponseModel(
            message="登录成功",
            data={
                "user": {
                    "id": user['id'],
                    "username": user['username'],
                    "email": user['email'],
                    "full_name": user['full_name'],
                    "role": user['role'],
                    "is_active": bool(user['is_active']),
                    "is_verified": bool(user['is_verified'])
                },
                "token": {
                    "access_token": access_token,
                    "refresh_token": new_refresh_token,
                    "token_type": "bearer"
                }
            }
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e
@app.post("/api/v1/auth/refresh")
def refresh_token(refresh_data: RefreshTokenRequest):
    """Exchange a valid refresh token for a new access token.

    Args:
        refresh_data: Request body carrying the encoded refresh token.

    Returns:
        A ResponseModel whose ``data`` holds the new access token together
        with the unchanged refresh token.

    Raises:
        HTTPException: 400 when the token cannot be decoded, is not a
            refresh token, or has no usable ``sub`` claim; 500 for any
            other unexpected error.
    """
    # NOTE(review): the user is not re-checked against the database here, so
    # a deleted or disabled user can still refresh — confirm whether that is
    # acceptable.
    try:
        # Decode and verify the refresh token.
        payload = jwt.decode(refresh_data.refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=400, detail="无效的刷新令牌")

        # A missing or non-numeric subject means the token is malformed,
        # which is a client error rather than a server error.
        try:
            user_id = int(payload.get("sub"))
        except (TypeError, ValueError) as e:
            raise HTTPException(status_code=400, detail="无效的刷新令牌") from e

        # Issue a fresh access token.
        access_token = create_access_token(user_id)
        return ResponseModel(
            message="令牌刷新成功",
            data={
                "access_token": access_token,
                "refresh_token": refresh_data.refresh_token,
                "token_type": "bearer"
            }
        )
    except HTTPException:
        # Without this clause the 400s raised above would be caught by the
        # generic handler below and reported as 500.
        raise
    except jwt.PyJWTError as e:
        raise HTTPException(status_code=400, detail="无效的刷新令牌") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e
@app.get("/api/v1/users/me")
def get_current_user_info():
    """Return the current user's profile (simplified: always the admin)."""
    # NOTE(review): no token is read here; the same fixed administrator
    # record is returned to every caller.
    admin_profile = {
        "id": 1,
        "username": "admin",
        "email": "admin@test.com",
        "full_name": "系统管理员",
        "role": "admin",
        "is_active": True,
        "is_verified": True,
        "created_at": "2024-01-01T00:00:00",
    }
    return ResponseModel(message="获取成功", data=admin_profile)
@app.get("/api/v1/users")
def list_users():
    """Return every non-deleted user, ordered by id, as a single page.

    Returns:
        A ResponseModel whose ``data`` holds the rows under ``"items"``
        plus ``total``, ``page`` (always 1) and ``page_size`` (the number
        of rows returned).

    Raises:
        HTTPException: 500 when the database cannot be reached or the
            query fails.
    """
    # NOTE(review): this endpoint performs no authentication check.
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    "SELECT id, username, email, full_name, role, is_active, created_at FROM users WHERE is_deleted = 0 ORDER BY id"
                )
                users = cursor.fetchall()
            finally:
                # Release the cursor even if the query fails.
                cursor.close()
        finally:
            conn.close()

        return ResponseModel(
            message="获取成功",
            data={
                "items": users,
                "total": len(users),
                "page": 1,
                "page_size": len(users)
            }
        )
    except HTTPException:
        # Keep the connection-failure error from get_db_connection() as-is,
        # matching the login endpoint, instead of re-wrapping it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e
# Serve the app with uvicorn when this file is executed directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when launching this way.
    import uvicorn
    # Listen on all network interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)