| | """ |
| | Quick test script for SPARKNET FastAPI backend |
| | Tests all major endpoints with a sample patent. |
| | """ |
| |
|
| | import requests |
| | import json |
| | import time |
| | from pathlib import Path |
| | from rich.console import Console |
| | from rich.table import Table |
| | from rich.progress import Progress, SpinnerColumn, TextColumn |
| |
|
# Shared rich console; every function in this script prints through it.
console = Console()

# Base URL that all endpoint paths below are appended to.
API_BASE = "http://localhost:8000"
| |
|
def test_health(timeout=10):
    """Query the health endpoint and print the reported statistics.

    Args:
        timeout: Seconds to wait for the server before the request is
            abandoned. Without one, requests would wait indefinitely.

    Returns:
        True when the endpoint answered with HTTP 200, False otherwise.
    """
    console.print("\n[bold blue]1. Testing Health Endpoint[/bold blue]")

    response = requests.get(f"{API_BASE}/api/health", timeout=timeout)

    # Check the status before touching the body: an error response may not
    # be JSON at all, or may lack the keys printed below, and the caller
    # relies on getting False back rather than an exception in that case.
    if response.status_code != 200:
        console.print(
            f"[red]✗[/red] Health endpoint returned HTTP {response.status_code}"
        )
        return False

    data = response.json()
    stats = data.get('statistics') or {}

    console.print(f"Status: [green]{data.get('status', 'unknown')}[/green]")
    console.print(f"Active Workflows: {stats.get('active_workflows', 'N/A')}")
    console.print(f"Processed Patents: {stats.get('processed_patents', 'N/A')}")

    return True
| |
|
def test_upload(patent_path, timeout=60):
    """Upload a patent PDF and print the details the API reports back.

    Args:
        patent_path: Path of the PDF file to upload.
        timeout: Seconds to wait on the server before the request is
            abandoned. Without one, requests would wait indefinitely.

    Returns:
        The ``patent_id`` from the response on HTTP 200; None when the file
        does not exist or the upload is rejected.
    """
    console.print("\n[bold blue]2. Testing Patent Upload[/bold blue]")

    pdf_file = Path(patent_path)
    if not pdf_file.exists():
        console.print(f"[red]Patent file not found: {patent_path}[/red]")
        console.print("[yellow]Using mock upload test (no actual file)[/yellow]")
        return None

    # The file handle only needs to stay open while the request is sent.
    with pdf_file.open('rb') as handle:
        response = requests.post(
            f"{API_BASE}/api/patents/upload",
            files={'file': (pdf_file.name, handle, 'application/pdf')},
            timeout=timeout,
        )

    if response.status_code != 200:
        console.print(f"[red]✗[/red] Upload failed: {response.text}")
        return None

    data = response.json()
    console.print("[green]✓[/green] Patent uploaded successfully")
    console.print(f"Patent ID: {data['patent_id']}")
    console.print(f"Filename: {data['filename']}")
    console.print(f"Size: {data['size']} bytes")
    return data['patent_id']
| |
|
def test_workflow(patent_id, timeout=30):
    """Start the ``patent_wakeup`` workflow for an uploaded patent.

    Args:
        patent_id: Identifier of the patent to run the workflow on.
        timeout: Seconds to wait on the server before the request is
            abandoned. Without one, requests would wait indefinitely.

    Returns:
        The ``workflow_id`` from the response on HTTP 200, otherwise None.
    """
    console.print("\n[bold blue]3. Testing Workflow Execution[/bold blue]")

    response = requests.post(
        f"{API_BASE}/api/workflows/execute",
        json={"patent_id": patent_id, "scenario": "patent_wakeup"},
        timeout=timeout,
    )

    if response.status_code != 200:
        console.print(f"[red]✗[/red] Workflow start failed: {response.text}")
        return None

    data = response.json()
    console.print("[green]✓[/green] Workflow started")
    console.print(f"Workflow ID: {data['workflow_id']}")
    return data['workflow_id']
