|
|
| import os |
| import pandas as pd |
| from datasets import load_dataset, Dataset, DatasetDict |
| from huggingface_hub import login |
| import logging |
| from typing import List, Optional, Dict, Any |
|
|
| from dotenv import load_dotenv |
|
|
# Module-level logger; handlers and level are configured by the application.
logger = logging.getLogger(__name__)


# Load environment variables for HF credentials: first a local .env, then the
# project-root .env.local. NOTE(review): load_dotenv's default is override=False,
# so variables already set (including by the first call) take precedence.
load_dotenv()
load_dotenv("../.env.local")
|
|
class DataService:
    """In-memory data store synchronized with a Hugging Face Hub dataset.

    Four logical tables ("configs") -- files, refined, patterns, results --
    are each held as a pandas DataFrame in ``self.data`` and pushed to the
    Hub as a separate dataset configuration after every mutation. The
    in-memory copy is authoritative; persistence is best-effort.
    """

    def __init__(self):
        # Credentials and target dataset come from the environment
        # (.env files are loaded at module import time).
        self.hf_token = os.getenv("HF_TOKEN")
        self.dataset_name = os.getenv("HF_DATASET_NAME")

        if not self.hf_token or not self.dataset_name:
            logger.error("HF_TOKEN or HF_DATASET_NAME not set via environment variables.")

        if self.hf_token:
            login(token=self.hf_token)

        # One DataFrame per logical table.
        self.configs = ["files", "refined", "patterns", "results"]
        self.data: Dict[str, pd.DataFrame] = {}

        self._load_data()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _load_data(self):
        """Loads data from HF Hub for each config. Initializes empty if not found.

        A missing config is expected on first run, so any load failure is
        logged and replaced with an empty DataFrame rather than raised.
        """
        for config in self.configs:
            try:
                ds = load_dataset(self.dataset_name, config, split="train")
                self.data[config] = ds.to_pandas()
                logger.info("Loaded config '%s' with %d rows.", config, len(self.data[config]))
            except Exception as e:
                logger.warning("Could not load config '%s' from HF: %s. Initializing empty.", config, e)
                self.data[config] = pd.DataFrame()

    def _save(self, config_name: str):
        """Pushes the specific config DataFrame to HF Hub (best effort)."""
        if not self.hf_token or not self.dataset_name:
            logger.warning("Skipping save to HF: Credentials missing.")
            return

        try:
            df = self.data[config_name]
            # preserve_index=False keeps a stray "__index_level_0__" column
            # from being written when the frame carries a non-default index.
            ds = Dataset.from_pandas(df, preserve_index=False)
            ds.push_to_hub(self.dataset_name, config_name=config_name, token=self.hf_token)
            logger.info("Saved config '%s' to HF Hub.", config_name)
        except Exception as e:
            # Persistence is best-effort; the in-memory copy stays usable.
            logger.error("Failed to save config '%s': %s", config_name, e)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _ensure_columns(self, config: str, columns: List[str]):
        """Guarantee *columns* exist on the config's frame (missing ones -> None)."""
        if self.data[config].empty:
            self.data[config] = pd.DataFrame(columns=columns)
        else:
            for col in columns:
                if col not in self.data[config].columns:
                    self.data[config][col] = None

    def _next_id(self, config: str, id_col: str) -> int:
        """Return the next sequential integer id for *id_col* (1-based).

        Non-numeric ids loaded from the Hub are coerced to NaN instead of
        raising, so a single corrupt row cannot break id allocation.
        """
        df = self.data[config]
        if df.empty or id_col not in df.columns:
            return 1
        max_id = pd.to_numeric(df[id_col], errors="coerce").max()
        if pd.isna(max_id):
            return 1
        return int(max_id) + 1

    # ------------------------------------------------------------------
    # Files
    # ------------------------------------------------------------------

    def get_all_files(self) -> List[Dict[str, Any]]:
        """Return every file row as a list of plain dicts ([] when empty)."""
        if self.data["files"].empty:
            return []
        return self.data["files"].to_dict(orient="records")

    def get_file_content(self, file_id: str) -> Optional[str]:
        """Return the stored content for *file_id*, or None if unknown."""
        df = self.data["files"]
        if df.empty:
            return None
        row = df[df["file_id"] == file_id]
        if not row.empty:
            return row.iloc[0]["content"]
        return None

    def add_file(self, file_data: Dict[str, Any]):
        """Insert or replace (upsert by file_id) a file row, then persist."""
        self._ensure_columns("files", ["file_id", "working_group", "meeting", "type", "status", "agenda_item", "content", "filename", "timestamp"])
        df = self.data["files"]

        if not df.empty:
            # Drop any existing row with the same id -> upsert semantics.
            df = df[df["file_id"] != file_data["file_id"]]

        new_row = pd.DataFrame([file_data])
        self.data["files"] = pd.concat([df, new_row], ignore_index=True)
        self._save("files")

    # ------------------------------------------------------------------
    # Refined outputs
    # ------------------------------------------------------------------

    def get_refined_output(self, file_id: str) -> Optional[str]:
        """Return the refined output text for *file_id*, or None."""
        df = self.data["refined"]
        if df.empty:
            return None
        row = df[df["file_id"] == file_id]
        if not row.empty:
            return row.iloc[0]["refined_output"]
        return None

    def add_refined(self, file_id: str, refined_output: str) -> int:
        """Append a refined row for *file_id*; return its new refined_id."""
        self._ensure_columns("refined", ["refined_id", "refined_output", "file_id"])
        next_id = self._next_id("refined", "refined_id")

        new_row = pd.DataFrame([{
            "refined_id": next_id,
            "refined_output": refined_output,
            "file_id": file_id
        }])
        self.data["refined"] = pd.concat([self.data["refined"], new_row], ignore_index=True)
        self._save("refined")
        return next_id

    def get_refined_by_file_id(self, file_id: str) -> Optional[Dict[str, Any]]:
        """Return the first refined row for *file_id* as a dict, or None."""
        df = self.data["refined"]
        if df.empty:
            return None
        row = df[df["file_id"] == file_id]
        if not row.empty:
            return row.iloc[0].to_dict()
        return None

    # ------------------------------------------------------------------
    # Patterns
    # ------------------------------------------------------------------

    def get_patterns(self) -> List[Dict[str, Any]]:
        """Return every pattern row as a list of plain dicts ([] when empty)."""
        if self.data["patterns"].empty:
            return []
        return self.data["patterns"].to_dict(orient="records")

    def get_pattern(self, pattern_id: int) -> Optional[Dict[str, Any]]:
        """Return the pattern row with *pattern_id* as a dict, or None."""
        df = self.data["patterns"]
        if df.empty:
            return None
        row = df[df["pattern_id"] == pattern_id]
        if not row.empty:
            return row.iloc[0].to_dict()
        return None

    def add_pattern(self, pattern_name: str, prompt: str) -> int:
        """Append a new pattern and return its generated pattern_id."""
        self._ensure_columns("patterns", ["pattern_id", "pattern_name", "prompt"])
        next_id = self._next_id("patterns", "pattern_id")

        new_row = pd.DataFrame([{
            "pattern_id": next_id,
            "pattern_name": pattern_name,
            "prompt": prompt
        }])
        self.data["patterns"] = pd.concat([self.data["patterns"], new_row], ignore_index=True)
        self._save("patterns")
        return next_id

    def update_pattern(self, pattern_id: int, pattern_name: str, prompt: str) -> bool:
        """Update name/prompt of an existing pattern; False if id unknown."""
        df = self.data["patterns"]
        if df.empty:
            return False

        if pattern_id not in df["pattern_id"].values:
            return False

        self.data["patterns"].loc[df["pattern_id"] == pattern_id, ["pattern_name", "prompt"]] = [pattern_name, prompt]
        self._save("patterns")
        return True

    # ------------------------------------------------------------------
    # Results
    # ------------------------------------------------------------------

    def get_existing_result(self, file_id: str):
        """Look up the latest result for *file_id* through the refined table.

        Equivalent to:
            SELECT ... FROM result r JOIN refined ref ... WHERE refined.file_id = ?

        Returns:
            (result_row_dict | None, refined_id | None, file_name) -- the
            file name falls back to "Unknown File" when the file is unknown.
        """
        ref_row = self.get_refined_by_file_id(file_id)

        file_df = self.data["files"]
        file_name = "Unknown File"
        if not file_df.empty:
            f_row = file_df[file_df["file_id"] == file_id]
            if not f_row.empty:
                file_name = f_row.iloc[0]["filename"]

        if not ref_row:
            return None, None, file_name

        refined_id = ref_row["refined_id"]

        # Guard the column too: a non-empty results frame without
        # "refined_id" would otherwise raise KeyError.
        res_df = self.data["results"]
        if res_df.empty or "refined_id" not in res_df.columns:
            return None, refined_id, file_name

        match = res_df[res_df["refined_id"] == refined_id]
        if match.empty:
            return None, refined_id, file_name

        # Latest result wins when several exist for the same refined row.
        result_row = match.iloc[-1].to_dict()

        # Resolve the pattern name for display; "Unknown" when unresolvable.
        pat_df = self.data["patterns"]
        pattern_name = "Unknown"
        if not pat_df.empty and "pattern_id" in result_row:
            pat_match = pat_df[pat_df["pattern_id"] == result_row["pattern_id"]]
            if not pat_match.empty:
                pattern_name = pat_match.iloc[0]["pattern_name"]

        result_row["pattern_name"] = pattern_name
        # Alias for callers expecting a "content" key.
        result_row["content"] = result_row.get("result_content")

        return result_row, refined_id, file_name

    def add_result(self, pattern_id: int, refined_id: int, result_content: str, methodology: str, context: str, problem: str, classification: str = "UNCLASSIFIED") -> int:
        """Append a result row and return its generated result_id."""
        self._ensure_columns("results", ["result_id", "pattern_id", "refined_id", "result_content", "methodology", "context", "problem", "classification"])
        next_id = self._next_id("results", "result_id")

        new_row = pd.DataFrame([{
            "result_id": next_id,
            "pattern_id": pattern_id,
            "refined_id": refined_id,
            "result_content": result_content,
            "methodology": methodology,
            "context": context,
            "problem": problem,
            "classification": classification
        }])
        self.data["results"] = pd.concat([self.data["results"], new_row], ignore_index=True)
        self._save("results")
        return next_id

    def update_classification(self, result_id: int, classification: str) -> bool:
        """Set the classification of *result_id*; False if the id is unknown.

        Raises:
            Exception: when the results table is empty (kept for backward
            compatibility with existing callers).
        """
        df = self.data["results"]
        if df.empty:
            raise Exception("No results found")

        if result_id not in df["result_id"].values:
            return False

        self.data["results"].loc[df["result_id"] == result_id, "classification"] = classification
        self._save("results")
        return True

    def get_all_results_joined(self) -> List[Dict[str, Any]]:
        """Left-join results -> patterns -> refined -> files for display.

        Returns a list of flat dicts sorted by result id, newest first.
        """
        if self.data["results"].empty:
            return []

        res_df = self.data["results"].copy()

        pat_df = self.data["patterns"]
        if not pat_df.empty:
            res_df = res_df.merge(pat_df[["pattern_id", "pattern_name"]], on="pattern_id", how="left")

        ref_df = self.data["refined"]
        if not ref_df.empty:
            res_df = res_df.merge(ref_df[["refined_id", "file_id"]], on="refined_id", how="left")

        file_df = self.data["files"]
        if not file_df.empty:
            res_df = res_df.merge(file_df[["file_id", "filename"]], on="file_id", how="left")

        out = []
        for _, row in res_df.iterrows():
            out.append({
                "id": row.get("result_id"),
                "file_name": row.get("filename"),
                "content": row.get("result_content"),
                "classification": row.get("classification"),
                "pattern_name": row.get("pattern_name"),
                "methodology": row.get("methodology"),
                "context": row.get("context"),
                "problem": row.get("problem")
            })

        # Missing ids sort as 0 so NaN-bearing rows cannot break the sort.
        out.sort(key=lambda x: x["id"] or 0, reverse=True)
        return out
|
|