import argparse
import json
import logging
import os
from typing import Dict, List, Optional

import numpy as np
import torch
from llama_cpp import Llama
from stable_baselines3 import PPO
from transformers import (
    AutoTokenizer,
    AutoModel
)

# Configure root logging once at import so the INFO-level progress messages
# emitted while loading models and predicting are visible by default.
logging.basicConfig(level=logging.INFO)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)


class SalesConversionPredictor:
    """Sales conversion prediction using Hugging Face models and llama.cpp.

    Three components are loaded in ``__init__``:

    * a Hugging Face tokenizer/encoder pair used to embed conversation text,
    * a llama.cpp LLM used to score conversations and to draft replies,
    * a Stable-Baselines3 PPO policy whose (clamped) action is reported as
      the conversion probability.

    The probabilities predicted so far are remembered per conversation id in
    ``self.conversation_states`` and fed back into later predictions.
    """

    # Length of the embedding slice of the PPO observation vector.
    EMBEDDING_DIM = 1024
    # How many of the most recent probabilities are fed back to the policy.
    PROB_HISTORY_LEN = 10
    # Upper bound on tokens sent to the encoder (further capped per tokenizer).
    MAX_EMBEDDING_TOKENS = 8192
    # Quantisation variant selected when a repo or directory holds several files.
    GGUF_QUANT_SUFFIX = "Q4_K_M.gguf"

    # (minimum probability, status label, suggestion), evaluated top-down.
    # NOTE(review): the thresholds are kept exactly as found; presumably they
    # are calibrated to the output range of the trained policy - confirm.
    _CONVERSION_TIERS = (
        (0.38, "🟢 Conversion Highly Likely",
         "Follow up with specific next steps or a call to action."),
        (0.37, "🟡 Good Conversion Potential",
         "Address any remaining concerns and guide toward a decision."),
        (0.35, "🟠 Moderate Conversion Potential",
         "Focus on building value and addressing objections."),
    )
    _LOWEST_TIER = ("🔴 Conversion Unlikely",
                    "Reframe the conversation or qualify needs better.")

    def __init__(self,
                 model_path: str,
                 embedding_model_name: str = "BAAI/bge-large-en-v1.5",
                 llm_gguf_path: str = "path/to/your/llama-3.2-1b-instruct.gguf",
                 use_gpu: bool = True,
                 n_gpu_layers: int = -1,
                 n_ctx: int = 2048,
                 use_mini_embeddings: bool = True):
        """Initialize with Hugging Face embeddings and llama.cpp LLM.

        Args:
            model_path: Path of the saved PPO model passed to ``PPO.load``.
            embedding_model_name: Hugging Face id of the embedding model.
            llm_gguf_path: A local ``.gguf`` file, a local directory holding
                a ``*Q4_K_M.gguf`` file, or a Hugging Face repo id.
            use_gpu: Use CUDA for the encoder (when available) and offload
                LLM layers; when False everything runs on CPU.
            n_gpu_layers: LLM layers to offload when ``use_gpu`` is True.
            n_ctx: Context window size handed to llama.cpp.
            use_mini_embeddings: Stored on the instance; not otherwise used
                by this class.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() and use_gpu else "cpu")
        logger.info(f"Using device: {self.device}")

        logger.info(f"Loading embedding model: {embedding_model_name}")
        self.embedding_tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
        self.embedding_model = AutoModel.from_pretrained(embedding_model_name).to(self.device)

        self.use_mini_embeddings = use_mini_embeddings
        self.embedding_dim = self.EMBEDDING_DIM

        logger.info(f"Loading LLM model from GGUF: {llm_gguf_path}")
        llm_options = dict(
            n_gpu_layers=n_gpu_layers if use_gpu else 0,
            n_ctx=n_ctx,
            verbose=False,
            use_mlock=True,
            n_threads=None,
        )
        local_gguf = self._resolve_local_gguf(llm_gguf_path, self.GGUF_QUANT_SUFFIX)
        if local_gguf is not None:
            # A file on disk must be opened directly; from_pretrained would
            # treat the path as a Hugging Face repo id and fail.
            self.llm = Llama(model_path=local_gguf, **llm_options)
        else:
            self.llm = Llama.from_pretrained(
                repo_id=llm_gguf_path,
                filename=f"*{self.GGUF_QUANT_SUFFIX}",
                **llm_options,
            )

        # The PPO policy is always loaded on CPU, independent of use_gpu.
        ppo_device = "cpu"
        logger.info(f"Loading PPO model on {ppo_device}")
        self.ppo_model = PPO.load(model_path, device=ppo_device)

        # conversation_id -> {'history': [...], 'probabilities': [...]}
        self.conversation_states = {}

    @staticmethod
    def _resolve_local_gguf(path: str, suffix: str = "Q4_K_M.gguf") -> Optional[str]:
        """Return a local GGUF file for *path*, or None if it is not local.

        An existing file is returned unchanged. For an existing directory the
        alphabetically first entry ending in *suffix* is returned. In every
        other case (including a directory without a match) None is returned
        so the caller can fall back to treating *path* as a repo id.
        """
        if os.path.isfile(path):
            return path
        if os.path.isdir(path):
            matches = sorted(name for name in os.listdir(path) if name.endswith(suffix))
            if matches:
                return os.path.join(path, matches[0])
        return None

    def _normalize_history_format(self, history: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Normalize history format to ensure consistency.

        Accepts messages keyed either ``role``/``content`` or
        ``speaker``/``message`` and returns new ``speaker``/``message`` dicts.
        ``user``/``customer`` map to ``'user'``, ``assistant``/``sales_rep``
        map to ``'sales_rep'``; any other role is passed through unchanged.
        """
        role_aliases = {
            'user': 'user',
            'customer': 'user',
            'assistant': 'sales_rep',
            'sales_rep': 'sales_rep',
        }
        normalized_history = []
        for msg in history:
            role = msg.get('role', msg.get('speaker', ''))
            content = msg.get('content', msg.get('message', ''))
            normalized_history.append({
                'speaker': role_aliases.get(role, role),
                'message': content,
            })
        return normalized_history

    def get_embedding(self, text: str) -> np.ndarray:
        """Return a 1024-dimensional float32 embedding of *text*.

        The encoder's last hidden state is mean-pooled over the attention
        mask and L2-normalised. A vector of another length is zero-padded or
        truncated to 1024. Any failure is logged and a zero vector is
        returned so callers can continue.
        """
        dim = self.EMBEDDING_DIM
        try:
            # Never ask for more tokens than the tokenizer declares it supports:
            # exceeding the encoder's position limit raises an indexing error
            # (the default bge-large-en-v1.5 declares 512, not 8192).
            max_length = self.MAX_EMBEDDING_TOKENS
            tokenizer_limit = getattr(self.embedding_tokenizer, "model_max_length", None)
            if tokenizer_limit:
                max_length = min(max_length, int(tokenizer_limit))

            inputs = self.embedding_tokenizer(
                text,
                padding=True,
                truncation=True,
                return_tensors='pt',
                max_length=max_length
            ).to(self.device)

            with torch.no_grad():
                model_output = self.embedding_model(**inputs)

            token_embeddings = model_output.last_hidden_state
            attention_mask = inputs['attention_mask']

            # Masked mean over the token axis; the clamp avoids dividing by
            # zero if the mask is empty.
            mask = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
            summed = torch.sum(token_embeddings * mask, 1)
            counts = torch.clamp(mask.sum(1), min=1e-9)
            mean_embeddings = summed / counts

            normalized = torch.nn.functional.normalize(mean_embeddings, p=2, dim=1)
            bge_embedding = normalized.cpu().numpy()[0].astype(np.float32)
            logger.info(f"BGE embedding shape: {bge_embedding.shape}")

            if len(bge_embedding) != dim:
                logger.warning(f"Expected 1024 dimensions, got {len(bge_embedding)}")
                if len(bge_embedding) < dim:
                    padded = np.zeros(dim, dtype=np.float32)
                    padded[:len(bge_embedding)] = bge_embedding
                    bge_embedding = padded
                else:
                    bge_embedding = bge_embedding[:dim]

            return bge_embedding

        except Exception as e:
            # Deliberate best-effort: prediction proceeds with a zero embedding.
            logger.error(f"Error getting embedding: {str(e)}")
            return np.zeros(dim, dtype=np.float32)

    @staticmethod
    def _parse_score(line: str, fallback: float) -> float:
        """Parse the number after the last ':' in *line*, clamped to [0, 1].

        Returns *fallback* when that text is not a valid float.
        """
        try:
            score = float(line.split(':')[-1].strip())
        except ValueError:
            return fallback
        return max(0.0, min(1.0, score))

    def analyze_conversation_metrics(self, history: List[Dict[str, str]]) -> Dict[str, float]:
        """Analyze conversation to extract key metrics using LLM.

        Returns a dict with ``customer_engagement`` and
        ``sales_effectiveness`` (parsed from the LLM reply, 0.5 when absent
        or unparseable), ``conversation_length`` (message count), ``outcome``
        (always 0.5) and ``progress`` (message count / 20, capped at 1.0).
        On any error the neutral defaults are returned.
        """
        try:
            normalized_history = self._normalize_history_format(history)

            conversation_text = "".join(
                f"{msg.get('speaker', '')}: {msg.get('message', '')}\n\n"
                for msg in normalized_history
            )

            prompt = (
                "Analyze this sales conversation and rate each metric from 0.0 to 1.0:\n"
                "\n"
                "customer_engagement:\n"
                "sales_effectiveness:\n"
                "\n"
                "Respond only with numbers in the format shown above.\n"
                "\n"
                "Conversation:\n"
                f"{conversation_text}"
            )

            response = self.generate_llm_response(prompt, max_new_tokens=50)
            logger.debug("Metric analysis response: %r", response)

            engagement = 0.5
            effectiveness = 0.5
            for line in response.strip().split('\n'):
                lowered = line.lower()
                if 'customer_engagement' in lowered:
                    engagement = self._parse_score(line, engagement)
                elif 'sales_effectiveness' in lowered:
                    effectiveness = self._parse_score(line, effectiveness)

            return {
                'customer_engagement': engagement,
                'sales_effectiveness': effectiveness,
                'conversation_length': len(normalized_history),
                'outcome': 0.5,
                'progress': min(1.0, len(normalized_history) / 20)
            }

        except Exception as e:
            logger.error(f"Error analyzing conversation: {str(e)}")
            return {
                'customer_engagement': 0.5,
                'sales_effectiveness': 0.5,
                'conversation_length': len(history),
                'outcome': 0.5,
                'progress': min(1.0, len(history) / 20)
            }

    def generate_llm_response(self, prompt: str, max_new_tokens: int = 2048) -> str:
        """Generate response using llama-cpp.

        Generation stops at ``"User:"``, ``"Assistant:"`` or a blank line.
        Returns the stripped completion text, or a fixed apology string if
        the call fails.
        """
        try:
            response = self.llm(
                prompt,
                max_tokens=max_new_tokens,
                temperature=0.001,
                top_p=0.95,
                repeat_penalty=1.1,
                stop=["User:", "Assistant:", "\n\n"]
            )
            return response['choices'][0]['text'].strip()

        except Exception as e:
            logger.error(f"Error generating LLM response: {str(e)}")
            return "I apologize, but I encountered an error generating a response."

    def create_state_vector(self,
                            embedding: np.ndarray,
                            metrics: Dict[str, float],
                            turn_number: int,
                            previous_probs: List[float]) -> np.ndarray:
        """Create state vector for model input.

        Layout: 1024 embedding values, 5 metrics (engagement, effectiveness,
        length, outcome, progress), the turn number, then the 10 most recent
        probabilities (zero-padded on the right) - 1040 values in total.
        An embedding of the wrong length is replaced by zeros.
        """
        metric_values = np.array([
            metrics['customer_engagement'],
            metrics['sales_effectiveness'],
            metrics['conversation_length'],
            metrics['outcome'],
            metrics['progress']
        ], dtype=np.float32)

        turn_info = np.array([turn_number], dtype=np.float32)

        padded_probs = np.zeros(self.PROB_HISTORY_LEN, dtype=np.float32)
        if previous_probs:
            recent_probs = previous_probs[-self.PROB_HISTORY_LEN:]
            padded_probs[:len(recent_probs)] = recent_probs

        if len(embedding) != self.EMBEDDING_DIM:
            logger.warning(f"Unexpected embedding size: {len(embedding)}. Expected 1024. Creating zero embedding.")
            embedding = np.zeros(self.EMBEDDING_DIM, dtype=np.float32)

        combined = np.concatenate([
            embedding,
            metric_values,
            turn_info,
            padded_probs
        ])

        logger.info(f"State vector shape: {combined.shape} (expected: 1040)")
        return combined

    @staticmethod
    def _action_to_probability(action) -> float:
        """Convert a policy action (scalar or array) to a probability in [0, 1].

        The first element is used when the action holds several values.

        Raises:
            ValueError: If the value is NaN (clamping would silently map it
                to 1.0).
            IndexError: If the action is empty.
        """
        value = float(np.asarray(action, dtype=np.float64).ravel()[0])
        if np.isnan(value):
            raise ValueError("policy returned NaN")
        return max(0.0, min(1.0, value))

    def predict_conversion(self, conversation_id: str, history: List[Dict[str, str]],
                           new_response: str) -> float:
        """Predict conversion probability for a conversation.

        Appends *new_response* as a sales-rep message, embeds the full text,
        scores it with the LLM, builds the state vector and queries the PPO
        policy. The result is clamped to [0, 1]; if prediction fails 0.5 is
        used. The updated history and probability list are stored under
        *conversation_id*.
        """
        logger.info(f"Predicting conversion for conversation {conversation_id}")

        updated_history = self._normalize_history_format(history)
        updated_history.append({'speaker': 'sales_rep', 'message': new_response})

        full_text = " ".join(msg.get('message', '') for msg in updated_history)

        embedding = self.get_embedding(full_text)
        logger.info(f"Embedding shape: {embedding.shape}")

        metrics = self.analyze_conversation_metrics(updated_history)
        logger.info(f"Metrics: engagement={metrics['customer_engagement']:.2f}, effectiveness={metrics['sales_effectiveness']:.2f}")

        turn = len(updated_history) // 2

        state = self.conversation_states.get(conversation_id)
        # A conversation seen for the first time starts from a neutral 0.5.
        previous_probs = state['probabilities'] if state is not None else [0.5]

        state_vector = np.asarray(
            self.create_state_vector(embedding, metrics, turn, previous_probs),
            dtype=np.float32,
        )
        logger.info(f"Final state vector shape: {state_vector.shape}")

        try:
            action, _ = self.ppo_model.predict(state_vector, deterministic=True)
            predicted_prob = self._action_to_probability(action)
        except Exception as e:
            logger.error(f"Error during prediction: {str(e)}")
            predicted_prob = 0.5

        self.conversation_states[conversation_id] = {
            'history': updated_history,
            'probabilities': previous_probs + [predicted_prob]
        }

        logger.info(f"Predicted conversion probability: {predicted_prob:.4f}")
        return predicted_prob

    def generate_response(self, conversation_id: str, history: List[Dict[str, str]],
                          user_input: str, system_prompt: Optional[str] = None) -> str:
        """Generate a response using llama-cpp and add conversion probability.

        Builds a ``System:/User:/Assistant:`` transcript from *history* plus
        *user_input*, asks the LLM for the next assistant turn, predicts the
        conversion probability for that reply and returns the reply with the
        probability footer appended. *history* itself is not modified.
        """
        normalized_history = self._normalize_history_format(history)

        if system_prompt:
            messages = [f"System: {system_prompt}\n"]
        else:
            messages = ["System: You are a helpful sales assistant.\n"]

        for msg in normalized_history:
            speaker = msg.get('speaker', '')
            message = msg.get('message', '')
            # Messages from any other speaker are left out of the prompt.
            if speaker == 'user':
                messages.append(f"User: {message}\n")
            elif speaker == 'sales_rep':
                messages.append(f"Assistant: {message}\n")

        messages.append(f"User: {user_input}\n")
        messages.append("Assistant: ")
        prompt = "".join(messages)

        llm_response = self.generate_llm_response(prompt, max_new_tokens=2048)
        logger.debug("LLM reply: %r", llm_response)

        # Work on a copy so the caller's history list is left untouched.
        history_with_user = list(history)
        history_with_user.append({'role': 'user', 'content': user_input})

        probability = self.predict_conversion(conversation_id, history_with_user, llm_response)
        return self.format_response_with_probability(llm_response, probability)

    @classmethod
    def _classify_probability(cls, probability: float):
        """Return the ``(status, suggestion)`` pair of the first tier reached."""
        for threshold, status, suggestion in cls._CONVERSION_TIERS:
            if probability >= threshold:
                return status, suggestion
        return cls._LOWEST_TIER

    def format_response_with_probability(self, response: str, probability: float) -> str:
        """Format response with conversion probability.

        Returns *response* followed by a ``---`` separator and a line with
        the status label and the probability as a percentage (one decimal).
        """
        indicator, _ = self._classify_probability(probability)
        probability_pct = probability * 100
        return (
            f"{response}\n\n"
            f"---\n"
            f"{indicator} ({probability_pct:.1f}%)\n"
        )

    def format_prediction_result(self, probability: float) -> Dict[str, str]:
        """Format prediction result with status and suggestion.

        Returns a dict with the raw ``probability``, a
        ``formatted_probability`` percentage string, the ``status`` label
        and a ``suggestion`` for the sales rep.
        """
        status, suggestion = self._classify_probability(probability)
        probability_pct = probability * 100
        return {
            "probability": probability,
            "formatted_probability": f"{probability_pct:.1f}%",
            "status": status,
            "suggestion": suggestion
        }


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the demo script."""
    parser = argparse.ArgumentParser(description="Sales Conversion Predictor")
    parser.add_argument(
        "--model_path",
        type=str,
        default="/content/sales-conversion-model-reinf-learning/sales_conversion_model",
        help="Path to the trained PPO model zip file."
    )
    parser.add_argument(
        "--embedding_model_name",
        type=str,
        default="BAAI/bge-m3",
        help="Name of the Hugging Face embedding model (e.g., 'BAAI/bge-m3', 'BAAI/bge-large-en-v1.5')."
    )
    parser.add_argument(
        "--llm_gguf_path",
        type=str,
        default="unsloth/gemma-3-4b-it-GGUF",
        help="Path to the GGUF LLM model file, a local directory containing GGUF files, or a HuggingFace repo_id."
    )
    parser.add_argument(
        "--no_gpu",
        action="store_true",
        help="Disable GPU usage (use CPU only)."
    )
    parser.add_argument(
        "--n_gpu_layers",
        type=int,
        default=-1,
        help="Number of LLM layers to offload to GPU. -1 for all, 0 for none."
    )
    parser.add_argument(
        "--n_ctx",
        type=int,
        default=2048,
        help="Context window size for the LLM."
    )
    return parser


def main() -> None:
    """Load the predictor from CLI options and score three sample conversations."""
    args = _build_arg_parser().parse_args()

    predictor = SalesConversionPredictor(
        model_path=args.model_path,
        embedding_model_name=args.embedding_model_name,
        llm_gguf_path=args.llm_gguf_path,
        use_gpu=not args.no_gpu,
        n_gpu_layers=args.n_gpu_layers,
        n_ctx=args.n_ctx,
        use_mini_embeddings=True
    )

    # Each scenario is a conversation history plus the sales reply to score.
    scenarios = [
        {
            "id": "negative_outcome",
            "history": [
                {"role": "user", "content": "I'm looking for a CRM solution for my startup."},
                {"role": "assistant", "content": "I'd be happy to help you find the right CRM solution. What's the size of your team and what are your main requirements?"},
                {"role": "user", "content": "We're a team of 10 and need lead management and email automation."},
                {"role": "assistant", "content": "Our CRM offers excellent lead management and built-in email automation that would be perfect for a team of 10. Let me show you how it works."},
                {"role": "user", "content": "not interested, bye"}
            ],
            "response": "ok, thank you for the interest"
        },
        {
            "id": "positive_outcome",
            "history": [
                {"role": "user", "content": "I need a project management tool urgently."},
                {"role": "assistant", "content": "I can definitely help you with that! Our tool is designed for quick implementation. What's your main priority?"},
                {"role": "user", "content": "We need to track tasks and deadlines for 20 people."},
                {"role": "assistant", "content": "Perfect! Our solution handles that easily with real-time collaboration features. We can get you set up today with a free trial."},
                {"role": "user", "content": "That sounds great! What's the pricing?"}
            ],
            "response": "For a team of 20, it's $299/month with all features included. You get 14 days free to test everything. Shall I send you the signup link?"
        },
        {
            "id": "neutral_outcome",
            "history": [
                {"role": "user", "content": "Tell me about your software."},
                {"role": "assistant", "content": "Our software helps businesses manage their operations more efficiently. What specific area are you looking to improve?"},
                {"role": "user", "content": "Just browsing for now."}
            ],
            "response": "No problem! Feel free to explore our website for more information, and I'm here if you have any questions."
        }
    ]

    for scenario in scenarios:
        print(f"\n=== Testing Scenario: {scenario['id']} ===")

        probability = predictor.predict_conversion(
            conversation_id=scenario['id'],
            history=scenario['history'],
            new_response=scenario['response']
        )
        result = predictor.format_prediction_result(probability)

        print(f"Response: {scenario['response']}")
        print(f"Probability: {result['formatted_probability']}")
        print(f"Status: {result['status']}")
        print(f"Suggestion: {result['suggestion']}")
        print("-" * 50)


if __name__ == "__main__":
    main()