| | """ |
| | LDF Model for Hugging Face Hub |
| | |
| | Usage: |
| | from transformers import AutoModel |
| | |
| | model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True) |
| | motion = model("a person walking forward", length=60) |
| | """ |
| |
|
| | import torch |
| | from transformers import PretrainedConfig, PreTrainedModel |
| | from typing import Union, List, Optional |
| | import os |
| | import sys |
| |
|
| |
|
class LDFConfig(PretrainedConfig):
    """Configuration for the LDF motion-generation model.

    Attributes:
        input_dim: Dimensionality of the conditioning input features
            (default 4).
        output_dim: Dimensionality of the generated motion features
            (default 263, the HumanML3D feature size).
    """

    model_type = "ldf_motion"

    def __init__(self, input_dim=4, output_dim=263, **kwargs):
        # Let the transformers base class consume its own kwargs first.
        super().__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim
| |
|
| |
|
class LDFModel(PreTrainedModel):
    """
    LDF Motion Generation Model

    This model generates motion sequences from text descriptions using Latent Diffusion Forcing.

    The heavy LDF and VAE sub-models are NOT built in ``__init__``; they are
    loaded lazily from the checkpoint directory on first use (see
    ``_load_models``), which requires loading via ``from_pretrained``.

    Example:
        >>> from transformers import AutoModel
        >>> model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
        >>> motion = model("a person walking forward", length=60)
        >>> print(motion.shape)  # (~240, 263)
    """

    config_class = LDFConfig

    def __init__(self, config):
        """Create an empty shell; the real sub-models are attached lazily."""
        super().__init__(config)
        self.config = config

        # Populated by _load_models() on first generation call.
        self.ldf_model = None
        self.vae = None
        self.model_dir = None  # checkpoint dir; needed later to locate ldf_utils

    def _load_models(self):
        """Load the actual LDF and VAE models (idempotent).

        Imports ``generate_ldf`` from the checkpoint directory and calls its
        ``load_model_from_config`` helper. That helper apparently reads its
        options from ``sys.argv`` — hence the temporary argv patching below.

        Raises:
            RuntimeError: if ``name_or_path`` was never set to an existing
                local directory (i.e. the model was not created via
                ``from_pretrained``).
        """
        if self.ldf_model is not None:
            return  # already loaded — nothing to do

        # name_or_path is assigned by our from_pretrained override and must be
        # a local directory (snapshot_download has already materialized it).
        if hasattr(self, 'name_or_path') and os.path.exists(self.name_or_path):
            model_dir = self.name_or_path
        else:
            raise RuntimeError(
                "Model directory not found. Please use from_pretrained() to load the model."
            )

        self.model_dir = model_dir

        # Make the python modules bundled with the checkpoint
        # (generate_ldf, ldf_utils, ...) importable.
        if model_dir not in sys.path:
            sys.path.insert(0, model_dir)

        import importlib
        generate_ldf = importlib.import_module('generate_ldf')
        load_model_from_config = generate_ldf.load_model_from_config

        # load_model_from_config() parses CLI-style flags, so point a fake
        # argv at the bundled ldf.yaml and restore the real argv afterwards.
        config_path = os.path.join(model_dir, "ldf.yaml")
        old_argv = sys.argv
        sys.argv = ['model', '--config', config_path]

        try:
            self.vae, self.ldf_model = load_model_from_config()

            # Move the sub-models onto this wrapper's device. The wrapper
            # itself may own no parameters, in which case fall back to
            # cuda-if-available, else cpu.
            device = next(self.parameters()).device if list(self.parameters()) else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            self.ldf_model = self.ldf_model.to(device)
            self.vae = self.vae.to(device)
        finally:
            sys.argv = old_argv  # always restore, even if loading failed

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """
        Load pretrained model

        Args:
            pretrained_model_name_or_path: Model name or path
            trust_remote_code: Must be True to load this custom model
            **kwargs: Additional arguments

        Returns:
            LDFModel instance
        """
        # Refuse to load without an explicit opt-in, mirroring the
        # transformers convention for custom remote code.
        if not kwargs.get('trust_remote_code', False):
            raise ValueError(
                "Loading this model requires trust_remote_code=True. "
                "Usage: AutoModel.from_pretrained(..., trust_remote_code=True)"
            )

        # If given a hub repo id rather than a local path, download the full
        # snapshot so the bundled python/yaml files are available on disk.
        if not os.path.exists(pretrained_model_name_or_path):
            from huggingface_hub import snapshot_download
            model_path = snapshot_download(repo_id=pretrained_model_name_or_path)
        else:
            model_path = pretrained_model_name_or_path

        config = LDFConfig.from_pretrained(model_path)

        # NOTE: deliberately bypasses PreTrainedModel.from_pretrained — no
        # state_dict is loaded here; weights come from load_model_from_config.
        model = cls(config)
        model.name_or_path = model_path  # consumed by _load_models()

        # Load eagerly so failures surface at load time, not first inference.
        model._load_models()

        return model

    def forward(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        **kwargs
    ):
        """
        Generate motion from text

        Args:
            text: Text description(s)
            length: Number of latent tokens (output frames ≈ length × 4)
            text_end: Transition points for multi-text
            num_denoise_steps: Number of denoising steps

        Returns:
            Generated motion sequence(s)
        """
        # NOTE(review): **kwargs (e.g. output_joints, smoothing_alpha) are
        # accepted here but NOT forwarded to __call__ — confirm this is
        # intentional; callers wanting those options must call the model
        # directly rather than via forward().
        return self.__call__(text, length, text_end, num_denoise_steps)

    @torch.no_grad()
    def __call__(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        output_joints: bool = False,
        smoothing_alpha: float = 1.0
    ):
        """
        Generate motion sequences

        NOTE(review): overriding __call__ on an nn.Module subclass replaces
        torch's dispatch, so forward hooks are bypassed — presumably
        intentional for this inference-only wrapper; confirm.

        Args:
            text: Text description
                - Single string: "walk" -> single sample
                - String list: ["walk", "run"] -> batch
                - Nested list: [["walk", "turn"], ["run", "jump"]] -> multi-text per sample
            length: Number of latent tokens (frames ≈ length × 4)
            text_end: Token positions for text switching
            num_denoise_steps: Number of denoising steps
            output_joints: If True, output 22×3 joint coordinates; if False (default), output 263-dim HumanML3D features
            smoothing_alpha: EMA smoothing factor for joint positions (0.0-1.0, default=1.0 no smoothing)
                - Only used when output_joints=True
                - Recommended: 0.5 for smoother animations

        Returns:
            numpy.ndarray or list of arrays
            - If output_joints=False: shape (frames, 263)
            - If output_joints=True: shape (frames, 22, 3)
        """
        # Ensure the LDF/VAE sub-models exist (no-op after the first call).
        self._load_models()

        # Batching is keyed off the TYPE of `length`: a scalar length means a
        # single sample (even if `text` is a list — that is the multi-text
        # single-sample case); a list of lengths means a batch.
        is_single = not isinstance(length, list)
        if is_single:
            text_batch = [text]
            length_batch = [length]
            text_end_batch = [text_end] if text_end is not None else None
        else:
            text_batch = text
            length_batch = length
            text_end_batch = text_end

        # Validate multi-text samples: one transition endpoint per segment.
        if text_end_batch is not None:
            for i, (txt, te) in enumerate(zip(text_batch, text_end_batch)):
                if isinstance(txt, list) and te is not None:
                    if len(txt) != len(te):
                        raise ValueError(
                            f"Batch {i}: text has {len(txt)} segments but text_end has {len(te)} endpoints. "
                            f"They must match! text={txt}, text_end={te}"
                        )

        batch_size = len(text_batch)

        # Assemble the conditioning dict expected by ldf_model.generate().
        x = {"feature_length": torch.tensor(length_batch), "text": text_batch}
        if text_end_batch is not None:
            x["feature_text_end"] = text_end_batch

        # Run diffusion in latent space; one latent sequence per batch item.
        output = self.ldf_model.generate(x, num_denoise_steps=num_denoise_steps)
        generated_batch = output["generated"]

        decoded_results = []
        joints_results = [] if output_joints else None

        # Joint recovery lives in the checkpoint's ldf_utils package; load it
        # by file path (it may not be importable as a top-level module).
        if output_joints:
            import importlib.util
            import numpy as np
            utils_spec = importlib.util.spec_from_file_location(
                "motion_process",
                os.path.join(self.model_dir, "ldf_utils", "motion_process.py")
            )
            motion_process_module = importlib.util.module_from_spec(utils_spec)
            utils_spec.loader.exec_module(motion_process_module)

        for i, generated in enumerate(generated_batch):
            if generated is not None and torch.is_tensor(generated):
                # VAE-decode latents -> per-frame 263-dim HumanML3D features.
                decoded_g = self.vae.decode(generated[None, :])[0]

                if output_joints:
                    # Convert features to 22x3 joint positions, frame by
                    # frame, with optional EMA smoothing; a fresh recovery
                    # object per sample keeps the stream state independent.
                    decoded_np = decoded_g.cpu().numpy()
                    recovery = motion_process_module.StreamJointRecovery263(
                        joints_num=22, smoothing_alpha=smoothing_alpha
                    )
                    joints = [recovery.process_frame(frame) for frame in decoded_np]
                    joints = np.array(joints)
                    joints_results.append(joints)
                else:
                    decoded_results.append(decoded_g.cpu().numpy())
            else:
                # Failed/empty generation: keep batch alignment with None.
                if output_joints:
                    joints_results.append(None)
                else:
                    decoded_results.append(None)

        # Unwrap the singleton batch for scalar-length calls.
        if output_joints:
            return joints_results[0] if is_single else joints_results
        else:
            return decoded_results[0] if is_single else decoded_results

    def generate(self, *args, **kwargs):
        """Alias for __call__ to match transformers API"""
        return self.__call__(*args, **kwargs)
| |
|
| |
|
| | |
# Backwards-compatible alias so pipeline-style imports keep working.
LDFPipeline = LDFModel


# Register with the Auto* factories so AutoModel.from_pretrained() can map
# model_type "ldf_motion" to this class. Registration is best-effort: it may
# legitimately fail (e.g. the type is already registered on re-import), and
# that must not break importing this module.
try:
    from transformers import AutoModel, AutoConfig
    AutoConfig.register("ldf_motion", LDFConfig)
    AutoModel.register(LDFConfig, LDFModel)
except Exception:
    # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate instead of being silently swallowed.
    pass
| |
|