| | import gc |
| | import traceback |
| | from typing import Optional |
| |
|
| | import torch |
| | import ftfy |
| | import sentencepiece |
| | from transformers import AutoModelForCausalLM, AutoTokenizer |
| |
|
| |
|
class TextProcessor:
    """
    Semantic expansion engine using Qwen2.5-0.5B.
    Transforms user inputs into motion-rich prompts for video generation.
    """

    # Hugging Face model id used for both tokenizer and model.
    MODEL_ID = "Qwen/Qwen2.5-0.5B-Instruct"
    # max_new_tokens budget for generation.
    MAX_OUTPUT_LENGTH = 100
    # Hard cap (in words) applied to the expanded prompt after decoding.
    MAX_WORDS = 50

    def __init__(self, resource_manager: Optional[object] = None):
        """
        Initialize TextProcessor with optional resource management.

        Args:
            resource_manager: Optional resource manager instance. When given,
                it supplies the device and handles model registration and
                cache clearing; otherwise CUDA availability is probed directly.
        """
        self.resource_manager = resource_manager

        if resource_manager is not None:
            self.device = resource_manager.get_device()
        else:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Lazily populated by load_model(); None while unloaded.
        self.model: Optional[AutoModelForCausalLM] = None
        self.tokenizer: Optional[AutoTokenizer] = None
        self.is_loaded = False

    def load_model(self) -> None:
        """Load Qwen model and tokenizer. Idempotent: a second call is a no-op.

        Raises:
            Exception: re-raises whatever `from_pretrained` fails with
                (network, disk, or incompatible-weights errors).
        """
        if self.is_loaded:
            print("⚠ TextProcessor already loaded, skipping...")
            return

        try:
            print("→ Loading Qwen2.5-0.5B-Instruct...")

            self.tokenizer = AutoTokenizer.from_pretrained(
                self.MODEL_ID,
                trust_remote_code=True
            )

            # device_map="auto" lets accelerate place the weights; do NOT
            # assume they end up on self.device (see expand_prompt).
            self.model = AutoModelForCausalLM.from_pretrained(
                self.MODEL_ID,
                torch_dtype=torch.bfloat16,
                device_map="auto",
                trust_remote_code=True
            )

            if self.resource_manager is not None:
                self.resource_manager.register_model("TextProcessor", self.model)

            self.is_loaded = True
            print("✓ TextProcessor loaded successfully")

        except Exception as e:
            print(f"✗ Error loading TextProcessor: {str(e)}")
            raise

    def unload_model(self) -> None:
        """Unload model and free GPU memory. No-op when nothing is loaded.

        Best-effort: failures are reported but never raised, so teardown
        can always proceed.
        """
        if not self.is_loaded:
            return

        try:
            # Dropping the references is sufficient; calling .to('cpu') on a
            # device_map="auto" dispatched model can fail on accelerate hooks
            # and is pointless right before release.
            self.model = None
            self.tokenizer = None

            if self.resource_manager is not None:
                self.resource_manager.unregister_model("TextProcessor")
                self.resource_manager.clear_cache(aggressive=True)
            else:
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

            self.is_loaded = False
            print("✓ TextProcessor unloaded")

        except Exception as e:
            print(f"⚠ Error during TextProcessor unload: {str(e)}")

    def expand_prompt(self, user_input: str) -> str:
        """
        Convert user's brief instruction into detailed motion description.

        Args:
            user_input: User's original instruction

        Returns:
            str: Expanded prompt for video generation (≤MAX_WORDS words).
                 On any generation failure, falls back to the raw input.

        Raises:
            RuntimeError: if called before load_model().
        """
        if not self.is_loaded:
            raise RuntimeError("TextProcessor not loaded. Call load_model() first.")

        system_prompt = """You are a motion description expert. Convert the user's brief instruction into a detailed, dynamic prompt for video generation.

Focus on:
- Camera movements (pan, zoom, tilt, tracking)
- Subject actions and motions
- Scene dynamics and atmosphere
- Temporal flow and transitions

Keep output under 50 words. Use vivid, cinematic language. English only."""

        try:
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_input}
            ]

            text = self.tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True
            )

            # Move inputs to wherever accelerate actually placed the model —
            # self.device may disagree with device_map="auto" placement.
            model_inputs = self.tokenizer([text], return_tensors="pt").to(
                self.model.device
            )

            with torch.no_grad():
                generated_ids = self.model.generate(
                    **model_inputs,
                    max_new_tokens=self.MAX_OUTPUT_LENGTH,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    repetition_penalty=1.1
                )

            # Strip the echoed prompt tokens, keep only the continuation.
            generated_ids = [
                output_ids[len(input_ids):]
                for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
            ]

            expanded_prompt = self.tokenizer.batch_decode(
                generated_ids,
                skip_special_tokens=True
            )[0].strip()

            # Enforce the word budget even if the model overshoots.
            words = expanded_prompt.split()
            if len(words) > self.MAX_WORDS:
                expanded_prompt = " ".join(words[:self.MAX_WORDS]) + "..."

            print(f"✓ Prompt expanded: '{user_input}' → '{expanded_prompt}'")
            return expanded_prompt

        except Exception as e:
            # Deliberate best-effort: a failed expansion must not kill the
            # pipeline, so degrade to the user's original text.
            print(f"✗ Error during prompt expansion: {str(e)}")
            return user_input

    def process(self, user_input: str, auto_unload: bool = True) -> str:
        """
        Main processing pipeline: load → expand → (optionally unload).

        Args:
            user_input: User's instruction
            auto_unload: Whether to unload model after processing

        Returns:
            str: Expanded prompt, or the raw input on any pipeline error.
        """
        try:
            if not self.is_loaded:
                self.load_model()

            expanded = self.expand_prompt(user_input)

            if auto_unload:
                self.unload_model()

            return expanded

        except Exception as e:
            print(f"✗ TextProcessor pipeline error: {str(e)}")
            return user_input
|