Files
012-kaopeilian/backend/app/services/ai/knowledge_analysis_v2.py
yuliang_guo 2f47193059
All checks were successful
continuous-integration/drone/push Build is passing
feat: 集成MinIO对象存储服务
- 新增storage_service.py封装MinIO操作
- 修改upload.py使用storage_service上传文件
- 修改course_service.py使用storage_service删除文件
- 适配preview.py支持从MinIO获取文件
- 适配knowledge_analysis_v2.py支持MinIO存储
- 在config.py添加MinIO配置项
- 添加minio依赖到requirements.txt

支持特性:
- 自动降级到本地存储(MinIO不可用时)
- 保持URL格式兼容(/static/uploads/)
- 文件自动缓存到本地(用于预览和分析)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-03 14:06:22 +08:00

641 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
知识点分析服务 V2 - Python 原生实现
功能:
- 读取文档内容(PDF/Word/TXT)
- 调用 AI 分析提取知识点
- 解析 JSON 结果
- 写入数据库
提供稳定可靠的知识点分析能力。
支持MinIO和本地文件系统两种存储后端。
"""
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.exceptions import ExternalServiceError
from app.schemas.course import KnowledgePointCreate
from app.services.storage_service import storage_service
from .ai_service import AIService, AIResponse
from .llm_json_parser import parse_with_fallback, clean_llm_output
from .prompts.knowledge_analysis_prompts import (
SYSTEM_PROMPT,
USER_PROMPT,
KNOWLEDGE_POINT_SCHEMA,
DEFAULT_KNOWLEDGE_TYPE,
)
logger = logging.getLogger(__name__)
# 配置常量
STATIC_UPLOADS_PREFIX = '/static/uploads/'
MAX_CONTENT_LENGTH = 100000 # 最大文档内容长度(字符)
MAX_KNOWLEDGE_POINTS = 20 # 最大知识点数量
class KnowledgeAnalysisServiceV2:
    """
    Knowledge-point analysis service V2 (native Python implementation).

    Pipeline:
      1. Resolve the material's file URL to a local path (MinIO files are
         downloaded to a local cache by ``storage_service``).
      2. Extract text content from the document (PDF/Word/Excel/PPT/text).
      3. Call the AI service to extract knowledge points.
      4. Parse the JSON result with multi-layer fallbacks.
      5. Delete the material's old knowledge points and save the new ones.

    Usage example:
        ```python
        service = KnowledgeAnalysisServiceV2()
        result = await service.analyze_course_material(
            db=db_session,
            course_id=1,
            material_id=10,
            file_url="/static/uploads/courses/1/doc.pdf",
            course_title="医美产品知识",
            user_id=1
        )
        ```
    """

    def __init__(self):
        """Initialize the AI client and the local upload root."""
        self.ai_service = AIService(module_code="knowledge_analysis")
        # Local directory used for legacy files and as the MinIO cache root.
        self.upload_path = getattr(settings, 'UPLOAD_PATH', 'uploads')

    async def analyze_course_material(
        self,
        db: AsyncSession,
        course_id: int,
        material_id: int,
        file_url: str,
        course_title: str,
        user_id: int
    ) -> Dict[str, Any]:
        """
        Analyze one course material and extract its knowledge points.

        Args:
            db: Async database session.
            course_id: Course ID.
            material_id: Material ID (its old knowledge points are replaced).
            file_url: File URL (relative ``/static/uploads/...`` or a path).
            course_title: Course title, passed to the AI prompt for context.
            user_id: ID of the user recorded as creator of the new points.

        Returns:
            Result dict with ``success``, ``status``,
            ``knowledge_points_count`` and AI telemetry fields.

        Raises:
            ExternalServiceError: on any failure (missing file, empty
                document, AI/parse/DB errors) — all internal exceptions are
                normalized to this type for the API layer.
        """
        try:
            logger.info(
                f"开始知识点分析 V2 - course_id: {course_id}, material_id: {material_id}, "
                f"file_url: {file_url}"
            )
            # 1. Resolve the file path (MinIO or local filesystem).
            file_path = await self._resolve_file_path(file_url)
            if not file_path.exists():
                raise FileNotFoundError(f"文件不存在: {file_path}")
            logger.info(f"文件路径解析成功: {file_path}")
            # 2. Extract the document's text content.
            content = await self._extract_document_content(file_path)
            if not content or not content.strip():
                raise ValueError("文档内容为空")
            logger.info(f"文档内容提取成功,长度: {len(content)} 字符")
            # 3. Call the AI analysis.
            ai_response = await self._call_ai_analysis(content, course_title)
            logger.info(
                f"AI 分析完成 - provider: {ai_response.provider}, "
                f"tokens: {ai_response.total_tokens}, latency: {ai_response.latency_ms}ms"
            )
            # 4. Parse the JSON result into normalized knowledge points.
            knowledge_points = self._parse_knowledge_points(ai_response.content)
            logger.info(f"知识点解析成功,数量: {len(knowledge_points)}")
            # 5. Delete the material's previous knowledge points.
            await self._delete_old_knowledge_points(db, material_id)
            # 6. Persist the new knowledge points.
            saved_count = await self._save_knowledge_points(
                db=db,
                course_id=course_id,
                material_id=material_id,
                knowledge_points=knowledge_points,
                user_id=user_id
            )
            logger.info(
                f"知识点分析完成 - course_id: {course_id}, material_id: {material_id}, "
                f"saved_count: {saved_count}"
            )
            return {
                "success": True,
                "status": "completed",
                "knowledge_points_count": saved_count,
                "ai_provider": ai_response.provider,
                "ai_model": ai_response.model,
                "ai_tokens": ai_response.total_tokens,
                "ai_latency_ms": ai_response.latency_ms,
            }
        except FileNotFoundError as e:
            logger.error(f"文件不存在: {e}")
            raise ExternalServiceError(f"分析文件不存在: {e}")
        except ValueError as e:
            logger.error(f"参数错误: {e}")
            raise ExternalServiceError(f"分析参数错误: {e}")
        except Exception as e:
            logger.error(
                f"知识点分析失败 - course_id: {course_id}, material_id: {material_id}, "
                f"error: {e}",
                exc_info=True
            )
            raise ExternalServiceError(f"知识点分析失败: {e}")

    async def _resolve_file_path(self, file_url: str) -> Path:
        """
        Resolve a file URL to a local ``Path``.

        Supports both MinIO and the local filesystem: files stored in MinIO
        are downloaded to a local cache by ``storage_service``.
        """
        if file_url.startswith(STATIC_UPLOADS_PREFIX):
            # Strip only the LEADING prefix. str.replace() would also rewrite
            # any later occurrence of the prefix inside the object name.
            object_name = file_url[len(STATIC_UPLOADS_PREFIX):]
            # storage_service handles the MinIO download transparently.
            file_path = await storage_service.get_file_path(object_name)
            if file_path:
                return file_path
            # storage_service returned None: fall back to the local path
            # (compatibility with data stored before MinIO was introduced).
            return Path(self.upload_path) / object_name
        elif file_url.startswith('/'):
            # Absolute filesystem path.
            return Path(file_url)
        else:
            # Relative path under the upload root.
            return Path(self.upload_path) / file_url

    async def _extract_document_content(self, file_path: Path) -> str:
        """
        Extract text content from a document.

        Supports PDF, Word (docx), Excel (xlsx/xls), PPT (pptx/ppt) and
        plain-text files; unknown extensions are attempted as text.

        Raises:
            ValueError: if the document cannot be read.
        """
        suffix = file_path.suffix.lower()
        try:
            if suffix == '.pdf':
                return await self._extract_pdf_content(file_path)
            elif suffix in ['.docx', '.doc']:
                return await self._extract_docx_content(file_path)
            elif suffix in ['.xlsx', '.xls']:
                return await self._extract_excel_content(file_path)
            elif suffix in ['.pptx', '.ppt']:
                return await self._extract_ppt_content(file_path)
            elif suffix in ['.txt', '.md', '.text']:
                return await self._extract_text_content(file_path)
            else:
                # Last resort: try reading the file as plain text.
                return await self._extract_text_content(file_path)
        except Exception as e:
            logger.error(f"文档内容提取失败: {file_path}, error: {e}")
            raise ValueError(f"无法读取文档内容: {e}")

    async def _extract_pdf_content(self, file_path: Path) -> str:
        """Extract text from a PDF file (requires PyPDF2)."""
        try:
            from PyPDF2 import PdfReader
            reader = PdfReader(str(file_path))
            text_parts = []
            for page in reader.pages:
                # extract_text() may return None/'' for image-only pages.
                text = page.extract_text()
                if text:
                    text_parts.append(text)
            content = '\n'.join(text_parts)
            # Normalize whitespace and enforce the length cap.
            content = self._clean_content(content)
            return content
        except ImportError:
            logger.error("PyPDF2 未安装,无法读取 PDF")
            raise ValueError("服务器未安装 PDF 读取组件")
        except Exception as e:
            logger.error(f"PDF 读取失败: {e}")
            raise ValueError(f"PDF 读取失败: {e}")

    async def _extract_docx_content(self, file_path: Path) -> str:
        """Extract text from a Word document (requires python-docx)."""
        try:
            from docx import Document
            doc = Document(str(file_path))
            text_parts = []
            for para in doc.paragraphs:
                if para.text.strip():
                    text_parts.append(para.text)
            # Also extract table cell contents.
            for table in doc.tables:
                for row in table.rows:
                    for cell in row.cells:
                        if cell.text.strip():
                            text_parts.append(cell.text)
            content = '\n'.join(text_parts)
            content = self._clean_content(content)
            return content
        except ImportError:
            logger.error("python-docx 未安装,无法读取 Word 文档")
            raise ValueError("服务器未安装 Word 读取组件")
        except Exception as e:
            logger.error(f"Word 文档读取失败: {e}")
            raise ValueError(f"Word 文档读取失败: {e}")

    async def _extract_text_content(self, file_path: Path) -> str:
        """Extract content from a text file, trying several encodings."""
        try:
            # Try common encodings in order; latin-1 never fails and thus
            # acts as the final fallback.
            encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
            for encoding in encodings:
                try:
                    with open(file_path, 'r', encoding=encoding) as f:
                        content = f.read()
                    return self._clean_content(content)
                except UnicodeDecodeError:
                    continue
            raise ValueError("无法识别文件编码")
        except Exception as e:
            logger.error(f"文本文件读取失败: {e}")
            raise ValueError(f"文本文件读取失败: {e}")

    async def _extract_excel_content(self, file_path: Path) -> str:
        """Extract cell contents from an Excel workbook (requires openpyxl)."""
        try:
            from openpyxl import load_workbook
            # read_only streams rows; data_only returns computed cell values.
            wb = load_workbook(str(file_path), read_only=True, data_only=True)
            text_parts = []
            for sheet_name in wb.sheetnames:
                sheet = wb[sheet_name]
                text_parts.append(f"【工作表: {sheet_name}】")
                for row in sheet.iter_rows(values_only=True):
                    # Skip entirely empty rows.
                    row_values = [str(cell) if cell is not None else '' for cell in row]
                    if any(v.strip() for v in row_values):
                        text_parts.append(' | '.join(row_values))
            wb.close()
            content = '\n'.join(text_parts)
            return self._clean_content(content)
        except ImportError:
            logger.error("openpyxl 未安装,无法读取 Excel 文件")
            raise ValueError("服务器未安装 Excel 读取组件(openpyxl)")
        except Exception as e:
            logger.error(f"Excel 文件读取失败: {e}")
            raise ValueError(f"Excel 文件读取失败: {e}")

    async def _extract_ppt_content(self, file_path: Path) -> str:
        """Extract text from a PowerPoint file (requires python-pptx)."""
        try:
            from pptx import Presentation
            prs = Presentation(str(file_path))
            text_parts = []
            for slide_num, slide in enumerate(prs.slides, 1):
                slide_texts = []
                text_parts.append(f"【幻灯片 {slide_num}】")
                for shape in slide.shapes:
                    # Text frames (title, body, text boxes).
                    if hasattr(shape, "text") and shape.text.strip():
                        slide_texts.append(shape.text.strip())
                    # Tables embedded in the slide.
                    if shape.has_table:
                        table = shape.table
                        for row in table.rows:
                            row_text = ' | '.join(
                                cell.text.strip() for cell in row.cells if cell.text.strip()
                            )
                            if row_text:
                                slide_texts.append(row_text)
                if slide_texts:
                    text_parts.append('\n'.join(slide_texts))
                else:
                    text_parts.append("(无文本内容)")
            content = '\n\n'.join(text_parts)
            return self._clean_content(content)
        except ImportError:
            logger.error("python-pptx 未安装,无法读取 PPT 文件")
            raise ValueError("服务器未安装 PPT 读取组件(python-pptx)")
        except Exception as e:
            logger.error(f"PPT 文件读取失败: {e}")
            raise ValueError(f"PPT 文件读取失败: {e}")

    def _clean_content(self, content: str) -> str:
        """Collapse excess whitespace and truncate over-long content."""
        import re
        # Collapse runs of blank lines and repeated spaces.
        content = re.sub(r'\n{3,}', '\n\n', content)
        content = re.sub(r' {2,}', ' ', content)
        # Cap the content length so the AI prompt stays within limits.
        if len(content) > MAX_CONTENT_LENGTH:
            logger.warning(f"文档内容过长,截断至 {MAX_CONTENT_LENGTH} 字符")
            content = content[:MAX_CONTENT_LENGTH] + "\n\n[内容已截断...]"
        return content.strip()

    async def _call_ai_analysis(
        self,
        content: str,
        course_title: str
    ) -> AIResponse:
        """Send the document content to the AI service for analysis."""
        # Build the chat messages from the prompt templates.
        user_message = USER_PROMPT.format(
            course_name=course_title,
            content=content
        )
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ]
        # Low temperature keeps the JSON output stable and parseable.
        response = await self.ai_service.chat(
            messages=messages,
            temperature=0.1,
            prompt_name="knowledge_analysis"
        )
        return response

    def _parse_knowledge_points(self, ai_output: str) -> List[Dict[str, Any]]:
        """
        Parse the AI's JSON output into a normalized list of knowledge points.

        Uses the LLM JSON parser's multi-layer fallbacks, then normalizes
        field names (English and Chinese aliases) and caps the count at
        MAX_KNOWLEDGE_POINTS. Returns an empty list when nothing parses.
        """
        # Strip markdown fences / chatter before parsing.
        cleaned_output, rules = clean_llm_output(ai_output)
        if rules:
            logger.debug(f"AI 输出已清洗: {rules}")
        # Parse with schema validation; falls back to the default on error.
        knowledge_points = parse_with_fallback(
            cleaned_output,
            schema=KNOWLEDGE_POINT_SCHEMA,
            default=[],
            validate_schema=True,
            on_error="default"
        )
        # Defensive: the fallback parser may yield a dict or scalar for
        # malformed output; only a list can be iterated as knowledge points.
        if not isinstance(knowledge_points, list):
            logger.warning(
                "AI output is not a list, ignoring: %s", type(knowledge_points)
            )
            knowledge_points = []
        # Post-process: make sure every point has the required fields.
        processed_points = []
        for i, kp in enumerate(knowledge_points):
            if i >= MAX_KNOWLEDGE_POINTS:
                logger.warning(f"知识点数量超过限制 {MAX_KNOWLEDGE_POINTS},截断")
                break
            if isinstance(kp, dict):
                # Accept several field-name aliases (English and Chinese).
                title = (
                    kp.get('title') or
                    kp.get('name') or
                    kp.get('知识点名称') or
                    f"知识点 {i + 1}"
                )
                content = (
                    kp.get('content') or
                    kp.get('description') or
                    kp.get('知识点描述') or
                    ''
                )
                kp_type = (
                    kp.get('type') or
                    kp.get('知识点类型') or
                    DEFAULT_KNOWLEDGE_TYPE
                )
                topic_relation = (
                    kp.get('topic_relation') or
                    kp.get('关系描述') or
                    ''
                )
                # Keep only points that carry some substance.
                if title and (content or topic_relation):
                    processed_points.append({
                        'title': title[:200],  # cap title length
                        'content': content,
                        'type': kp_type,
                        'topic_relation': topic_relation,
                    })
        if not processed_points:
            logger.warning("未能解析出有效的知识点")
        return processed_points

    async def _delete_old_knowledge_points(
        self,
        db: AsyncSession,
        material_id: int
    ) -> int:
        """Delete the knowledge points previously linked to a material."""
        try:
            from sqlalchemy import text
            result = await db.execute(
                text("DELETE FROM knowledge_points WHERE material_id = :material_id"),
                {"material_id": material_id}
            )
            await db.commit()
            deleted_count = result.rowcount
            if deleted_count > 0:
                logger.info(f"已删除旧知识点: material_id={material_id}, count={deleted_count}")
            return deleted_count
        except Exception as e:
            logger.error(f"删除旧知识点失败: {e}")
            await db.rollback()
            raise

    async def _save_knowledge_points(
        self,
        db: AsyncSession,
        course_id: int,
        material_id: int,
        knowledge_points: List[Dict[str, Any]],
        user_id: int
    ) -> int:
        """
        Save parsed knowledge points to the database.

        Failures on individual points are logged and skipped so one bad
        record does not abort the whole batch. Returns the saved count.
        """
        from app.services.course_service import knowledge_point_service
        saved_count = 0
        for kp_data in knowledge_points:
            try:
                kp_create = KnowledgePointCreate(
                    name=kp_data['title'],
                    description=kp_data.get('content', ''),
                    type=kp_data.get('type', DEFAULT_KNOWLEDGE_TYPE),
                    source=1,  # source 1 = produced by AI analysis
                    topic_relation=kp_data.get('topic_relation'),
                    material_id=material_id
                )
                await knowledge_point_service.create_knowledge_point(
                    db=db,
                    course_id=course_id,
                    point_in=kp_create,
                    created_by=user_id
                )
                saved_count += 1
            except Exception as e:
                logger.warning(
                    f"保存单个知识点失败: title={kp_data.get('title')}, error={e}"
                )
                continue
        return saved_count

    async def reanalyze_course_materials(
        self,
        db: AsyncSession,
        course_id: int,
        course_title: str,
        user_id: int
    ) -> Dict[str, Any]:
        """
        Re-analyze every material of a course.

        Args:
            db: Async database session.
            course_id: Course ID.
            course_title: Course title, passed to the AI prompt.
            user_id: ID of the user recorded as creator of the new points.

        Returns:
            Summary dict with counts and a per-material result list;
            per-material failures are recorded, not re-raised.

        Raises:
            ExternalServiceError: if the overall operation fails (e.g.
                listing the materials errors out).
        """
        try:
            from app.services.course_service import course_service
            # Fetch all materials attached to the course.
            materials = await course_service.get_course_materials(db, course_id=course_id)
            if not materials:
                return {
                    "success": True,
                    "message": "该课程暂无资料需要分析",
                    "materials_count": 0,
                    "knowledge_points_count": 0
                }
            total_knowledge_points = 0
            analysis_results = []
            for material in materials:
                try:
                    result = await self.analyze_course_material(
                        db=db,
                        course_id=course_id,
                        material_id=material.id,
                        file_url=material.file_url,
                        course_title=course_title,
                        user_id=user_id
                    )
                    kp_count = result.get('knowledge_points_count', 0)
                    total_knowledge_points += kp_count
                    analysis_results.append({
                        "material_id": material.id,
                        "material_name": material.name,
                        "success": True,
                        "knowledge_points_count": kp_count
                    })
                except Exception as e:
                    # One failing material must not abort the whole batch.
                    logger.error(
                        f"资料分析失败: material_id={material.id}, error={e}"
                    )
                    analysis_results.append({
                        "material_id": material.id,
                        "material_name": material.name,
                        "success": False,
                        "error": str(e)
                    })
            success_count = sum(1 for r in analysis_results if r['success'])
            logger.info(
                f"课程资料重新分析完成 - course_id: {course_id}, "
                f"materials: {len(materials)}, success: {success_count}, "
                f"total_knowledge_points: {total_knowledge_points}"
            )
            return {
                "success": True,
                "materials_count": len(materials),
                "success_count": success_count,
                "knowledge_points_count": total_knowledge_points,
                "analysis_results": analysis_results
            }
        except Exception as e:
            logger.error(
                f"课程资料重新分析失败 - course_id: {course_id}, error: {e}",
                exc_info=True
            )
            raise ExternalServiceError(f"重新分析失败: {e}")
# Module-level singleton instance shared by the API layer.
knowledge_analysis_service_v2 = KnowledgeAnalysisServiceV2()