| import requests | |
| class QuestionManager: | |
| def __init__(self, api_url: str): | |
| self.api_url = api_url | |
| self.questions_url = f"{api_url}/questions" | |
| self.random_question_url = f"{api_url}/random_question" | |
| self.files_url = f"{api_url}/files" | |
| self.submit_url = f"{api_url}/submit" | |
| def fetch_random_question(self): | |
| print("Fetching random question") | |
| try: | |
| response = requests.get(self.random_question_url, timeout=15) | |
| response.raise_for_status() | |
| question_data = response.json() | |
| return question_data | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error fetching random question: {e}") | |
| except requests.exceptions.JSONDecodeError as e: | |
| print( | |
| f"Error decoding JSON response from random question endpoint: {e}") | |
| print(f"Response text: {response.text[:500]}") | |
| return None | |
| except Exception as e: | |
| print( | |
| f"An unexpected error occurred fetching random question: {e}") | |
| return None | |
| def fetch_question_by_id(self, question_id: str) -> dict | None: | |
| print(f"Fetching question by id: {question_id}") | |
| questions = self.fetch_questions() | |
| for question in questions: | |
| if question.get("task_id") == question_id: | |
| return question | |
| return None | |
| def fetch_questions(self) -> list[dict]: | |
| print(f"Fetching questions from: {self.questions_url}") | |
| try: | |
| response = requests.get(self.questions_url, timeout=15) | |
| response.raise_for_status() | |
| questions_data = response.json() | |
| if not questions_data: | |
| print("Fetched questions list is empty.") | |
| return "Fetched questions list is empty or invalid format.", None | |
| print(f"Fetched {len(questions_data)} questions.") | |
| return questions_data | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error fetching questions: {e}") | |
| return [] | |
| except requests.exceptions.JSONDecodeError as e: | |
| print(f"Error decoding JSON response from questions endpoint: {e}") | |
| print(f"Response text: {response.text[:500]}") | |
| return [] | |
| except Exception as e: | |
| print(f"An unexpected error occurred fetching questions: {e}") | |
| return [] | |
| def prepare_questions_data(self, questions: list[dict]) -> list[dict]: | |
| print(f"Preparing question data for: {questions}") | |
| questions_data = [] | |
| for item in questions: | |
| task_id = item.get("task_id") | |
| question_text = item.get("question") | |
| file_url = f"{self.files_url}/{task_id}" | |
| if not task_id or question_text is None: | |
| print( | |
| f"Skipping item with missing task_id or question: {item}") | |
| continue | |
| question_data = { | |
| 'task_id': task_id, | |
| 'text': question_text, | |
| 'file_url': file_url | |
| } | |
| questions_data.append(question_data) | |
| return questions_data | |