| import json |
| import pandas as pd |
| import gradio as gr |
| from typing import Dict, Any, Type |
| from web2json.preprocessor import BasicPreprocessor |
| from web2json.ai_extractor import AIExtractor, GeminiLLMClient |
| from web2json.postprocessor import PostProcessor |
| from web2json.pipeline import Pipeline |
| from pydantic import BaseModel, Field, create_model |
| import os |
| import dotenv |
|
|
# Load variables from a local .env file into the process environment at import
# time, so later os.getenv() lookups (e.g. GEMINI_API_KEY) can see them.
dotenv.load_dotenv()
|
|
def parse_schema_input(schema_input: str) -> Type[BaseModel]:
    """
    Convert user schema input to a Pydantic BaseModel class.

    The input format is detected from the text itself:
      1. JSON schema format       - input starting with '{'
      2. Python class definition  - input containing 'class ' and 'BaseModel'
      3. Simple field definitions - anything else

    Empty, whitespace-only or missing (None) input yields a default model
    with two string fields, ``title`` and ``content``.

    Raises:
        ValueError: if the selected parser fails for any reason. The original
            exception is chained as ``__cause__`` for debugging.
    """
    # Treat a missing value like an empty one instead of crashing on .strip().
    schema_input = (schema_input or "").strip()

    if not schema_input:
        return create_model(
            'DefaultSchema',
            title=(str, Field(description="Title of the content")),
            content=(str, Field(description="Main content")),
        )

    try:
        if schema_input.startswith('{'):
            return json_schema_to_basemodel(json.loads(schema_input))

        if 'class ' in schema_input and 'BaseModel' in schema_input:
            return python_class_to_basemodel(schema_input)

        return simple_fields_to_basemodel(schema_input)
    except Exception as e:
        # Broad catch is deliberate: every parser failure is reported to the
        # user as one uniform ValueError; the cause is preserved via chaining.
        raise ValueError(
            f"Could not parse schema: {str(e)}. Please check your schema format."
        ) from e
|
|
def json_schema_to_basemodel(schema_dict: Dict) -> Type[BaseModel]:
    """Convert a JSON-schema-style dict to a BaseModel class.

    Only the top-level ``properties`` and ``required`` keys are read. Each
    property contributes one field whose type comes from its ``type`` entry
    (default ``'string'``) and whose description comes from ``description``.
    Properties listed in ``required`` get no default; all others default to
    ``None``.

    Raises:
        ValueError: if ``properties`` or a property definition is not a JSON
            object, or if the schema defines no properties at all.
    """
    properties = schema_dict.get('properties') or {}
    if not isinstance(properties, dict):
        raise ValueError("'properties' must be an object mapping field names to definitions")
    required = schema_dict.get('required') or []

    fields = {}
    for field_name, field_info in properties.items():
        if not isinstance(field_info, dict):
            raise ValueError(f"Definition of field '{field_name}' must be an object")

        field_type = get_python_type(field_info.get('type', 'string'))
        field_description = field_info.get('description', '')

        if field_name in required:
            field = Field(description=field_description)
        else:
            # NOTE(review): the annotation stays the bare type while the
            # default is None; confirm whether explicit nulls from the
            # extractor should validate (that would need Optional[...]).
            field = Field(default=None, description=field_description)
        fields[field_name] = (field_type, field)

    # Mirror simple_fields_to_basemodel: a schema without fields is an input
    # error, not a valid model that silently extracts nothing.
    if not fields:
        raise ValueError("No valid fields found in schema definition")

    return create_model('DynamicSchema', **fields)
|
|
def python_class_to_basemodel(class_definition: str) -> Type[BaseModel]:
    """Convert Python class definition source text to a BaseModel class.

    The text is executed in a namespace pre-seeded with ``BaseModel``,
    ``Field``, the basic builtin types and the common ``typing`` helpers.
    When several BaseModel subclasses are defined, the last one is returned:
    nested models must be declared before the model that uses them, so the
    last class is the top-level schema.

    Raises:
        ValueError: if the text fails to execute or defines no BaseModel
            subclass. The original exception is chained as ``__cause__``.
    """
    # Local import: Optional/List are not imported at module level.
    from typing import Any, Dict, List, Optional

    namespace = {
        'BaseModel': BaseModel, 'Field': Field,
        'str': str, 'int': int, 'float': float, 'bool': bool,
        'list': list, 'dict': dict,
        'Optional': Optional, 'List': List, 'Dict': Dict, 'Any': Any,
    }

    try:
        # SECURITY: exec() runs arbitrary code supplied by the caller with
        # full builtins available. This is only acceptable when every user of
        # the app is trusted; do not expose it publicly without sandboxing or
        # replacing this with an AST-based parser.
        exec(class_definition, namespace)

        # Dicts preserve insertion order, so classes appear in definition order.
        models = [
            obj for obj in namespace.values()
            if isinstance(obj, type)
            and issubclass(obj, BaseModel)
            and obj is not BaseModel
        ]
        if not models:
            raise ValueError("No BaseModel class found in definition")
        return models[-1]
    except Exception as e:
        raise ValueError(f"Invalid Python class definition: {str(e)}") from e
