Spaces:
Sleeping
Sleeping
| import asyncio | |
| import asyncpg | |
| import inspect | |
| import time | |
| from functools import wraps | |
| from typing import Callable, Any | |
| from sqlalchemy.exc import OperationalError, InterfaceError, PendingRollbackError | |
| def trace_runtime(func: Callable) -> Callable: | |
| async def async_wrapper(*args, **kwargs) -> Any: | |
| # This wrapper runs if the original func was async | |
| start_time = time.perf_counter() | |
| # Await the coroutine returned by func(*args, **kwargs) | |
| result = await func(*args, **kwargs) | |
| end_time = time.perf_counter() | |
| duration = end_time - start_time | |
| print(f"β±οΈ ASYNC Function '{func.__name__}' took {duration:.6f} seconds") | |
| return result | |
| def sync_wrapper(*args, **kwargs) -> Any: | |
| # This wrapper runs if the original func was sync | |
| start_time = time.perf_counter() | |
| result = func(*args, **kwargs) | |
| end_time = time.perf_counter() | |
| duration = end_time - start_time | |
| print(f"β±οΈ SYNC Function '{func.__name__}' took {duration:.6f} seconds") | |
| return result | |
| # Check if the function being decorated is an async function | |
| if inspect.iscoroutinefunction(func): | |
| # If it's async, return the async wrapper | |
| return async_wrapper | |
| else: | |
| # If it's sync, return the sync wrapper | |
| return sync_wrapper | |
| def retry_db( | |
| retries: int = 3, | |
| delay: float = 2.0, | |
| backoff: float = 2.0, | |
| ) -> Callable: | |
| def decorator(func: Callable) -> Callable: | |
| async def async_wrapper(*args, **kwargs) -> Any: | |
| current_delay = delay | |
| for attempt in range(1, retries + 1): | |
| try: | |
| return await func(*args, **kwargs) | |
| except ( | |
| OperationalError, | |
| InterfaceError, | |
| PendingRollbackError, # π Add this | |
| asyncpg.exceptions.PostgresConnectionError, | |
| asyncpg.exceptions.CannotConnectNowError, | |
| ConnectionError, | |
| TimeoutError, | |
| ) as e: | |
| if attempt == retries: | |
| raise | |
| print( | |
| f"π Retry {attempt}/{retries} for '{func.__name__}' " | |
| f"after {current_delay:.2f}s due to: {type(e).__name__}" | |
| ) | |
| # π Roll back the broken session before retrying | |
| db = args[0] if args else kwargs.get("db") | |
| if db is not None: | |
| try: | |
| await db.rollback() | |
| except Exception: | |
| pass # If rollback itself fails, just continue | |
| await asyncio.sleep(current_delay) | |
| current_delay *= backoff | |
| return async_wrapper | |
| return decorator |