| |
| |
| |
| import logging |
| import os |
| import torch |
| from typing import Optional, Dict, Any |
| from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel |
| from sentence_transformers import SentenceTransformer |
|
|
| |
| try: |
| from huggingface_hub.exceptions import GatedRepoError |
| from huggingface_hub import login as hf_login |
| except ImportError: |
| |
| GatedRepoError = Exception |
| hf_login = None |
|
|
| |
| try: |
| from .config import settings |
| except ImportError: |
| try: |
| from config import settings |
| except ImportError: |
| settings = None |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class LocalModelLoader: |
| """ |
| Loads and manages models locally on GPU for faster inference. |
| Optimized for NVIDIA T4 Medium with 16GB VRAM using 4-bit quantization. |
| """ |
| |
| def __init__(self, device: Optional[str] = None): |
| """Initialize the model loader with GPU device detection.""" |
| |
| if device is None: |
| if torch.cuda.is_available(): |
| self.device = "cuda" |
| self.device_name = torch.cuda.get_device_name(0) |
| logger.info(f"GPU detected: {self.device_name}") |
| logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB") |
| else: |
| self.device = "cpu" |
| self.device_name = "CPU" |
| logger.warning("No GPU detected, using CPU") |
| else: |
| self.device = device |
| self.device_name = device |
| |
| |
| if settings: |
| self.cache_dir = settings.hf_cache_dir |
| self.hf_token = settings.hf_token |
| else: |
| |
| self.cache_dir = os.getenv("HF_HOME") or os.getenv("TRANSFORMERS_CACHE") or "/tmp/huggingface" |
| self.hf_token = os.getenv("HF_TOKEN", "") |
| |
| |
| os.makedirs(self.cache_dir, exist_ok=True) |
| |
| |
| if not os.getenv("HF_HOME"): |
| os.environ["HF_HOME"] = self.cache_dir |
| if not os.getenv("TRANSFORMERS_CACHE"): |
| os.environ["TRANSFORMERS_CACHE"] = self.cache_dir |
| |
| logger.info(f"Cache directory: {self.cache_dir}") |
| |
| |
| if self.hf_token and hf_login: |
| try: |
| hf_login(token=self.hf_token, add_to_git_credential=False) |
| logger.info("✓ HF_TOKEN authenticated for gated model access") |
| except Exception as e: |
| logger.warning(f"HF_TOKEN login failed (may not be needed): {e}") |
| |
| |
| self.loaded_models: Dict[str, Any] = {} |
| self.loaded_tokenizers: Dict[str, Any] = {} |
| self.loaded_embedding_models: Dict[str, Any] = {} |
| |
| def load_chat_model(self, model_id: str, load_in_8bit: bool = False, load_in_4bit: bool = False) -> tuple: |
| """ |
| Load a chat model and tokenizer on GPU. |
| |
| Args: |
| model_id: HuggingFace model identifier |
| load_in_8bit: Use 8-bit quantization (saves memory) |
| load_in_4bit: Use 4-bit quantization (saves more memory) |
| |
| Returns: |
| Tuple of (model, tokenizer) |
| """ |
| if model_id in self.loaded_models: |
| logger.info(f"Model {model_id} already loaded, reusing") |
| return self.loaded_models[model_id], self.loaded_tokenizers[model_id] |
| |
| try: |
| logger.info(f"Loading model {model_id} on {self.device}...") |
| |
| |
| |
| base_model_id = model_id.split(':')[0] if ':' in model_id else model_id |
| if base_model_id != model_id: |
| logger.info(f"Stripping API suffix from {model_id}, using base model: {base_model_id}") |
| |
| |
| |
| try: |
| tokenizer = AutoTokenizer.from_pretrained( |
| base_model_id, |
| cache_dir=self.cache_dir, |
| token=self.hf_token if self.hf_token else None, |
| trust_remote_code=True |
| ) |
| except Exception as e: |
| |
| error_str = str(e).lower() |
| if "gated" in error_str or "authorized" in error_str or "access" in error_str: |
| |
| try: |
| from huggingface_hub.exceptions import GatedRepoError as RealGatedRepoError |
| if isinstance(e, RealGatedRepoError): |
| logger.error(f"❌ Gated Repository Error: Cannot access gated repo {base_model_id}") |
| logger.error(f" Access to model {base_model_id} is restricted and you are not in the authorized list.") |
| logger.error(f" Visit https://huggingface.co/{base_model_id} to request access.") |
| logger.error(f" Error details: {e}") |
| raise RealGatedRepoError( |
| f"Cannot access gated repository {base_model_id}. " |
| f"Visit https://huggingface.co/{base_model_id} to request access." |
| ) from e |
| except ImportError: |
| pass |
| |
| |
| raise |
| |
| |
| if load_in_4bit and self.device == "cuda": |
| try: |
| from transformers import BitsAndBytesConfig |
| quantization_config = BitsAndBytesConfig( |
| load_in_4bit=True, |
| bnb_4bit_compute_dtype=torch.float16, |
| bnb_4bit_use_double_quant=True, |
| bnb_4bit_quant_type="nf4" |
| ) |
| logger.info("Using 4-bit quantization") |
| except ImportError: |
| logger.warning("bitsandbytes not available, loading without quantization") |
| quantization_config = None |
| elif load_in_8bit and self.device == "cuda": |
| try: |
| quantization_config = {"load_in_8bit": True} |
| logger.info("Using 8-bit quantization") |
| except: |
| quantization_config = None |
| else: |
| quantization_config = None |
| |
| |
| |
| load_kwargs = { |
| "cache_dir": self.cache_dir, |
| "token": self.hf_token if self.hf_token else None, |
| "trust_remote_code": True |
| } |
| |
| if self.device == "cuda": |
| |
| |
| load_kwargs.update({ |
| "torch_dtype": torch.float16, |
| }) |
| |
| |
| |
| |
| model = None |
| quantization_failed = False |
| |
| if quantization_config and self.device == "cuda": |
| try: |
| if isinstance(quantization_config, dict): |
| load_kwargs.update(quantization_config) |
| else: |
| load_kwargs["quantization_config"] = quantization_config |
| |
| |
| load_kwargs["device_map"] = "auto" |
| |
| model = AutoModelForCausalLM.from_pretrained( |
| base_model_id, |
| **load_kwargs |
| ) |
| logger.info("✓ Model loaded with quantization") |
| except (RuntimeError, ModuleNotFoundError, ImportError) as e: |
| error_str = str(e).lower() |
| |
| if "bitsandbytes" in error_str or "int8_mm_dequant" in error_str or "validate_bnb_backend" in error_str: |
| logger.warning(f"⚠ BitsAndBytes error detected: {e}") |
| logger.warning("⚠ Falling back to loading without quantization") |
| quantization_failed = True |
| |
| load_kwargs.pop("quantization_config", None) |
| load_kwargs.pop("load_in_8bit", None) |
| load_kwargs.pop("load_in_4bit", None) |
| else: |
| |
| raise |
| |
| |
| if model is None: |
| try: |
| if self.device == "cuda": |
| |
| |
| model = AutoModelForCausalLM.from_pretrained( |
| base_model_id, |
| **load_kwargs |
| ) |
| |
| model = model.to(self.device) |
| logger.info(f"✓ Model loaded without quantization on {self.device}") |
| else: |
| load_kwargs.update({ |
| "torch_dtype": torch.float32, |
| }) |
| model = AutoModelForCausalLM.from_pretrained( |
| base_model_id, |
| **load_kwargs |
| ) |
| model = model.to(self.device) |
| except Exception as e: |
| |
| error_str = str(e).lower() |
| if "bitsandbytes" in error_str or "int8_mm_dequant" in error_str: |
| |
| logger.error(f"❌ Unexpected BitsAndBytes error: {e}") |
| raise RuntimeError(f"BitsAndBytes compatibility issue: {e}") from e |
| |
| |
| try: |
| from huggingface_hub.exceptions import GatedRepoError as RealGatedRepoError |
| if isinstance(e, RealGatedRepoError) or "gated" in error_str or "authorized" in error_str: |
| logger.error(f"❌ Gated Repository Error: Cannot access gated repo {base_model_id}") |
| logger.error(f" Access to model {base_model_id} is restricted and you are not in the authorized list.") |
| logger.error(f" Visit https://huggingface.co/{base_model_id} to request access.") |
| logger.error(f" Error details: {e}") |
| raise RealGatedRepoError( |
| f"Cannot access gated repository {base_model_id}. " |
| f"Visit https://huggingface.co/{base_model_id} to request access." |
| ) from e |
| except ImportError: |
| pass |
| |
| |
| raise |
| |
| |
| if tokenizer.pad_token is None: |
| tokenizer.pad_token = tokenizer.eos_token |
| |
| |
| self.loaded_models[model_id] = model |
| self.loaded_tokenizers[model_id] = tokenizer |
| |
| |
| if self.device == "cuda": |
| allocated = torch.cuda.memory_allocated(0) / 1024**3 |
| reserved = torch.cuda.memory_reserved(0) / 1024**3 |
| logger.info(f"GPU Memory - Allocated: {allocated:.2f} GB, Reserved: {reserved:.2f} GB") |
| |
| logger.info(f"✓ Model {model_id} (base: {base_model_id}) loaded successfully on {self.device}") |
| return model, tokenizer |
| |
| except GatedRepoError: |
| |
| raise |
| except Exception as e: |
| logger.error(f"Error loading model {model_id}: {e}", exc_info=True) |
| raise |
| |
| def load_embedding_model(self, model_id: str) -> SentenceTransformer: |
| """ |
| Load a sentence transformer model for embeddings. |
| |
| Args: |
| model_id: HuggingFace model identifier |
| |
| Returns: |
| SentenceTransformer model |
| """ |
| if model_id in self.loaded_embedding_models: |
| logger.info(f"Embedding model {model_id} already loaded, reusing") |
| return self.loaded_embedding_models[model_id] |
| |
| try: |
| logger.info(f"Loading embedding model {model_id}...") |
| |
| |
| base_model_id = model_id.split(':')[0] if ':' in model_id else model_id |
| if base_model_id != model_id: |
| logger.info(f"Stripping API suffix from {model_id}, using base model: {base_model_id}") |
| |
| |
| |
| |
| try: |
| model = SentenceTransformer( |
| base_model_id, |
| device=self.device |
| ) |
| except GatedRepoError as e: |
| logger.error(f"❌ Gated Repository Error: Cannot access gated repo {base_model_id}") |
| logger.error(f" Access to model {base_model_id} is restricted and you are not in the authorized list.") |
| logger.error(f" Visit https://huggingface.co/{base_model_id} to request access.") |
| logger.error(f" Error details: {e}") |
| raise GatedRepoError( |
| f"Cannot access gated repository {base_model_id}. " |
| f"Visit https://huggingface.co/{base_model_id} to request access." |
| ) from e |
| |
| |
| self.loaded_embedding_models[model_id] = model |
| |
| logger.info(f"✓ Embedding model {model_id} (base: {base_model_id}) loaded successfully on {self.device}") |
| return model |
| |
| except GatedRepoError: |
| |
| raise |
| except Exception as e: |
| logger.error(f"Error loading embedding model {model_id}: {e}", exc_info=True) |
| raise |
| |
| def generate_text( |
| self, |
| model_id: str, |
| prompt: str, |
| max_tokens: int = 512, |
| temperature: float = 0.7, |
| **kwargs |
| ) -> str: |
| """ |
| Generate text using a loaded chat model. |
| |
| Args: |
| model_id: Model identifier |
| prompt: Input prompt |
| max_tokens: Maximum tokens to generate |
| temperature: Sampling temperature |
| |
| Returns: |
| Generated text |
| """ |
| if model_id not in self.loaded_models: |
| raise ValueError(f"Model {model_id} not loaded. Call load_chat_model() first.") |
| |
| model = self.loaded_models[model_id] |
| tokenizer = self.loaded_tokenizers[model_id] |
| |
| try: |
| |
| inputs = tokenizer(prompt, return_tensors="pt").to(self.device) |
| |
| |
| generation_kwargs = { |
| "max_new_tokens": max_tokens, |
| "temperature": temperature, |
| "do_sample": True, |
| "pad_token_id": tokenizer.pad_token_id, |
| "eos_token_id": tokenizer.eos_token_id, |
| } |
| |
| |
| |
| if "phi" in model_id.lower() or "phi3" in model_id.lower() or "phi-3" in model_id.lower(): |
| |
| generation_kwargs["use_cache"] = False |
| logger.debug(f"Using use_cache=False for Phi-3 model to avoid DynamicCache compatibility issues") |
| |
| |
| generation_kwargs.update(kwargs) |
| |
| |
| with torch.no_grad(): |
| outputs = model.generate( |
| **inputs, |
| **generation_kwargs |
| ) |
| |
| |
| generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) |
| |
| |
| if generated_text.startswith(prompt): |
| generated_text = generated_text[len(prompt):].strip() |
| |
| return generated_text |
| |
| except AttributeError as e: |
| |
| if "seen_tokens" in str(e) or "DynamicCache" in str(e): |
| logger.warning(f"DynamicCache compatibility issue detected ({e}), retrying without cache") |
| try: |
| |
| with torch.no_grad(): |
| outputs = model.generate( |
| **inputs, |
| max_new_tokens=max_tokens, |
| temperature=temperature, |
| do_sample=True, |
| use_cache=False, |
| pad_token_id=tokenizer.pad_token_id, |
| eos_token_id=tokenizer.eos_token_id, |
| **{k: v for k, v in kwargs.items() if k != "use_cache"} |
| ) |
| generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) |
| if generated_text.startswith(prompt): |
| generated_text = generated_text[len(prompt):].strip() |
| logger.info("✓ Generation successful after DynamicCache workaround") |
| return generated_text |
| except Exception as retry_error: |
| logger.error(f"Retry without cache also failed: {retry_error}", exc_info=True) |
| raise RuntimeError(f"Generation failed even with cache disabled: {retry_error}") from retry_error |
| |
| raise |
| except Exception as e: |
| logger.error(f"Error generating text: {e}", exc_info=True) |
| raise |
| |
| def generate_chat_completion( |
| self, |
| model_id: str, |
| messages: list, |
| max_tokens: int = 512, |
| temperature: float = 0.7, |
| **kwargs |
| ) -> str: |
| """ |
| Generate chat completion using a loaded model. |
| |
| Args: |
| model_id: Model identifier |
| messages: List of message dicts with 'role' and 'content' |
| max_tokens: Maximum tokens to generate |
| temperature: Sampling temperature |
| |
| Returns: |
| Generated response |
| """ |
| if model_id not in self.loaded_models: |
| raise ValueError(f"Model {model_id} not loaded. Call load_chat_model() first.") |
| |
| model = self.loaded_models[model_id] |
| tokenizer = self.loaded_tokenizers[model_id] |
| |
| try: |
| |
| if hasattr(tokenizer, 'apply_chat_template'): |
| |
| prompt = tokenizer.apply_chat_template( |
| messages, |
| tokenize=False, |
| add_generation_prompt=True |
| ) |
| else: |
| |
| prompt = "\n".join([ |
| f"{msg['role']}: {msg['content']}" |
| for msg in messages |
| ]) + "\nassistant: " |
| |
| |
| return self.generate_text( |
| model_id=model_id, |
| prompt=prompt, |
| max_tokens=max_tokens, |
| temperature=temperature, |
| **kwargs |
| ) |
| |
| except Exception as e: |
| logger.error(f"Error generating chat completion: {e}", exc_info=True) |
| raise |
| |
| def get_embedding(self, model_id: str, text: str) -> list: |
| """ |
| Get embedding vector for text. |
| |
| Args: |
| model_id: Embedding model identifier |
| text: Input text |
| |
| Returns: |
| Embedding vector |
| """ |
| if model_id not in self.loaded_embedding_models: |
| raise ValueError(f"Embedding model {model_id} not loaded. Call load_embedding_model() first.") |
| |
| model = self.loaded_embedding_models[model_id] |
| |
| try: |
| embedding = model.encode(text, convert_to_numpy=True) |
| return embedding.tolist() |
| except Exception as e: |
| logger.error(f"Error getting embedding: {e}", exc_info=True) |
| raise |
| |
| def clear_cache(self): |
| """Clear all loaded models from memory.""" |
| logger.info("Clearing model cache...") |
| |
| |
| for model_id in list(self.loaded_models.keys()): |
| del self.loaded_models[model_id] |
| for model_id in list(self.loaded_tokenizers.keys()): |
| del self.loaded_tokenizers[model_id] |
| for model_id in list(self.loaded_embedding_models.keys()): |
| del self.loaded_embedding_models[model_id] |
| |
| |
| if self.device == "cuda": |
| torch.cuda.empty_cache() |
| |
| logger.info("✓ Model cache cleared") |
| |
| def get_memory_usage(self) -> Dict[str, float]: |
| """Get current GPU memory usage in GB.""" |
| if self.device != "cuda": |
| return {"device": "cpu", "gpu_available": False} |
| |
| return { |
| "device": self.device_name, |
| "gpu_available": True, |
| "allocated_gb": torch.cuda.memory_allocated(0) / 1024**3, |
| "reserved_gb": torch.cuda.memory_reserved(0) / 1024**3, |
| "total_gb": torch.cuda.get_device_properties(0).total_memory / 1024**3 |
| } |
|
|
|
|