|
|
def simple_fields_to_basemodel(fields_text: str) -> Type[BaseModel]:
    """Build a BaseModel class from line-oriented field definitions.

    Blank lines and lines starting with '#' are ignored. Every other line
    declares one field in one of these shapes:

        name: type = description   -> typed field with a description
        name: type                 -> typed field, empty description
        name                       -> str field, empty description

    Surrounding quotes on the description are removed.

    Raises:
        ValueError: if no field line is found.
    """
    model_fields = {}

    for raw_line in fields_text.strip().split('\n'):
        entry = raw_line.strip()
        if not entry or entry.startswith('#'):
            continue

        name, colon, remainder = entry.partition(':')
        if not colon:
            # Bare name: the whole line is the field name, typed as str.
            model_fields[entry] = (str, Field(description=""))
            continue

        # Everything after the first ':' is "type" or "type = description".
        type_text, equals, desc_text = remainder.strip().partition('=')
        python_type = get_python_type(type_text.strip())
        doc = desc_text.strip().strip('"\'') if equals else ""
        model_fields[name.strip()] = (python_type, Field(description=doc))

    if not model_fields:
        raise ValueError("No valid fields found in schema definition")

    return create_model('DynamicSchema', **model_fields)
|
|
def get_python_type(type_str: str):
    """Convert a type name to the corresponding Python type.

    Accepts JSON Schema names ('string', 'integer', 'number', 'boolean',
    'array', 'object') and Python names ('str', 'int', 'float', 'bool',
    'list', 'dict'), case-insensitively and ignoring surrounding whitespace.

    Also handled:
      * parameterized names: 'List[str]' -> list, 'dict[str, int]' -> dict
      * 'Optional[X]' -> the type for X
      * a list of names such as ['string', 'null'] (JSON Schema union):
        the first non-'null' entry is used

    Anything unrecognized, including non-string input, falls back to ``str``.
    """
    type_mapping = {
        'string': str, 'str': str,
        'integer': int, 'int': int,
        'number': float, 'float': float,
        'boolean': bool, 'bool': bool,
        'array': list, 'list': list,
        'object': dict, 'dict': dict,
    }

    # JSON Schema allows "type" to be a list of alternatives.
    if isinstance(type_str, (list, tuple)):
        type_str = next(
            (t for t in type_str
             if isinstance(t, str) and t.strip().lower() != 'null'),
            None,
        )
    if not isinstance(type_str, str):
        return str

    name = type_str.strip().lower()
    base, bracket, inner = name.partition('[')
    base = base.strip()
    if bracket and base == 'optional':
        # Unwrap Optional[...]; stray closing brackets are harmless because
        # only the text before the next '[' is looked up.
        return get_python_type(inner.rstrip(']'))

    return type_mapping.get(base, str)
|
|
def webpage_to_json_wrapper(content: str, is_url: bool, schema_input: str) -> Dict[str, Any]:
    """Extract structured JSON from a web page or raw text using a user-defined schema.

    The schema text is first converted to a Pydantic model, then the content
    is run through the extraction pipeline with that model.

    Args:
        content: A URL, or raw HTML/text to analyze.
        is_url: True if `content` is a URL, False if it is raw text.
        schema_input: Schema as simple field lines ("name: type = description"),
            a JSON schema object, or a Python BaseModel class definition.
            Leave empty to use a default title/content schema.

    Returns:
        The extracted data as a dictionary, or a dictionary with an "error"
        key describing what went wrong.
    """
    # Keep the two stages in separate try blocks so that an extraction failure
    # is not misreported as a schema problem.
    try:
        schema_model = parse_schema_input(schema_input)
    except Exception as e:
        return {"error": f"Schema parsing error: {str(e)}"}

    try:
        return webpage_to_json(content, is_url, schema_model)
    except Exception as e:
        return {"error": f"Processing error: {str(e)}"}
