| import os |
| import torch |
| import json |
| import numpy as np |
| from torch import nn |
| from torch.nn import functional as F |
| import sentencepiece as spm |
| from sklearn.metrics.pairwise import cosine_similarity |
| from tqdm import tqdm |
| import matplotlib.pyplot as plt |
| from sklearn.manifold import TSNE |
|
|
| |
| class SentencePieceTokenizerWrapper: |
| def __init__(self, sp_model_path): |
| self.sp_model = spm.SentencePieceProcessor() |
| self.sp_model.Load(sp_model_path) |
| self.vocab_size = self.sp_model.GetPieceSize() |
| |
| |
| self.pad_token_id = 0 |
| self.bos_token_id = 1 |
| self.eos_token_id = 2 |
| self.unk_token_id = 3 |
| |
| |
| self.pad_token = "<pad>" |
| self.bos_token = "<s>" |
| self.eos_token = "</s>" |
| self.unk_token = "<unk>" |
| self.mask_token = "<mask>" |
| |
| def __call__(self, text, padding=False, truncation=False, max_length=None, return_tensors=None): |
| |
| if isinstance(text, str): |
| |
| ids = self.sp_model.EncodeAsIds(text) |
| |
| |
| if truncation and max_length and len(ids) > max_length: |
| ids = ids[:max_length] |
| |
| attention_mask = [1] * len(ids) |
| |
| |
| if padding and max_length: |
| padding_length = max(0, max_length - len(ids)) |
| ids = ids + [self.pad_token_id] * padding_length |
| attention_mask = attention_mask + [0] * padding_length |
| |
| result = { |
| 'input_ids': ids, |
| 'attention_mask': attention_mask |
| } |
| |
| |
| if return_tensors == 'pt': |
| import torch |
| result = {k: torch.tensor([v]) for k, v in result.items()} |
| |
| return result |
| |
| |
| batch_encoded = [self.sp_model.EncodeAsIds(t) for t in text] |
| |
| |
| if truncation and max_length: |
| batch_encoded = [ids[:max_length] for ids in batch_encoded] |
| |
| |
| batch_attention_mask = [[1] * len(ids) for ids in batch_encoded] |
| |
| |
| if padding: |
| if max_length: |
| max_len = max_length |
| else: |
| max_len = max(len(ids) for ids in batch_encoded) |
| |
| |
| batch_encoded = [ids + [self.pad_token_id] * (max_len - len(ids)) for ids in batch_encoded] |
| batch_attention_mask = [mask + [0] * (max_len - len(mask)) for mask in batch_attention_mask] |
| |
| result = { |
| 'input_ids': batch_encoded, |
| 'attention_mask': batch_attention_mask |
| } |
| |
| |
| if return_tensors == 'pt': |
| import torch |
| result = {k: torch.tensor(v) for k, v in result.items()} |
| |
| return result |
|
|
| |
| class MultiHeadAttention(nn.Module): |
| """Multi-headed attention mechanism""" |
| def __init__(self, config): |
| super().__init__() |
| self.num_attention_heads = config["num_attention_heads"] |
| self.attention_head_size = config["hidden_size"] // config["num_attention_heads"] |
| self.all_head_size = self.num_attention_heads * self.attention_head_size |
| |
| |
| self.query = nn.Linear(config["hidden_size"], self.all_head_size) |
| self.key = nn.Linear(config["hidden_size"], self.all_head_size) |
| self.value = nn.Linear(config["hidden_size"], self.all_head_size) |
| |
| |
| self.output = nn.Sequential( |
| nn.Linear(self.all_head_size, config["hidden_size"]), |
| nn.Dropout(config["attention_probs_dropout_prob"]) |
| ) |
| |
| |
| self.max_position_embeddings = config["max_position_embeddings"] |
| self.relative_attention_bias = nn.Embedding( |
| 2 * config["max_position_embeddings"] - 1, |
| config["num_attention_heads"] |
| ) |
| |
| def transpose_for_scores(self, x): |
| new_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size) |
| x = x.view(*new_shape) |
| return x.permute(0, 2, 1, 3) |
| |
| def forward(self, hidden_states, attention_mask=None): |
| batch_size, seq_length = hidden_states.size()[:2] |
| |
| |
| query_layer = self.transpose_for_scores(self.query(hidden_states)) |
| key_layer = self.transpose_for_scores(self.key(hidden_states)) |
| value_layer = self.transpose_for_scores(self.value(hidden_states)) |
| |
| |
| attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) |
| |
| |
| position_ids = torch.arange(seq_length, dtype=torch.long, device=hidden_states.device) |
| relative_position = position_ids.unsqueeze(1) - position_ids.unsqueeze(0) |
| |
| relative_position = relative_position + self.max_position_embeddings - 1 |
| |
| relative_position = torch.clamp(relative_position, 0, 2 * self.max_position_embeddings - 2) |
| |
| |
| rel_attn_bias = self.relative_attention_bias(relative_position) |
| |
| |
| rel_attn_bias = rel_attn_bias.permute(2, 0, 1).unsqueeze(0) |
| |
| |
| attention_scores = attention_scores + rel_attn_bias |
| |
| |
| attention_scores = attention_scores / (self.attention_head_size ** 0.5) |
| |
| |
| if attention_mask is not None: |
| attention_scores = attention_scores + attention_mask |
| |
| |
| attention_probs = F.softmax(attention_scores, dim=-1) |
| |
| |
| attention_probs = F.dropout(attention_probs, p=0.1, training=self.training) |
| |
| |
| context_layer = torch.matmul(attention_probs, value_layer) |
| |
| |
| context_layer = context_layer.permute(0, 2, 1, 3).contiguous() |
| new_shape = context_layer.size()[:-2] + (self.all_head_size,) |
| context_layer = context_layer.view(*new_shape) |
| |
| |
| output = self.output(context_layer) |
| |
| return output |
|
|
| class EnhancedTransformerLayer(nn.Module): |
| """Advanced transformer layer with pre-layer norm and enhanced attention""" |
| def __init__(self, config): |
| super().__init__() |
| self.attention_pre_norm = nn.LayerNorm(config["hidden_size"], eps=config["layer_norm_eps"]) |
| self.attention = MultiHeadAttention(config) |
| |
| self.ffn_pre_norm = nn.LayerNorm(config["hidden_size"], eps=config["layer_norm_eps"]) |
| |
| |
| self.ffn = nn.Sequential( |
| nn.Linear(config["hidden_size"], config["intermediate_size"]), |
| nn.GELU(), |
| nn.Dropout(config["hidden_dropout_prob"]), |
| nn.Linear(config["intermediate_size"], config["hidden_size"]), |
| nn.Dropout(config["hidden_dropout_prob"]) |
| ) |
| |
| def forward(self, hidden_states, attention_mask=None): |
| |
| attn_norm_hidden = self.attention_pre_norm(hidden_states) |
| |
| |
| attention_output = self.attention(attn_norm_hidden, attention_mask) |
| |
| |
| hidden_states = hidden_states + attention_output |
| |
| |
| ffn_norm_hidden = self.ffn_pre_norm(hidden_states) |
| |
| |
| ffn_output = self.ffn(ffn_norm_hidden) |
| |
| |
| hidden_states = hidden_states + ffn_output |
| |
| return hidden_states |
|
|
| class AdvancedTransformerModel(nn.Module): |
| """Advanced Transformer model for inference""" |
| |
| def __init__(self, config): |
| super().__init__() |
| self.config = config |
| |
| |
| self.word_embeddings = nn.Embedding( |
| config["vocab_size"], |
| config["hidden_size"], |
| padding_idx=config["pad_token_id"] |
| ) |
| |
| |
| self.position_embeddings = nn.Embedding(config["max_position_embeddings"], config["hidden_size"]) |
| |
| |
| self.embedding_dropout = nn.Dropout(config["hidden_dropout_prob"]) |
| |
| |
| self.layers = nn.ModuleList([ |
| EnhancedTransformerLayer(config) for _ in range(config["num_hidden_layers"]) |
| ]) |
| |
| |
| self.final_layer_norm = nn.LayerNorm(config["hidden_size"], eps=config["layer_norm_eps"]) |
| |
| def forward(self, input_ids, attention_mask=None): |
| input_shape = input_ids.size() |
| batch_size, seq_length = input_shape |
| |
| |
| position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device) |
| position_ids = position_ids.unsqueeze(0).expand(batch_size, -1) |
| |
| |
| word_embeds = self.word_embeddings(input_ids) |
| position_embeds = self.position_embeddings(position_ids) |
| |
| |
| embeddings = word_embeds + position_embeds |
| |
| |
| embeddings = self.embedding_dropout(embeddings) |
| |
| |
| if attention_mask is None: |
| attention_mask = torch.ones(input_shape, device=input_ids.device) |
| |
| |
| extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) |
| extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0 |
| |
| |
| hidden_states = embeddings |
| for layer in self.layers: |
| hidden_states = layer(hidden_states, extended_attention_mask) |
| |
| |
| hidden_states = self.final_layer_norm(hidden_states) |
| |
| return hidden_states |
|
|
| class AdvancedPooling(nn.Module): |
| """Advanced pooling module supporting multiple pooling strategies""" |
| def __init__(self, config): |
| super().__init__() |
| self.pooling_mode = config["pooling_mode"] |
| self.hidden_size = config["hidden_size"] |
| |
| |
| if self.pooling_mode == 'attention': |
| self.attention_weights = nn.Linear(config["hidden_size"], 1) |
| |
| |
| elif self.pooling_mode == 'weighted': |
| self.weight_layer = nn.Linear(config["hidden_size"], 1) |
| |
| def forward(self, token_embeddings, attention_mask=None): |
| if attention_mask is None: |
| attention_mask = torch.ones_like(token_embeddings[:, :, 0]) |
| |
| mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() |
| |
| if self.pooling_mode == 'cls': |
| |
| pooled = token_embeddings[:, 0] |
| |
| elif self.pooling_mode == 'max': |
| |
| token_embeddings = token_embeddings.clone() |
| |
| token_embeddings[mask_expanded == 0] = -1e9 |
| pooled = torch.max(token_embeddings, dim=1)[0] |
| |
| elif self.pooling_mode == 'attention': |
| |
| weights = self.attention_weights(token_embeddings).squeeze(-1) |
| |
| weights = weights.masked_fill(attention_mask == 0, -1e9) |
| weights = F.softmax(weights, dim=1).unsqueeze(-1) |
| pooled = torch.sum(token_embeddings * weights, dim=1) |
| |
| elif self.pooling_mode == 'weighted': |
| |
| weights = torch.sigmoid(self.weight_layer(token_embeddings)).squeeze(-1) |
| |
| weights = weights * attention_mask |
| |
| sum_weights = torch.sum(weights, dim=1, keepdim=True) |
| sum_weights = torch.clamp(sum_weights, min=1e-9) |
| weights = weights / sum_weights |
| |
| pooled = torch.sum(token_embeddings * weights.unsqueeze(-1), dim=1) |
| |
| else: |
| |
| sum_embeddings = torch.sum(token_embeddings * mask_expanded, dim=1) |
| sum_mask = torch.clamp(mask_expanded.sum(1), min=1e-9) |
| pooled = sum_embeddings / sum_mask |
| |
| |
| pooled = F.normalize(pooled, p=2, dim=1) |
| |
| return pooled |
|
|
| class SentenceEmbeddingModel(nn.Module): |
| """Complete sentence embedding model for inference""" |
| def __init__(self, config): |
| super(SentenceEmbeddingModel, self).__init__() |
| self.config = config |
| |
| |
| self.transformer = AdvancedTransformerModel(config) |
| |
| |
| self.pooling = AdvancedPooling(config) |
| |
| |
| if "projection_dim" in config and config["projection_dim"] > 0: |
| self.use_projection = True |
| self.projection = nn.Sequential( |
| nn.Linear(config["hidden_size"], config["hidden_size"]), |
| nn.GELU(), |
| nn.Linear(config["hidden_size"], config["projection_dim"]), |
| nn.LayerNorm(config["projection_dim"], eps=config["layer_norm_eps"]) |
| ) |
| else: |
| self.use_projection = False |
| |
| def forward(self, input_ids, attention_mask=None): |
| |
| token_embeddings = self.transformer(input_ids, attention_mask) |
| |
| |
| pooled_output = self.pooling(token_embeddings, attention_mask) |
| |
| |
| if self.use_projection: |
| pooled_output = self.projection(pooled_output) |
| pooled_output = F.normalize(pooled_output, p=2, dim=1) |
| |
| return pooled_output |
|
|
| class HindiEmbedder: |
| def __init__(self, model_path="/home/ubuntu/output/hindi-embeddings-custom-tokenizer/final"): |
| """ |
| Initialize the Hindi sentence embedder. |
| |
| Args: |
| model_path: Path to the model directory |
| """ |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| print(f"Using device: {self.device}") |
| |
| |
| tokenizer_path = os.path.join(model_path, "tokenizer.model") |
| |
| if not os.path.exists(tokenizer_path): |
| raise FileNotFoundError(f"Could not find tokenizer at {tokenizer_path}") |
| |
| self.tokenizer = SentencePieceTokenizerWrapper(tokenizer_path) |
| print(f"Loaded tokenizer from {tokenizer_path} with vocabulary size: {self.tokenizer.vocab_size}") |
| |
| |
| config_path = os.path.join(model_path, "config.json") |
| with open(config_path, "r") as f: |
| self.config = json.load(f) |
| print(f"Loaded model config with hidden_size={self.config['hidden_size']}") |
| |
| |
| model_pt_path = os.path.join(model_path, "embedding_model.pt") |
| |
| try: |
| |
| try: |
| checkpoint = torch.load(model_pt_path, map_location=self.device, weights_only=False) |
| print("Loaded model using PyTorch 2.6+ style loading") |
| except TypeError: |
| checkpoint = torch.load(model_pt_path, map_location=self.device) |
| print("Loaded model using older PyTorch style loading") |
| |
| |
| self.model = SentenceEmbeddingModel(self.config) |
| |
| |
| if "model_state_dict" in checkpoint: |
| state_dict = checkpoint["model_state_dict"] |
| else: |
| state_dict = checkpoint |
| |
| missing_keys, unexpected_keys = self.model.load_state_dict(state_dict, strict=False) |
| print(f"Loaded model with {len(missing_keys)} missing keys and {len(unexpected_keys)} unexpected keys") |
| |
| |
| self.model.to(self.device) |
| self.model.eval() |
| print("Model loaded successfully and placed in evaluation mode") |
| |
| except Exception as e: |
| print(f"Error loading model: {e}") |
| raise RuntimeError(f"Failed to load the model: {e}") |
| |
| def encode(self, sentences, batch_size=32, normalize=True): |
| """ |
| Encode sentences to embeddings. |
| |
| Args: |
| sentences: A string or list of strings to encode |
| batch_size: Batch size for encoding |
| normalize: Whether to normalize the embeddings |
| |
| Returns: |
| Numpy array of embeddings |
| """ |
| |
| if isinstance(sentences, str): |
| sentences = [sentences] |
| |
| all_embeddings = [] |
| |
| |
| with torch.no_grad(): |
| for i in range(0, len(sentences), batch_size): |
| batch = sentences[i:i+batch_size] |
| |
| |
| inputs = self.tokenizer( |
| batch, |
| padding=True, |
| truncation=True, |
| max_length=self.config.get("max_position_embeddings", 128), |
| return_tensors="pt" |
| ) |
| |
| |
| input_ids = inputs["input_ids"].to(self.device) |
| attention_mask = inputs["attention_mask"].to(self.device) |
| |
| |
| embeddings = self.model(input_ids, attention_mask) |
| |
| |
| all_embeddings.append(embeddings.cpu().numpy()) |
| |
| |
| all_embeddings = np.vstack(all_embeddings) |
| |
| |
| if normalize: |
| all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1, keepdims=True) |
| |
| return all_embeddings |
| |
| def compute_similarity(self, texts1, texts2=None): |
| """ |
| Compute cosine similarity between texts. |
| |
| Args: |
| texts1: First set of texts |
| texts2: Second set of texts. If None, compute similarity matrix within texts1. |
| |
| Returns: |
| Similarity scores |
| """ |
| |
| if isinstance(texts1, str): |
| texts1 = [texts1] |
| |
| if texts2 is not None and isinstance(texts2, str): |
| texts2 = [texts2] |
| |
| embeddings1 = self.encode(texts1) |
| |
| if texts2 is None: |
| |
| similarities = cosine_similarity(embeddings1) |
| return similarities |
| else: |
| |
| embeddings2 = self.encode(texts2) |
| |
| if len(texts1) == len(texts2): |
| |
| similarities = np.array([ |
| cosine_similarity([e1], [e2])[0][0] |
| for e1, e2 in zip(embeddings1, embeddings2) |
| ]) |
| |
| |
| if len(similarities) == 1: |
| return similarities[0] |
| return similarities |
| else: |
| |
| return cosine_similarity(embeddings1, embeddings2) |
| |
| def search(self, query, documents, top_k=5): |
| """ |
| Search for similar documents to a query. |
| |
| Args: |
| query: The query text |
| documents: List of documents to search |
| top_k: Number of top results to return |
| |
| Returns: |
| List of dictionaries with document and score |
| """ |
| |
| query_embedding = self.encode([query])[0] |
| document_embeddings = self.encode(documents) |
| |
| |
| similarities = np.dot(document_embeddings, query_embedding) |
| |
| |
| top_indices = np.argsort(similarities)[-top_k:][::-1] |
| |
| |
| results = [] |
| for idx in top_indices: |
| results.append({ |
| "document": documents[idx], |
| "score": float(similarities[idx]) |
| }) |
| |
| return results |
| |
| def evaluate_similarity_samples(self): |
| """Evaluate model on some standard similarity examples for Hindi""" |
| test_pairs = [ |
| ( |
| "मुझे हिंदी में पढ़ना बहुत पसंद है।", |
| "मैं हिंदी किताबें बहुत पसंद करता हूँ।" |
| ), |
| ( |
| "आज मौसम बहुत अच्छा है।", |
| "आज बारिश हो रही है।" |
| ), |
| ( |
| "भारत एक विशाल देश है।", |
| "भारत में कई भाषाएँ बोली जाती हैं।" |
| ), |
| ( |
| "कंप्यूटर विज्ञान एक रोचक विषय है।", |
| "मैं कंप्यूटर साइंस का छात्र हूँ।" |
| ), |
| ( |
| "मैं रोज सुबह योग करता हूँ।", |
| "स्वस्थ रहने के लिए व्यायाम जरूरी है।" |
| ), |
| |
| ( |
| "मुझे हिंदी में पढ़ना बहुत पसंद है।", |
| "क्रिकेट भारत में सबसे लोकप्रिय खेल है।" |
| ), |
| ( |
| "आज मौसम बहुत अच्छा है।", |
| "भारतीय व्यंजन दुनिया भर में मशहूर हैं।" |
| ), |
| ( |
| "कंप्यूटर विज्ञान एक रोचक विषय है।", |
| "हिमालय दुनिया का सबसे ऊंचा पर्वत है।" |
| ) |
| ] |
| |
| print("Evaluating model on standard similarity samples:") |
| for i, (text1, text2) in enumerate(test_pairs): |
| similarity = self.compute_similarity([text1], [text2])[0] |
| print(f"\nPair {i+1}:") |
| print(f" Sentence 1: {text1}") |
| print(f" Sentence 2: {text2}") |
| print(f" Similarity: {similarity:.4f}") |
| |
| return |
| |
| def visualize_embeddings(self, sentences, labels=None, output_path="hindi_embeddings_visualization.png"): |
| """ |
| Create a t-SNE visualization of the embeddings. |
| |
| Args: |
| sentences: List of sentences to visualize |
| labels: Optional list of labels for the points |
| output_path: Path to save the visualization |
| |
| Returns: |
| Path to the saved visualization |
| """ |
| |
| embeddings = self.encode(sentences) |
| |
| |
| tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(embeddings)-1)) |
| reduced_embeddings = tsne.fit_transform(embeddings) |
| |
| |
| plt.figure(figsize=(12, 10)) |
| |
| |
| scatter = plt.scatter( |
| reduced_embeddings[:, 0], |
| reduced_embeddings[:, 1], |
| c=range(len(reduced_embeddings)), |
| cmap='viridis', |
| alpha=0.8, |
| s=100 |
| ) |
| |
| |
| if labels: |
| for i, label in enumerate(labels): |
| plt.annotate( |
| label, |
| (reduced_embeddings[i, 0], reduced_embeddings[i, 1]), |
| fontsize=10, |
| alpha=0.7 |
| ) |
| |
| plt.title("t-SNE Visualization of Hindi Sentence Embeddings", fontsize=16) |
| plt.xlabel("Dimension 1", fontsize=12) |
| plt.ylabel("Dimension 2", fontsize=12) |
| plt.colorbar(scatter, label="Sentence Index") |
| plt.grid(alpha=0.3) |
| |
| |
| plt.tight_layout() |
| plt.savefig(output_path, dpi=300, bbox_inches='tight') |
| plt.close() |
| |
| print(f"Visualization saved to {output_path}") |
| return output_path |
|
|
| def main(): |
| |
| embedder = HindiEmbedder() |
| |
| |
| embedder.evaluate_similarity_samples() |
| |
| |
| print("\nSemantic Search Example:") |
| query = "भारत की संस्कृति" |
| documents = [ |
| "भारतीय संस्कृति दुनिया की सबसे प्राचीन संस्कृतियों में से एक है।", |
| "भारत की आबादी 1.3 अरब से अधिक है।", |
| "हिमालय पर्वत श्रृंखला भारत के उत्तर में स्थित है।", |
| "भारतीय व्यंजन में मसालों का प्रयोग किया जाता है।", |
| "भारत में 22 आधिकारिक भाषाएँ हैं।", |
| "संस्कृति लोगों के रहन-सहन का तरीका है।", |
| "भारत के विभिन्न राज्यों की अपनी अलग संस्कृति है।", |
| "रामायण और महाभारत भारतीय संस्कृति के महत्वपूर्ण हिस्से हैं।", |
| ] |
| |
| results = embedder.search(query, documents) |
| |
| print(f"Query: {query}") |
| print("Top results:") |
| for i, result in enumerate(results): |
| print(f"{i+1}. Score: {result['score']:.4f}") |
| print(f" {result['document']}") |
| |
| |
| print("\nCreating embedding visualization...") |
| visualization_sentences = [ |
| "मुझे हिंदी में पढ़ना बहुत पसंद है।", |
| "मैं हिंदी किताबें बहुत पसंद करता हूँ।", |
| "आज मौसम बहुत अच्छा है।", |
| "आज बारिश हो रही है।", |
| "भारत एक विशाल देश है।", |
| "भारत में कई भाषाएँ बोली जाती हैं।", |
| "कंप्यूटर विज्ञान एक रोचक विषय है।", |
| "मैं कंप्यूटर साइंस का छात्र हूँ।", |
| "क्रिकेट भारत में सबसे लोकप्रिय खेल है।", |
| "भारतीय व्यंजन दुनिया भर में मशहूर हैं।", |
| "हिमालय दुनिया का सबसे ऊंचा पर्वत है।", |
| "गंगा भारत की सबसे पवित्र नदी है।", |
| "दिल्ली भारत की राजधानी है।", |
| "मुंबई भारत का आर्थिक केंद्र है।", |
| "तमिल, तेलुगु, कन्नड़ और मलयालम दक्षिण भारत की प्रमुख भाषाएँ हैं।" |
| ] |
| |
| labels = ["पढ़ना", "किताबें", "मौसम", "बारिश", "भारत", "भाषाएँ", "कंप्यूटर", |
| "छात्र", "क्रिकेट", "व्यंजन", "हिमालय", "गंगा", "दिल्ली", "मुंबई", "भाषाएँ"] |
| |
| embedder.visualize_embeddings(visualization_sentences, labels) |
|
|
| if __name__ == "__main__": |
| main() |
|
|