Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Iterable, List, Tuple | |
| def read_hf_token(token: str | None = None, key_path: str | Path = "hugging_face_key.txt") -> str: | |
| """Ưu tiên token truyền vào, nếu không thì đọc từ biến môi trường hoặc file.""" | |
| candidates = [ | |
| token, | |
| os.getenv("HF_TOKEN"), # HF Spaces standard secret name | |
| os.getenv("HUGGINGFACE_TOKEN"), | |
| os.getenv("HUGGINGFACE_ACCESS_TOKEN"), | |
| ] | |
| for value in candidates: | |
| if value and value.strip(): | |
| return value.strip() | |
| path = Path(key_path) | |
| if not path.exists(): | |
| raise FileNotFoundError( | |
| f"Không tìm thấy token. Truyền biến --hf-token hoặc đặt file {path}." | |
| ) | |
| content = path.read_text(encoding="utf-8").strip() | |
| if not content: | |
| raise ValueError(f"File {path} trống, hãy dán Hugging Face access token vào.") | |
| return content | |
| def ensure_audio_path(audio_path: str | Path) -> Path: | |
| """Kiểm tra đường dẫn audio hợp lệ.""" | |
| path = Path(audio_path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Không tìm thấy file âm thanh: {path}") | |
| if not path.is_file(): | |
| raise ValueError(f"Đường dẫn không phải file: {path}") | |
| return path | |
| def export_segments_json(segments: Iterable[dict], output_path: str | Path) -> Path: | |
| """Lưu danh sách segment thành JSON.""" | |
| path = Path(output_path) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| data: List[dict] = list(segments) | |
| path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") | |
| return path | |
| def seconds_to_mmss(seconds: float) -> str: | |
| total_seconds = int(round(seconds)) | |
| minutes, sec = divmod(total_seconds, 60) | |
| return f"{minutes:02d}:{sec:02d}" | |
| def format_segments_table(segments: Iterable[dict]) -> str: | |
| """Trả về chuỗi bảng đơn giản để in ra terminal.""" | |
| lines = [] | |
| for idx, seg in enumerate(segments, start=1): | |
| start = seg.get("start", 0.0) | |
| end = seg.get("end", 0.0) | |
| speaker = seg.get("speaker", "unknown") | |
| lines.append( | |
| f"{idx:02d} | {seconds_to_mmss(start)} -> {seconds_to_mmss(end)} | speaker {speaker}" | |
| ) | |
| return "\n".join(lines) | |
| def merge_adjacent_segments( | |
| segments: list[dict], | |
| max_gap: float = 0.5, | |
| min_duration: float = 1.0, | |
| ) -> list[dict]: | |
| """ | |
| Ghép các đoạn liên tiếp cùng speaker nếu khoảng trống <= max_gap (giây). | |
| Đồng thời lọc bỏ đoạn quá ngắn (< min_duration). | |
| """ | |
| if not segments: | |
| return [] | |
| merged: list[dict] = [] | |
| # đảm bảo sắp xếp theo thời gian | |
| segs = sorted(segments, key=lambda s: s.get("start", 0.0)) | |
| current = segs[0].copy() | |
| for seg in segs[1:]: | |
| if ( | |
| seg.get("speaker") == current.get("speaker") | |
| and seg.get("start", 0.0) - current.get("end", 0.0) <= max_gap | |
| ): | |
| current["end"] = max(current.get("end", 0.0), seg.get("end", 0.0)) | |
| else: | |
| if current.get("end", 0.0) - current.get("start", 0.0) >= min_duration: | |
| merged.append(current) | |
| current = seg.copy() | |
| if current.get("end", 0.0) - current.get("start", 0.0) >= min_duration: | |
| merged.append(current) | |
| return merged | |
| def convert_to_wav_16k(audio_path: Path) -> Tuple[Path, Path | None]: | |
| """ | |
| Chuyển audio về WAV mono 16 kHz bằng ffmpeg. | |
| Trả về (đường dẫn dùng để suy luận, thư mục tạm để dọn dẹp hoặc None nếu không cần). | |
| """ | |
| safe_stem = audio_path.stem.replace(" ", "_") | |
| tmpdir = Path(tempfile.mkdtemp(prefix="diarization_audio_")) | |
| if not shutil.which("ffmpeg"): | |
| # Sao chép tệp hiện có vào thư mục tạm với tên không dấu cách | |
| output = tmpdir / audio_path.name.replace(" ", "_") | |
| shutil.copy2(audio_path, output) | |
| return output, tmpdir | |
| output = tmpdir / f"{safe_stem}_16k.wav" | |
| cmd = [ | |
| "ffmpeg", | |
| "-y", | |
| "-i", | |
| str(audio_path), | |
| "-ac", | |
| "1", | |
| "-ar", | |
| "16000", | |
| "-vn", | |
| "-f", | |
| "wav", | |
| str(output), | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| stderr = result.stderr.strip() | |
| raise RuntimeError(f"ffmpeg convert thất bại: {stderr}") | |
| if not output.exists(): | |
| raise RuntimeError("ffmpeg không tạo được file WAV.") | |
| return output, tmpdir | |
| def download_audio_from_url(url: str) -> Tuple[Path, Path]: | |
| """ | |
| Tải audio từ YouTube/TikTok/... dùng yt-dlp, xuất WAV để xử lý tiếp. | |
| Trả về (đường dẫn file wav, thư mục tạm chứa file). | |
| """ | |
| if not shutil.which("yt-dlp"): | |
| raise RuntimeError("Cần cài yt-dlp để tải liên kết (pip install yt-dlp).") | |
| if not shutil.which("ffmpeg"): | |
| raise RuntimeError("Cần cài ffmpeg để chuyển đổi audio.") | |
| tmpdir = Path(tempfile.mkdtemp(prefix="download_media_")) | |
| out_tmpl = tmpdir / "%(title)s.%(ext)s" | |
| cmd = [ | |
| "yt-dlp", | |
| "-x", | |
| "--audio-format", | |
| "wav", | |
| "--audio-quality", | |
| "0", | |
| "-o", | |
| str(out_tmpl), | |
| url, | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| stderr = result.stderr.strip() | |
| # Check if it's a network error | |
| if "Failed to resolve" in stderr or "No address associated with hostname" in stderr: | |
| raise RuntimeError( | |
| "Không thể kết nối internet để tải video. " | |
| "Hugging Face Spaces (free tier) không hỗ trợ download từ URL. " | |
| "Vui lòng tải file audio trực tiếp thay vì dùng URL." | |
| ) | |
| raise RuntimeError(f"Tải audio thất bại: {stderr}") | |
| wav_files = list(tmpdir.glob("*.wav")) | |
| if not wav_files: | |
| raise RuntimeError("Không tìm thấy file WAV sau khi tải.") | |
| return wav_files[0], tmpdir | |