#!/usr/bin/env python3
"""Download audio samples from the Yanji smart-badge open API.

Only recordings lasting at least 10 seconds are kept. Downloaded files are
stored in a "raw" folder and then copied into per-employee and per-duration
folders.
"""

import json
import os
import shutil
from pathlib import Path

import requests

# --- Configuration ---------------------------------------------------------

# NOTE(security): an API credential is committed in this file. It is kept as
# the fallback so existing runs behave the same, but it should be rotated and
# supplied through the YANJI_TOKEN environment variable instead.
TOKEN = os.environ.get("YANJI_TOKEN", "92866b34-ef6e-4290-8d87-b9c1bb4b92c6")
ESTATE_ID = 516799468310364162
BASE_URL = "https://open.yanjiai.com"

# Destination directory for all downloaded samples.
BASE_PATH = Path("/Users/nongjun/Desktop/Ai公司/本地开发与测试/考培练系统规划/全链路联调/言迹智能工牌/音频样本")

# Employees to download from (name -> phone), chosen for having many recordings.
EMPLOYEES = {
    "熊媱媱": "13708515779",
    "黄雪": "19192552551",
    "夏雨沫": "13698554507",
    "杨敏": "18188010718",
    "张永梅": "13608562128",
    "陈谊": "15329451271",
}

# Recordings shorter than this (milliseconds) are ignored.
MIN_DURATION_MS = 10_000
# Only the first N qualifying recordings of each employee are considered.
MAX_PER_EMPLOYEE = 3
# Timeout (seconds) applied to every HTTP request.
REQUEST_TIMEOUT = 30
# Chunk size (bytes) used when streaming an audio file to disk.
DOWNLOAD_CHUNK_SIZE = 64 * 1024