| |
|
def monitor_workflow(workflow_id, poll_interval=2, timeout=10):
    """Poll a workflow until it finishes, then print its outcome.

    The status endpoint is queried repeatedly while a spinner shows the
    current step. Polling stops when the status becomes ``completed`` or
    ``failed``, or as soon as a status request does not return HTTP 200.

    Args:
        workflow_id: Identifier of the workflow to follow.
        poll_interval: Seconds to sleep between two status requests.
        timeout: Seconds to wait on the server for each status request.
            Without one, requests would wait indefinitely.

    Returns:
        True when the workflow completed, False when it failed or its
        status could not be retrieved.
    """
    console.print("\n[bold blue]4. Monitoring Workflow Progress[/bold blue]")

    status_url = f"{API_BASE}/api/workflows/{workflow_id}"

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        task = progress.add_task("Processing workflow...", total=100)

        while True:
            response = requests.get(status_url, timeout=timeout)

            if response.status_code != 200:
                # Stop here: there is no usable payload to interpret, so
                # falling through to the result handling would either
                # crash or report a misleading failure reason.
                console.print("[red]Failed to get workflow status[/red]")
                return False

            data = response.json()
            status = data['status']

            progress.update(
                task,
                completed=data.get('progress', 0),
                description=f"Step: {data.get('current_step', 'initializing')}",
            )

            if status in ('completed', 'failed'):
                break

            time.sleep(poll_interval)

    # The last poll already holds the terminal payload of this same
    # endpoint, so it is used directly instead of being fetched again.
    if status == 'completed':
        console.print("\n[green]✓ Workflow completed successfully![/green]")
        # A completed workflow without a result is shown as an empty report.
        display_results(data.get('result') or {})
        return True

    console.print(f"\n[red]✗ Workflow failed: {data.get('error', 'Unknown error')}[/red]")
    return False
| |
|
def _field(record, key, default):
    """Return ``record[key]``, or ``default`` when the key is missing or None."""
    value = record.get(key)
    return default if value is None else value


def display_results(result):
    """Print a summary of a workflow result to the console.

    Shows the quality score, then one section each for the document
    analysis, market opportunities, stakeholder matches and the generated
    brief, skipping any section whose data is absent or empty. The two
    tables list at most the first five entries.

    Values that are present but None are treated like missing ones, so an
    incomplete result is rendered with placeholders instead of raising.

    Args:
        result: Mapping with the workflow result; None is shown as empty.
    """
    result = result or {}

    console.print("\n[bold]Analysis Results:[/bold]\n")

    console.print(f"Quality Score: [blue]{_field(result, 'quality_score', 0):.2f}[/blue]")

    doc_analysis = _field(result, 'document_analysis', {})
    if doc_analysis:
        innovations = _field(doc_analysis, 'key_innovations', [])
        console.print("\n[bold]Patent Analysis:[/bold]")
        console.print(f" TRL Level: {_field(doc_analysis, 'trl_level', 'N/A')}/9")
        console.print(f" Key Innovations: {len(innovations)}")

    market_analysis = _field(result, 'market_analysis', {})
    if market_analysis:
        opportunities = _field(market_analysis, 'opportunities', [])
        console.print("\n[bold]Market Opportunities:[/bold]")
        console.print(f" Found: {len(opportunities)} opportunities")

        if opportunities:
            table = Table(show_header=True)
            table.add_column("Sector", style="cyan")
            table.add_column("Market Size", justify="right")
            table.add_column("Growth", justify="right")
            table.add_column("Fit", style="green")

            for opp in opportunities[:5]:
                # Cells are coerced to str so a non-string value cannot
                # break row rendering.
                table.add_row(
                    str(_field(opp, 'sector', ''))[:30],
                    f"${_field(opp, 'market_size_usd', 0) / 1e9:.1f}B",
                    f"{_field(opp, 'growth_rate_percent', 0)}%",
                    str(_field(opp, 'technology_fit', '')),
                )

            console.print(table)

    matches = _field(result, 'matches', [])
    if matches:
        console.print("\n[bold]Stakeholder Matches:[/bold]")
        console.print(f" Found: {len(matches)} potential partners")

        table = Table(show_header=True)
        table.add_column("Partner", style="cyan")
        table.add_column("Type")
        table.add_column("Location")
        table.add_column("Fit Score", justify="right", style="green")

        for match in matches[:5]:
            table.add_row(
                str(_field(match, 'stakeholder_name', ''))[:30],
                str(_field(match, 'stakeholder_type', '')),
                str(_field(match, 'location', '')),
                # The score is multiplied by 100 to print it as a percentage.
                f"{_field(match, 'overall_fit_score', 0) * 100:.0f}%",
            )

        console.print(table)

    brief = _field(result, 'brief', {})
    if brief:
        console.print("\n[bold]Valorization Brief:[/bold]")
        console.print(f" PDF: {_field(brief, 'pdf_path', 'Not generated')}")
