| |
| """ |
| macOS install: pip3 install torch==2.1.1 torchvision torchaudio transformers==4.48.0 accelerate==0.28.0 (you must use these exact versions; newer versions have a numerical-instability bug on MPS chips). |
| Interactive model evaluation script for pretraining experiments. |
| Automatically discovers and loads all models with /hf subdirectories. |
| """ |
|
|
| import os |
| import glob |
| from pathlib import Path |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline |
| import torch |
| import warnings |
|
|
| |
# Ignore every warning for the whole process (no category or module filter is
# given) -- presumably to keep the interactive output readable.
warnings.filterwarnings("ignore")


# Optional substring filter for model discovery: when this is not None,
# ModelEvaluator.discover_models() keeps only directories whose name contains it.
MODEL_NAME_FILTER = None
|
|
class ModelEvaluator:
    """Load local Hugging Face checkpoints and compare their generations.

    Every directory in the current working directory that contains an ``hf``
    subdirectory is treated as one model.  Each loaded model gets a
    ``text-generation`` pipeline.  Models whose directory name contains
    ``chat`` or ``sft`` (case-insensitive) are prompted with a chat-style
    message list; all other models receive the raw prompt string.
    """

    # Case-insensitive substrings of a directory name that mark a chat model.
    _CHAT_MARKERS = ("chat", "sft")

    # Returned when a pipeline produces no usable output.
    _NO_RESPONSE = "No response generated"

    # Shown when the loop starts and again on the 'help' command.
    _HELP_TEXT = "\n".join((
        "",
        "Interactive Evaluation Mode",
        "Commands:",
        " - Type your prompt to evaluate all models",
        " - Type 'quit', 'exit' or 'q' to end",
        " - Type 'help' for this message",
        " - Type 'models' to list loaded models",
        " - Type 'clear' to clear screen",
        "",
        "Note: Models with 'chat' or 'sft' in their name use conversational pipeline,",
        " other models use text-generation pipeline.",
    ))

    def __init__(self):
        # All three dicts are keyed by model directory name.
        self.models = {}
        self.tokenizers = {}
        self.pipelines = {}
        # Names of successfully loaded models, in load order.
        self.model_names = []

    @classmethod
    def _is_chat_model(cls, model_name):
        """Return True when *model_name* contains one of the chat markers."""
        lowered = model_name.lower()
        return any(marker in lowered for marker in cls._CHAT_MARKERS)

    @staticmethod
    def _select_device_and_dtype():
        """Pick the best available device and a dtype that works on it.

        Returns:
            tuple: ``("cuda:0", torch.float16)``, ``("mps", torch.float16)``
            or, when neither accelerator is available,
            ``("cpu", torch.float32)``.
        """
        if torch.cuda.is_available():
            return "cuda:0", torch.float16
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            return "mps", torch.float16
        # Half precision is only partially supported by CPU kernels, so the
        # CPU fallback uses full precision.
        return "cpu", torch.float32

    def discover_models(self):
        """Find model directories in the current working directory.

        A directory qualifies when it contains an ``hf`` subdirectory and,
        if the module-level ``MODEL_NAME_FILTER`` is not ``None``, its name
        contains that filter string.

        Returns:
            list[str]: Sorted names of the qualifying directories.  The list
            is empty (and therefore falsy) when nothing is found.
        """
        print("Discovering models with /hf subdirectories...")

        hf_dirs = sorted(
            entry
            for entry in os.listdir('.')
            if os.path.isdir(os.path.join(entry, 'hf'))
            and (MODEL_NAME_FILTER is None or MODEL_NAME_FILTER in entry)
        )

        if not hf_dirs:
            print("No models with /hf subdirectories found!")
            return []

        print(f"Found {len(hf_dirs)} models:")
        for model_dir in hf_dirs:
            print(f" - {model_dir}")
        return hf_dirs

    def load_model(self, model_dir):
        """Load the model, tokenizer and pipeline stored in ``<model_dir>/hf``.

        On success the objects are registered under *model_dir* in
        ``self.models``, ``self.tokenizers`` and ``self.pipelines``, and the
        name is appended to ``self.model_names``.

        Args:
            model_dir: Directory name that contains the ``hf`` checkpoint.

        Returns:
            bool: True when loading succeeded; False when any step raised
            (the error is printed, not propagated).
        """
        try:
            hf_path = os.path.join(model_dir, 'hf')
            print(f"Loading {model_dir}...")

            tokenizer = AutoTokenizer.from_pretrained(hf_path)
            if tokenizer.pad_token is None:
                # Generation needs a pad token; reuse EOS when none is defined.
                tokenizer.pad_token = tokenizer.eos_token

            device, dtype = self._select_device_and_dtype()

            # NOTE(review): trust_remote_code=True executes Python shipped
            # with the checkpoint -- only point this at checkpoints you trust.
            model = AutoModelForCausalLM.from_pretrained(
                hf_path,
                device_map=None,
                torch_dtype=dtype,
                trust_remote_code=True,
            )
            model.to(device)

            # Chat and base models use the same pipeline type; they differ
            # only in how generate_response() builds the input.
            pipe = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                device_map="auto",
                torch_dtype=dtype,
            )
            if self._is_chat_model(model_dir):
                print(" Using conversational pipeline for chat model")
            else:
                print(" Using text-generation pipeline")

            self.models[model_dir] = model
            self.tokenizers[model_dir] = tokenizer
            self.pipelines[model_dir] = pipe
            if model_dir not in self.model_names:
                # Avoid listing (and evaluating) a reloaded model twice.
                self.model_names.append(model_dir)

            print(f" {model_dir} loaded successfully")
            return True

        except Exception as e:
            # Best effort: one broken checkpoint must not stop the others.
            print(f" Failed to load {model_dir}: {e}")
            return False

    def load_all_models(self):
        """Discover and load every model.

        Returns:
            bool: True when at least one model loaded successfully.
        """
        hf_dirs = self.discover_models()
        if not hf_dirs:
            return False

        print("\nLoading models...")
        successful_loads = sum(
            1 for model_dir in hf_dirs if self.load_model(model_dir)
        )

        print(f"\nLoaded {successful_loads}/{len(hf_dirs)} models successfully")
        return successful_loads > 0

    def _extract_chat_reply(self, conversation):
        """Return the assistant's text from a chat pipeline result.

        A list with more than one message yields the content of the last
        ``assistant`` message (or of the last message when there is none);
        anything else is returned as ``str(conversation)``.
        """
        if not (isinstance(conversation, list) and len(conversation) > 1):
            return str(conversation)
        for message in reversed(conversation):
            if message.get('role') == 'assistant':
                return message.get('content', self._NO_RESPONSE)
        return conversation[-1].get('content', self._NO_RESPONSE)

    def generate_response(self, model_name, prompt, max_length=256):
        """Generate one sampled completion of *prompt* with *model_name*.

        Args:
            model_name: Key of a loaded model (its directory name).
            prompt: User text.  Chat models receive it as a single ``user``
                message; other models receive it verbatim.
            max_length: Maximum number of newly generated tokens (passed to
                the pipeline as ``max_new_tokens``).

        Returns:
            str: The generated text, ``"No response generated"`` when the
            pipeline returns nothing, or a ``"Generation failed: ..."``
            message when anything raises (including an unknown model name).
        """
        try:
            pipe = self.pipelines[model_name]
            gen_kwargs = {
                "max_new_tokens": max_length,
                "do_sample": True,
                "temperature": 0.7,
                "top_p": 0.9,
                "pad_token_id": self.tokenizers[model_name].eos_token_id,
            }

            if self._is_chat_model(model_name):
                chat_input = [{"role": "user", "content": prompt}]
                outputs = pipe(chat_input, repetition_penalty=1.1, **gen_kwargs)
                if not outputs:
                    return self._NO_RESPONSE
                return self._extract_chat_reply(outputs[0]['generated_text'])

            outputs = pipe(prompt, return_full_text=False, **gen_kwargs)
            if not outputs:
                return self._NO_RESPONSE
            return outputs[0]['generated_text']

        except Exception as e:
            # Report the failure as text so a comparison run keeps going.
            return f"Generation failed: {e}"

    def evaluate_prompt(self, prompt):
        """Print the response of every loaded model to *prompt*."""
        print(f"\nEvaluating prompt: '{prompt}'")
        print("=" * 80)

        for model_name in self.model_names:
            print(f"\n{model_name}:")
            print("-" * 40)
            print(self.generate_response(model_name, prompt))

        print("\n" + "=" * 80)

    def interactive_loop(self):
        """Read prompts or commands from stdin until the user quits.

        Commands (case-insensitive): ``quit``/``exit``/``q``, ``help``,
        ``models`` and ``clear``; any other non-empty input is evaluated as
        a prompt on all loaded models.  Ctrl-C and end-of-input (Ctrl-D or
        a closed stdin) both end the loop.
        """
        print(self._HELP_TEXT)

        while True:
            try:
                user_input = input("\nEnter prompt (or command): ").strip()
                if not user_input:
                    continue

                command = user_input.lower()
                if command in ('quit', 'exit', 'q'):
                    print("Goodbye!")
                    break
                elif command == 'help':
                    print(self._HELP_TEXT)
                elif command == 'models':
                    print(f"\nLoaded models ({len(self.model_names)}):")
                    for i, model_name in enumerate(self.model_names, 1):
                        print(f" {i}. {model_name}")
                elif command == 'clear':
                    os.system('clear' if os.name == 'posix' else 'cls')
                else:
                    self.evaluate_prompt(user_input)

            except (KeyboardInterrupt, EOFError):
                # EOFError must end the loop: once stdin is exhausted every
                # further input() call raises it again immediately.
                print("\n\nGoodbye!")
                break
            except Exception as e:
                print(f"Error: {e}")
|
|
def main():
    """Run the script: load every discoverable model, then start the prompt loop.

    Prints a banner and builds a ``ModelEvaluator``.  When at least one model
    loads, control passes to the evaluator's interactive loop; otherwise a
    message is printed and the function returns.
    """
    print("Model Evaluation Script")
    print("=" * 50)

    evaluator = ModelEvaluator()

    # Nothing to evaluate without at least one successfully loaded model.
    if not evaluator.load_all_models():
        print("No models could be loaded. Exiting.")
        return

    evaluator.interactive_loop()
|
|
# Run main() only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()