|
|
def webpage_to_json(content: str, is_url: bool, schema: Type[BaseModel]) -> Dict[str, Any]:
    """
    Extracts structured JSON information from a given content based on a specified schema.

    This function sets up a processing pipeline that includes:
    - Preprocessing the input content.
    - Utilizing an AI language model to extract information according to the provided schema.
    - Postprocessing the extracted output to match the exact schema requirements.

    Parameters:
        content (str): The input content to be analyzed. This can be direct text or a URL.
        is_url (bool): A flag indicating whether the provided content is a URL (True) or raw text (False).
        schema (Type[BaseModel]): The Pydantic model class (not an instance) defining the expected
            structure and data types for the output.

    Returns:
        Dict[str, Any]: A dictionary containing the extracted data matching the schema. In case of errors
        during initialization or processing, the dictionary will include an "error" key with a
        descriptive message.
    """
    # The {content} and {schema} placeholders are left unformatted here;
    # presumably AIExtractor fills them in - confirm against its implementation.
    prompt_template = """Extract the following information from the provided content according to the specified schema.

Content to analyze:
{content}

Schema requirements:
{schema}

Instructions:
- Extract only information that is explicitly present in the content
- Follow the exact structure and data types specified in the schema
- If a required field cannot be found, indicate this clearly
- Preserve the original formatting and context where relevant
- Return the extracted data in the format specified by the schema"""

    # The API key is read from the environment on every call.
    try:
        llm = GeminiLLMClient(config={'api_key': os.getenv('GEMINI_API_KEY')})
    except Exception as e:
        return {"error": f"Failed to initialize LLM client: {str(e)}"}

    # Build the remaining stages inside a try as well, so that any
    # initialization failure is returned as an error dict as documented.
    try:
        preprocessor = BasicPreprocessor(config={'keep_tags': False})
        ai_extractor = AIExtractor(llm_client=llm, prompt_template=prompt_template)
        postprocessor = PostProcessor()
        pipeline = Pipeline(preprocessor, ai_extractor, postprocessor)
    except Exception as e:
        return {"error": f"Failed to initialize pipeline: {str(e)}"}

    try:
        result = pipeline.run(content, is_url, schema)
        print("-"*80)
        print(f"Processed result: {result}")
        return result
    except Exception as e:
        return {"error": f"Processing error: {str(e)}"}
|
|
| |
# Markdown help text shown with the schema textbox. The code samples keep
# their indentation so they can be pasted into the schema box as-is (the
# Python class sample is executed, so it must be validly indented).
example_schemas = """
**Example Schema Formats:**

1. **Simple field definitions:**
```
title: str = Page title
price: float = Product price
description: str = Product description
available: bool = Is available
```

2. **JSON Schema:**
```json
{
  "properties": {
    "title": {"type": "string", "description": "Page title"},
    "price": {"type": "number", "description": "Product price"},
    "description": {"type": "string", "description": "Product description"}
  },
  "required": ["title"]
}
```

3. **Python Class Definition:**
```python
class ProductSchema(BaseModel):
    title: str = Field(description="Product title")
    price: float = Field(description="Product price")
    description: str = Field(description="Product description")
    available: bool = Field(default=False, description="Availability status")
```
"""
|
|
| |
# Gradio UI: three inputs (content, URL flag, schema text) mapped positionally
# onto webpage_to_json_wrapper, with the result rendered as JSON.
demo = gr.Interface(
    title="Webpage to JSON Converter",
    description=(
        "Convert web pages or raw text into structured JSON using customizable schemas. "
        "Define your schema using simple field definitions, JSON schema, or Python class syntax."
    ),
    fn=webpage_to_json_wrapper,
    inputs=[
        gr.Textbox(
            label="Content (URL or Raw Text)",
            placeholder="Enter URL or paste raw HTML/text here.",
            lines=10,
        ),
        gr.Checkbox(label="Content is URL?", value=False),
        gr.Textbox(
            label="Schema Definition",
            placeholder="Define your extraction schema (see examples below)",
            info=example_schemas,
            lines=15,
        ),
    ],
    outputs=gr.JSON(label="Output JSON"),
    # Each example row supplies [content, is_url, schema] in input order.
    examples=[
        # URL input with a simple field-definition schema.
        [
            "https://example.com",
            True,
            (
                "title: str = Page title\n"
                "price: float = Product price\n"
                "description: str = Description"
            ),
        ],
        # Raw HTML input with a JSON schema.
        [
            "<h1>Sample Product</h1><p>Price: $29.99</p><p>Great quality item</p>",
            False,
            '''{
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Name of the product"
},
"price": {
"type": "number",
"description": "Price of the product"
},
"description": {
"type": "string",
"description": "Detailed description of the product"
},
"availability": {
"type": "boolean",
"description": "Whether the product is in stock (true) or not (false)"
}
},
"required": ["title", "price"]
}''',
        ],
    ],
)


# Start the app only when run as a script; mcp_server=True additionally asks
# Gradio to serve the interface function over MCP.
if __name__ == "__main__":
    demo.launch(mcp_server=True)