def _to_int(value, default=0):
    """Coerce *value* to ``int``; return *default* if it is missing or malformed."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _duration_category(duration_sec):
    """Return the name of the duration bucket folder for *duration_sec* seconds."""
    if duration_sec < 20:
        return "10-20秒"
    if duration_sec < 60:
        return "20-60秒"
    return "60秒以上"


def _copy_if_missing(src, dest_dir):
    """Copy *src* into *dest_dir* (created on demand) unless it already exists there."""
    dest_dir.mkdir(exist_ok=True)
    target = dest_dir / src.name
    if not target.exists():
        shutil.copy2(src, target)


def _count_mp3(directory):
    """Return how many ``*.mp3`` files sit directly inside *directory*."""
    return sum(1 for _ in directory.glob("*.mp3"))


def get_audio_list(phone):
    """Fetch the recording list of the employee identified by *phone*.

    Args:
        phone: The consultant's phone number, sent as ``consultantPhone``.

    Returns:
        The list found under ``data.records`` when the response ``code`` is
        ``"0"``; otherwise an empty list. Network failures and non-JSON
        responses are reported on stdout and also yield an empty list, so one
        failing employee does not abort the whole run.
    """
    url = f"{BASE_URL}/api/beauty/v1/audio/infos"
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "estateId": ESTATE_ID,
        "consultantPhone": phone,
    }
    try:
        response = requests.post(
            url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
        )
        result = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding errors across requests versions.
        print(f" 获取录音列表失败: {exc}")
        return []

    if not isinstance(result, dict):
        return []
    data = result.get("data")
    if result.get("code") == "0" and isinstance(data, dict):
        # "records" may be present but null; normalise to a list.
        return data.get("records") or []
    return []


def download_audio(url, output_path):
    """Download the audio file at *url* to *output_path*.

    The body is streamed into a sibling ``.part`` file that is renamed only
    after the transfer finished, so an interrupted download never leaves a
    truncated ``.mp3`` behind.

    Args:
        url: Source URL of the audio file.
        output_path: Destination file path (``str`` or ``Path``).

    Returns:
        ``True`` if the file was written, ``False`` on a non-200 response or
        on any network / filesystem error (the reason is printed).
    """
    output_path = Path(output_path)
    partial_path = output_path.with_name(output_path.name + ".part")
    try:
        with requests.get(url, timeout=REQUEST_TIMEOUT, stream=True) as response:
            if response.status_code != 200:
                print(f" 下载失败: HTTP {response.status_code}")
                return False
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                    if chunk:
                        f.write(chunk)
        partial_path.replace(output_path)
        return True
    except (requests.RequestException, OSError) as exc:
        print(f" 下载失败: {exc}")
        # Best-effort cleanup of the incomplete temp file.
        try:
            if partial_path.exists():
                partial_path.unlink()
        except OSError:
            pass
        return False


def _download_employee_samples(name, phone, raw_dir):
    """Download up to ``MAX_PER_EMPLOYEE`` qualifying recordings of one employee.

    Args:
        name: Employee display name, used as the file name prefix.
        phone: Employee phone number used to query the API.
        raw_dir: Directory that receives the downloaded files.

    Returns:
        A ``(downloaded_count, total_bytes)`` tuple, where ``total_bytes`` is
        the sum of the ``fileSize`` values reported by the API (presumably
        bytes, since it is displayed divided by 1024 as KB).
    """
    print(f"--- {name} ({phone}) ---")

    records = get_audio_list(phone)
    if not records:
        print(" 无录音记录\n")
        return 0, 0

    # Keep only recordings of at least MIN_DURATION_MS milliseconds.
    eligible = [
        r for r in records if _to_int(r.get("duration")) >= MIN_DURATION_MS
    ]
    print(f" 总录音: {len(records)}条, 10秒以上: {len(eligible)}条")
    if not eligible:
        print(" 无符合条件的录音\n")
        return 0, 0

    download_count = 0
    downloaded_bytes = 0
    for index, record in enumerate(eligible[:MAX_PER_EMPLOYEE], 1):
        file_url = record.get("fileUrl", "")
        if not file_url:
            continue

        duration_sec = _to_int(record.get("duration")) // 1000
        file_size = _to_int(record.get("fileSize"))
        start_time = record.get("startTime", "")

        # The first 10 characters of startTime serve as the date part; strip
        # path separators so the value can never escape raw_dir.
        if start_time:
            date_part = str(start_time)[:10].replace("/", "-").replace("\\", "-")
        else:
            date_part = "unknown"
        filename = f"{name}_{duration_sec}秒_{date_part}_{index}.mp3"

        # flush so the progress line is visible while the download runs.
        print(
            f" 下载 [{index}] {filename} ({file_size/1024:.1f}KB, {duration_sec}秒)...",
            end="",
            flush=True,
        )
        if download_audio(file_url, raw_dir / filename):
            print(" ✅")
            download_count += 1
            downloaded_bytes += file_size
        else:
            print(" ❌")

    print(f" 本员工下载: {download_count}条\n")
    return download_count, downloaded_bytes


def _organize_files(files, by_employee_dir, by_duration_dir):
    """Copy each raw file into its per-employee and per-duration folders.

    The employee name and the duration are parsed back from the file name
    (``<name>_<seconds>秒_...``); files that do not follow that pattern are
    skipped.
    """
    for file in files:
        parts = file.stem.split("_")
        if len(parts) < 2:
            continue
        employee_name, duration_label = parts[0], parts[1]

        _copy_if_missing(file, by_employee_dir / employee_name)

        try:
            duration_sec = int(duration_label.replace("秒", ""))
        except ValueError:
            # Not a "<n>秒" label: classify by employee only.
            print(f" 跳过时长分类(无法解析时长): {file.name}")
            continue
        _copy_if_missing(file, by_duration_dir / _duration_category(duration_sec))


def _print_tree(raw_dir, by_employee_dir, by_duration_dir):
    """Print a summary of the output folders with their mp3 file counts."""
    print("\n目录结构:")
    print("音频样本/")
    print(f"├── 原始文件/ ({_count_mp3(raw_dir)}个文件)")
    print("├── 按员工分类/")
    for emp_dir in sorted(by_employee_dir.iterdir()):
        if emp_dir.is_dir():
            count = _count_mp3(emp_dir)
            if count > 0:
                print(f"│ └── {emp_dir.name}/ ({count}个文件)")
    print("└── 按时长分类/")
    for dur_dir in sorted(by_duration_dir.iterdir()):
        if dur_dir.is_dir():
            count = _count_mp3(dur_dir)
            if count > 0:
                print(f" └── {dur_dir.name}/ ({count}个文件)")


def main():
    """Download samples for every configured employee, then sort them into folders."""
    print("=== 言迹音频样本下载工具 ===")
    print("筛选条件: 时长 ≥ 10秒\n")

    # Create the output directories.
    raw_dir = BASE_PATH / "原始文件"
    by_employee_dir = BASE_PATH / "按员工分类"
    by_duration_dir = BASE_PATH / "按时长分类"
    for dir_path in (raw_dir, by_employee_dir, by_duration_dir):
        dir_path.mkdir(parents=True, exist_ok=True)

    total_downloaded = 0
    total_size = 0

    # Download employee by employee.
    for name, phone in EMPLOYEES.items():
        count, size = _download_employee_samples(name, phone, raw_dir)
        total_downloaded += count
        total_size += size

    print("\n=== 下载完成 ===")
    print(f"总下载文件: {total_downloaded}个")
    print(f"总大小: {total_size/1024:.1f}KB ({total_size/1024/1024:.2f}MB)")

    # List everything present in the raw folder (including earlier runs).
    print("\n文件保存位置:")
    print(f" 原始文件: {raw_dir}")

    files = sorted(raw_dir.glob("*.mp3"))
    if files:
        print("\n已下载文件列表:")
        for i, file in enumerate(files, 1):
            size = file.stat().st_size / 1024
            print(f" {i:2d}. {file.name} ({size:.1f}KB)")

    # Copy the files into the classification folders.
    print("\n整理文件到分类文件夹...")
    _organize_files(files, by_employee_dir, by_duration_dir)
    print("✅ 文件整理完成")

    # Show the resulting directory structure.
    _print_tree(raw_dir, by_employee_dir, by_duration_dir)


if __name__ == "__main__":
    main()