fix: 修复权限提升漏洞和添加安全头
All checks were successful
continuous-integration/drone/push Build is passing

安全修复:
- 创建 UserSelfUpdate schema,禁止用户修改自己的 role 和 is_active
- /users/me 端点现在使用 UserSelfUpdate 而非 UserUpdate

安全增强:
- 添加 SecurityHeadersMiddleware 中间件
- X-Content-Type-Options: nosniff
- X-Frame-Options: DENY
- X-XSS-Protection: 1; mode=block
- Referrer-Policy: strict-origin-when-cross-origin
- Permissions-Policy: 禁用敏感功能
- Cache-Control: API响应不缓存
This commit is contained in:
yuliang_guo
2026-01-31 10:57:41 +08:00
parent 52dccaab79
commit 79b55cfd12
4 changed files with 54 additions and 4 deletions

View File

@@ -13,7 +13,7 @@ from app.core.logger import logger
from app.models.user import User
from app.schemas.base import PaginatedResponse, PaginationParams, ResponseModel
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserFilter, UserPasswordUpdate, UserUpdate
from app.schemas.user import UserCreate, UserFilter, UserPasswordUpdate, UserUpdate, UserSelfUpdate
from app.services.user_service import UserService
from app.services.system_log_service import system_log_service
from app.schemas.system_log import SystemLogCreate
@@ -157,7 +157,7 @@ async def get_recent_exams(
@router.put("/me", response_model=ResponseModel)
async def update_current_user(
user_in: UserUpdate,
user_in: UserSelfUpdate,
current_user: dict = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
@@ -165,6 +165,7 @@ async def update_current_user(
更新当前用户信息
权限:需要登录
注意:用户只能修改自己的基本信息,不能修改角色(role)和激活状态(is_active)
"""
user_service = UserService(db)
user = await user_service.update_user(

View File

@@ -93,6 +93,39 @@ class RateLimitMiddleware(BaseHTTPMiddleware):
return response
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""
安全响应头中间件
添加各种安全相关的 HTTP 响应头
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
response = await call_next(request)
# 防止 MIME 类型嗅探
response.headers["X-Content-Type-Options"] = "nosniff"
# 防止点击劫持
response.headers["X-Frame-Options"] = "DENY"
# XSS 过滤器(现代浏览器已弃用,但仍有一些旧浏览器支持)
response.headers["X-XSS-Protection"] = "1; mode=block"
# 引用策略
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# 权限策略(禁用一些敏感功能)
response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
# 缓存控制API 响应不应被缓存)
if request.url.path.startswith("/api/"):
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
response.headers["Pragma"] = "no-cache"
return response
class RequestIDMiddleware(BaseHTTPMiddleware):
"""请求ID中间件"""

View File

@@ -98,13 +98,16 @@ app.add_middleware(
)
# 添加限流中间件
from app.core.middleware import RateLimitMiddleware
from app.core.middleware import RateLimitMiddleware, SecurityHeadersMiddleware
app.add_middleware(
RateLimitMiddleware,
requests_per_minute=120, # 每分钟最大请求数
burst_limit=200, # 突发请求限制
)
# 添加安全响应头中间件
app.add_middleware(SecurityHeadersMiddleware)
# 健康检查端点
@app.get("/health")

View File

@@ -38,7 +38,7 @@ class UserCreate(UserBase):
class UserUpdate(BaseSchema):
"""更新用户"""
"""更新用户(管理员使用)"""
email: Optional[EmailStr] = None
phone: Optional[str] = Field(None, pattern=r"^1[3-9]\d{9}$")
@@ -52,6 +52,19 @@ class UserUpdate(BaseSchema):
major: Optional[str] = Field(None, max_length=100)
class UserSelfUpdate(BaseSchema):
"""用户自己更新个人信息不允许修改role和is_active"""
email: Optional[EmailStr] = None
phone: Optional[str] = Field(None, pattern=r"^1[3-9]\d{9}$")
full_name: Optional[str] = Field(None, max_length=100)
avatar_url: Optional[str] = None
bio: Optional[str] = None
gender: Optional[str] = Field(None, pattern="^(male|female)$")
school: Optional[str] = Field(None, max_length=100)
major: Optional[str] = Field(None, max_length=100)
class UserPasswordUpdate(BaseSchema):
"""更新密码"""