# CandidateExplorer / utils /decorator.py
# ishaq101's picture
# [KM-383] [CEX] [AI] Deployment AI Engine / BE
# f3bdba1
import asyncio
import asyncpg
import inspect
import time
from functools import wraps
from typing import Callable, Any
from sqlalchemy.exc import OperationalError, InterfaceError, PendingRollbackError
def trace_runtime(func: Callable) -> Callable:
    """Decorate *func* so every successful call prints its wall-clock runtime.

    Coroutine functions receive an awaiting wrapper and all other callables a
    plain synchronous one. Timing uses ``time.perf_counter``. The report is
    printed only after *func* returns; if it raises, the exception propagates
    and nothing is printed.

    Args:
        func: The sync or async callable to time.

    Returns:
        A wrapper carrying the metadata of *func* (via ``functools.wraps``)
        that returns whatever *func* returns.
    """
    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def timed(*args, **kwargs) -> Any:
            began = time.perf_counter()
            # The coroutine must be awaited inside the timed window, otherwise
            # only its creation would be measured.
            outcome = await func(*args, **kwargs)
            elapsed = time.perf_counter() - began
            print(f"⏱️ ASYNC Function '{func.__name__}' took {elapsed:.6f} seconds")
            return outcome

        return timed

    @wraps(func)
    def timed(*args, **kwargs) -> Any:
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        print(f"⏱️ SYNC Function '{func.__name__}' took {elapsed:.6f} seconds")
        return outcome

    return timed
async def _rollback_before_retry(func_name: str, args: tuple, kwargs: dict) -> None:
    """Best-effort rollback of the session passed to a call about to be retried.

    The session is the first positional argument exposing a callable
    ``rollback`` attribute, falling back to the ``db`` keyword argument. This
    skips ``self`` on bound methods and still finds a session passed as
    ``db=...`` alongside positional arguments. Both awaitable and plain
    ``rollback()`` results are handled. Failures are reported and suppressed
    so that the retry still proceeds.
    """
    try:
        candidates = (*args, kwargs.get("db"))
        session = next(
            (c for c in candidates if callable(getattr(c, "rollback", None))),
            None,
        )
        if session is None:
            return
        pending = session.rollback()
        # A synchronous session returns a non-awaitable; awaiting it would
        # raise TypeError even though the rollback already ran.
        if inspect.isawaitable(pending):
            await pending
    except Exception as exc:
        # Deliberately non-fatal: a failed rollback must not mask the retry.
        print(
            f"Rollback before retrying '{func_name}' failed: "
            f"{type(exc).__name__}"
        )


def retry_db(
    retries: int = 3,
    delay: float = 2.0,
    backoff: float = 2.0,
) -> Callable:
    """Build a decorator that retries an async callable on connection errors.

    The wrapped callable is awaited up to ``retries`` times. After each failed
    attempt except the last, the wrapper prints a retry notice, tries to roll
    back the session found in the call arguments, sleeps for the current
    delay, and multiplies that delay by ``backoff``.

    Args:
        retries: Total number of attempts. Values below 1 are treated as 1 so
            the wrapped callable always runs at least once.
        delay: Seconds to sleep before the second attempt.
        backoff: Factor applied to the delay after every failed attempt.

    Returns:
        A decorator whose wrapper is a coroutine function returning the result
        of the wrapped callable. The wrapped call's result must be awaitable.

    Raises:
        The last caught exception, re-raised unchanged once all attempts are
        used up. Exceptions outside the retried set propagate immediately.
    """
    # range(1, retries + 1) is empty for retries < 1, which would skip the
    # call entirely and return None; always allow one attempt.
    max_attempts = max(1, retries)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except (
                    OperationalError,
                    InterfaceError,
                    # Raised when a session is reused after a failed flush.
                    PendingRollbackError,
                    asyncpg.exceptions.PostgresConnectionError,
                    asyncpg.exceptions.CannotConnectNowError,
                    ConnectionError,
                    TimeoutError,
                    # Distinct from the builtin TimeoutError before Python 3.11.
                    asyncio.TimeoutError,
                ) as e:
                    if attempt == max_attempts:
                        raise
                    print(
                        f"🔁 Retry {attempt}/{max_attempts} for '{func.__name__}' "
                        f"after {current_delay:.2f}s due to: {type(e).__name__}"
                    )
                    # Clear the broken transaction so the next attempt starts
                    # from a usable session.
                    await _rollback_before_retry(func.__name__, args, kwargs)
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff

        return async_wrapper

    return decorator