- 从服务器拉取完整代码 - 按框架规范整理项目结构 - 配置 Drone CI 测试环境部署 - 包含后端(FastAPI)、前端(Vue3)、管理端 技术栈: Vue3 + TypeScript + FastAPI + MySQL
215 lines
7.0 KiB
Python
215 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
下载言迹智能工牌音频样本
|
|
只保留10秒以上的录音
|
|
"""
|
|
|
|
import json
|
|
import requests
|
|
import os
|
|
from pathlib import Path
|
|
|
|
# Configuration.
# NOTE(review): the token literal below is a credential committed to source
# control. Prefer supplying it through the YANJI_TOKEN environment variable
# and rotating the embedded value; the literal is kept only as a fallback so
# existing invocations keep working unchanged.
TOKEN = os.environ.get("YANJI_TOKEN") or "92866b34-ef6e-4290-8d87-b9c1bb4b92c6"
ESTATE_ID = 516799468310364162
BASE_URL = "https://open.yanjiai.com"

# Target directory for the downloaded samples. The default is an absolute
# path on one developer machine; set YANJI_AUDIO_DIR to use another location.
BASE_PATH = Path(
    os.environ.get("YANJI_AUDIO_DIR")
    or "/Users/nongjun/Desktop/Ai公司/本地开发与测试/考培练系统规划/全链路联调/言迹智能工牌/音频样本"
)

# Employees to download recordings for (chosen because they have more
# recordings), mapped as employee name -> phone number sent to the API.
EMPLOYEES = {
    "熊媱媱": "13708515779",
    "黄雪": "19192552551",
    "夏雨沫": "13698554507",
    "杨敏": "18188010718",
    "张永梅": "13608562128",
    "陈谊": "15329451271",
}
|
|
|
|
def get_audio_list(phone):
    """Fetch the recording list for one employee.

    POSTs ``estateId`` and ``consultantPhone`` as JSON to the
    ``/api/beauty/v1/audio/infos`` endpoint, authenticated with a bearer
    token.

    Args:
        phone: Value sent as ``consultantPhone``.

    Returns:
        The ``data.records`` list from the response, or ``[]`` when the
        request fails, the body is not valid JSON, the response ``code`` is
        not ``"0"``, or no records are present.
    """
    url = f"{BASE_URL}/api/beauty/v1/audio/infos"
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "estateId": ESTATE_ID,
        "consultantPhone": phone,
    }

    try:
        # Without an explicit timeout, requests can block indefinitely on a
        # stalled connection; 30s matches the timeout used for downloads.
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on older requests versions.
        print(f" 获取录音列表失败: {e}")
        return []

    if not isinstance(result, dict):
        return []

    # NOTE(review): success is detected by comparing against the string "0";
    # confirm the API never returns the integer 0 here.
    data = result.get("data")
    if result.get("code") == "0" and isinstance(data, dict):
        # `or []` also normalises an explicit `"records": null`.
        return data.get("records") or []
    return []
|
|
|
|
def download_audio(url, output_path):
    """Download one audio file to ``output_path``.

    The body is streamed in chunks to a sibling ``<output_path>.part`` file
    and moved into place only after the whole transfer succeeded, so a failed
    download never leaves a truncated file at ``output_path`` and never
    clobbers an earlier good copy.

    Args:
        url: Address of the audio file.
        output_path: Destination path (``str`` or ``pathlib.Path``).

    Returns:
        ``True`` when the file was saved; ``False`` on a non-200 response or
        on a network / filesystem error (the reason is printed).
    """
    partial_path = f"{output_path}.part"
    try:
        # stream=True avoids holding the whole recording in memory; the
        # context manager releases the connection on every exit path.
        with requests.get(url, timeout=30, stream=True) as response:
            if response.status_code != 200:
                print(f" 下载失败: HTTP {response.status_code}")
                return False
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        # Atomic on the same filesystem: readers see either the old file or
        # the complete new one.
        os.replace(partial_path, output_path)
        return True
    except (requests.RequestException, OSError) as e:
        print(f" 下载失败: {e}")
        try:
            os.remove(partial_path)
        except OSError:
            # Nothing to clean up (the partial file was never created).
            pass
    return False
|
|
|
|
def _to_int(value, default=0):
    """Coerce an API field to ``int``; return ``default`` for ``None`` or non-numeric values."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _duration_category(duration_sec):
    """Map a duration in seconds to the name of its duration-bucket folder."""
    if duration_sec < 20:
        return "10-20秒"
    if duration_sec < 60:
        return "20-60秒"
    return "60秒以上"


def _copy_into(source_file, target_dir):
    """Copy ``source_file`` into ``target_dir`` unless a same-named file already exists there."""
    import shutil

    try:
        target_dir.mkdir(exist_ok=True)
        target_file = target_dir / source_file.name
        if not target_file.exists():
            shutil.copy2(source_file, target_file)
    except OSError as e:
        # Report instead of silently swallowing; one bad copy should not
        # abort the rest of the organising pass.
        print(f" 复制失败: {source_file.name} -> {target_dir.name} ({e})")


