| | import threading |
| | import pandas as pd |
| | from dataclasses import dataclass |
| |
|
| | from ..core.logging import logger |
| | from ..core.decorators import atomic_method |
| | from ..core.callbacks import suppress_cost_logs |
| | from ..core.registry import MODEL_REGISTRY |
| | from .model_configs import LLMConfig |
| | from ..models.base_model import BaseLLM |
| |
|
| | def get_openai_model_cost() -> dict: |
| | import json |
| | from importlib.resources import files |
| | |
| | |
| | |
| | json_path = files('litellm') / 'model_prices_and_context_window_backup.json' |
| | model_cost = json.loads(json_path.read_text(encoding="utf-8")) |
| | return model_cost |
| |
|
| | def infer_litellm_company_from_model(model: str) -> str: |
| |
|
| | if "/" in model: |
| | company = model.split("/")[0] |
| | else: |
| | if "claude" in model or "anthropic" in model: |
| | company = "anthropic" |
| | elif "gemini" in model: |
| | company = "gemini" |
| | elif "deepseek" in model: |
| | company = "deepseek" |
| | elif "openrouter" in model: |
| | company = "openrouter" |
| | elif "azure" in model.lower(): |
| | company = "azure" |
| | else: |
| | company = "openai" |
| | return company |
| |
|
| |
|
| | @dataclass |
| | class Cost: |
| | input_tokens: int |
| | output_tokens: int |
| | input_cost: float |
| | output_cost: float |
| |
|
| |
|
| | class CostManager: |
| |
|
| | def __init__(self): |
| |
|
| | self.total_input_tokens = {} |
| | self.total_output_tokens = {} |
| | self.total_tokens = {} |
| |
|
| | self.total_input_cost = {} |
| | self.total_output_cost = {} |
| | self.total_cost = {} |
| |
|
| | self._lock = threading.Lock() |
| |
|
| | def compute_total_cost(self): |
| | total_tokens, total_cost = 0, 0.0 |
| | for _, value in self.total_tokens.items(): |
| | total_tokens += value |
| | for _, value in self.total_cost.items(): |
| | total_cost += value |
| | return total_tokens, total_cost |
| |
|
| | @atomic_method |
| | def update_cost(self, cost: Cost, model: str): |
| |
|
| | self.total_input_tokens[model] = self.total_input_tokens.get(model, 0) + cost.input_tokens |
| | self.total_output_tokens[model] = self.total_output_tokens.get(model, 0) + cost.output_tokens |
| | current_total_tokens = cost.input_tokens + cost.output_tokens |
| | self.total_tokens[model] = self.total_tokens.get(model, 0) + current_total_tokens |
| |
|
| | self.total_input_cost[model] = self.total_input_cost.get(model, 0.0) + cost.input_cost |
| | self.total_output_cost[model] = self.total_output_cost.get(model, 0.0) + cost.output_cost |
| | current_total_cost = cost.input_cost + cost.output_cost |
| | self.total_cost[model] = self.total_cost.get(model, 0.0) + current_total_cost |
| | |
| | total_tokens, total_cost = self.compute_total_cost() |
| | if not suppress_cost_logs.get(): |
| | logger.info(f"Total cost: ${total_cost:.3f} | Total tokens: {total_tokens} | Current cost: ${current_total_cost:.3f} | Current tokens: {current_total_tokens}") |
| |
|
| | def display_cost(self): |
| |
|
| | data = { |
| | "Model": [], |
| | "Total Cost (USD)": [], |
| | "Total Input Cost (USD)": [], |
| | "Total Output Cost (USD)": [], |
| | "Total Tokens": [], |
| | "Total Input Tokens": [], |
| | "Total Output Tokens": [], |
| | } |
| |
|
| | for model in self.total_tokens.keys(): |
| |
|
| | data["Model"].append(model) |
| | data["Total Cost (USD)"].append(round(self.total_cost[model], 4)) |
| | data["Total Input Cost (USD)"].append(round(self.total_input_cost[model], 4)) |
| | data["Total Output Cost (USD)"].append(round(self.total_output_cost[model], 4)) |
| |
|
| | data["Total Tokens"].append(self.total_tokens[model]) |
| | data["Total Input Tokens"].append(self.total_input_tokens[model]) |
| | data["Total Output Tokens"].append(self.total_output_tokens[model]) |
| | |
| | |
| | df = pd.DataFrame(data) |
| | if len(df) > 1: |
| | summary = { |
| | "Model": "TOTAL", |
| | "Total Cost (USD)": df["Total Cost (USD)"].sum(), |
| | "Total Input Cost (USD)": df["Total Input Cost (USD)"].sum(), |
| | "Total Output Cost (USD)": df["Total Output Cost (USD)"].sum(), |
| | "Total Tokens": df["Total Tokens"].sum(), |
| | "Total Input Tokens": df["Total Input Tokens"].sum(), |
| | "Total Output Tokens": df["Total Output Tokens"].sum(), |
| | } |
| | df = df._append(summary, ignore_index=True) |
| | |
| | print(df.to_string(index=False)) |
| |
|
| | def get_total_cost(self): |
| | |
| | total_cost = 0.0 |
| | for model in self.total_cost.keys(): |
| | total_cost += self.total_cost[model] |
| | return total_cost |
| |
|
| |
|
| | cost_manager = CostManager() |
| |
|
| |
|
| | def create_llm_instance(llm_config: LLMConfig) -> BaseLLM: |
| |
|
| | llm_cls = MODEL_REGISTRY.get_model(llm_config.llm_type) |
| | llm = llm_cls(config=llm_config) |
| | return llm |
| |
|