| |
|
def test_list_endpoints(timeout=10):
    """Query the patent and workflow list endpoints and print item counts.

    A response other than HTTP 200 is reported on the console rather than
    skipped silently, so a broken list endpoint is visible in the output.

    Args:
        timeout: Seconds to wait on the server for each request. Without
            one, requests would wait indefinitely.
    """
    console.print("\n[bold blue]5. Testing List Endpoints[/bold blue]")

    listings = (
        ("patents", "/api/patents/"),
        ("workflows", "/api/workflows/"),
    )

    for label, path in listings:
        response = requests.get(f"{API_BASE}{path}", timeout=timeout)
        if response.status_code == 200:
            console.print(f"[green]✓[/green] Found {len(response.json())} {label}")
        else:
            console.print(
                f"[red]✗[/red] Listing {label} failed: HTTP {response.status_code}"
            )
| |
|
def main():
    """Run the whole API check sequence against the first PDF in Dataset/.

    Order: health check, upload, workflow start, workflow monitoring, list
    endpoints. The run ends early when the health check fails, when no PDF
    is available, or when the upload or workflow start yields no id. The
    return value of monitor_workflow is not inspected. Connection errors
    and any other exception are reported on the console, not re-raised.
    """
    console.print("\n[bold cyan]SPARKNET API Test Suite[/bold cyan]\n")

    try:
        # Step 1: nothing else makes sense if the API is not answering.
        api_is_up = test_health()
        if not api_is_up:
            console.print("[red]Health check failed - is the API running?[/red]")
            console.print("Start with: [yellow]python -m api.main[/yellow]")
            return

        # Step 2: collect candidate PDFs from the Dataset/ directory.
        pdf_dir = Path("Dataset")
        candidates = []
        if pdf_dir.exists():
            candidates = list(pdf_dir.glob("*.pdf"))

        if len(candidates) == 0:
            console.print("\n[yellow]No patents found in Dataset/ directory[/yellow]")
            console.print("Skipping upload and workflow tests")
            return

        # Step 3: the first PDF found serves as the sample.
        sample = candidates[0]
        console.print(f"\nUsing test patent: [cyan]{sample.name}[/cyan]")

        # Step 4: upload it; a falsy id means the upload did not succeed.
        uploaded_id = test_upload(str(sample))
        if not uploaded_id:
            return

        # Step 5: start the workflow; a falsy id means it did not start.
        started_id = test_workflow(uploaded_id)
        if not started_id:
            return

        # Steps 6 and 7: follow the workflow, then hit the list endpoints.
        monitor_workflow(started_id)
        test_list_endpoints()

        console.print("\n[bold green]✓ All tests completed successfully![/bold green]\n")

    except requests.exceptions.ConnectionError:
        console.print("\n[red]✗ Cannot connect to API[/red]")
        console.print("Make sure the API is running: [yellow]python -m api.main[/yellow]")
    except Exception as error:
        console.print(f"\n[red]✗ Test failed: {error}[/red]")
        import traceback
        traceback.print_exc()
| |
|
# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
| |
|