def _count_mp3(directory):
    """Return the number of ``*.mp3`` files directly inside ``directory``."""
    return sum(1 for _ in directory.glob('*.mp3'))


def _print_mp3_subdirs(parent_dir, prefix):
    """Print one tree line per sub-directory of ``parent_dir`` that holds at least one mp3."""
    for sub_dir in sorted(parent_dir.iterdir()):
        if not sub_dir.is_dir():
            continue
        count = _count_mp3(sub_dir)
        if count > 0:
            print(f"{prefix}{sub_dir.name}/ ({count}个文件)")


def main():
    """Download sample recordings for every configured employee and sort them into folders.

    For each entry in ``EMPLOYEES`` the recording list is fetched, recordings
    whose ``duration`` (treated as milliseconds) is at least 10000 are kept,
    and the first three of those are downloaded into ``原始文件``. Afterwards
    every mp3 in that folder is copied into a per-employee folder and a
    per-duration-bucket folder, and a summary tree is printed.
    """
    print("=== 言迹音频样本下载工具 ===")
    print("筛选条件: 时长 ≥ 10秒\n")

    # Create the output directories.
    raw_dir = BASE_PATH / "原始文件"
    by_employee_dir = BASE_PATH / "按员工分类"
    by_duration_dir = BASE_PATH / "按时长分类"
    for dir_path in (raw_dir, by_employee_dir, by_duration_dir):
        dir_path.mkdir(parents=True, exist_ok=True)

    total_downloaded = 0
    total_size = 0

    # Download per employee.
    for name, phone in EMPLOYEES.items():
        print(f"--- {name} ({phone}) ---")

        records = get_audio_list(phone)
        if not records:
            print(" 无录音记录\n")
            continue

        # Keep recordings of 10 seconds or longer; a missing or malformed
        # duration counts as 0 and is therefore filtered out.
        filtered_records = [
            r for r in records if _to_int(r.get('duration')) >= 10000
        ]
        print(f" 总录音: {len(records)}条, 10秒以上: {len(filtered_records)}条")
        if not filtered_records:
            print(" 无符合条件的录音\n")
            continue

        # Download the first three matches. The index is part of the file
        # name, so it advances even for records skipped for lacking a URL.
        download_count = 0
        for index, record in enumerate(filtered_records[:3], 1):
            file_url = record.get('fileUrl', '')
            if not file_url:
                continue

            duration_sec = _to_int(record.get('duration')) // 1000
            # Size as reported by the API, not measured on disk.
            file_size = _to_int(record.get('fileSize'))
            start_time = record.get('startTime', '')

            # First 10 characters of the start time serve as the date part;
            # '/' is replaced so it cannot be read as a path separator.
            if start_time:
                date_part = str(start_time)[:10].replace('/', '-')
            else:
                date_part = 'unknown'
            filename = f"{name}_{duration_sec}秒_{date_part}_{index}.mp3"

            print(f" 下载 [{index}] {filename} ({file_size/1024:.1f}KB, {duration_sec}秒)...", end="")

            # Category folders are created by the organising pass below,
            # which processes every mp3 in raw_dir including this one.
            if download_audio(file_url, raw_dir / filename):
                print(" ✅")
                download_count += 1
                total_downloaded += 1
                total_size += file_size
            else:
                print(" ❌")

        print(f" 本员工下载: {download_count}条\n")

    print("\n=== 下载完成 ===")
    print(f"总下载文件: {total_downloaded}个")
    print(f"总大小: {total_size/1024:.1f}KB ({total_size/1024/1024:.2f}MB)")

    # File statistics.
    print("\n文件保存位置:")
    print(f" 原始文件: {raw_dir}")

    files = sorted(raw_dir.glob("*.mp3"))
    if files:
        print("\n已下载文件列表:")
        for position, file in enumerate(files, 1):
            size = file.stat().st_size / 1024
            print(f" {position:2d}. {file.name} ({size:.1f}KB)")

        # Copy files into the classification folders.
        print("\n整理文件到分类文件夹...")
        for file in files:
            # File names look like <employee>_<N>秒_<date>_<index>.mp3.
            parts = file.stem.split('_')
            if len(parts) < 2:
                continue
            employee_name, duration_str = parts[0], parts[1]

            # By employee.
            _copy_into(file, by_employee_dir / employee_name)

            # By duration bucket.
            try:
                duration_sec = int(duration_str.replace('秒', ''))
            except ValueError:
                print(f" 跳过时长分类(无法解析时长): {file.name}")
                continue
            _copy_into(file, by_duration_dir / _duration_category(duration_sec))

        print("✅ 文件整理完成")

    # Show the resulting directory structure.
    print("\n目录结构:")
    print("音频样本/")
    print(f"├── 原始文件/ ({_count_mp3(raw_dir)}个文件)")
    print("├── 按员工分类/")
    _print_mp3_subdirs(by_employee_dir, "│ └── ")
    print("└── 按时长分类/")
    _print_mp3_subdirs(by_duration_dir, " └── ")
|
|
|
|
# Run the downloader only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|