| """
|
| This file defines a useful high-level abstraction to build Gradio chatbots: ChatInterface.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import inspect
|
| from typing import AsyncGenerator, Callable, Literal, Union, cast
|
|
|
| import anyio
|
| from gradio_client.documentation import document
|
|
|
| from gradio.blocks import Blocks
|
| from gradio.components import (
|
| Button,
|
| Chatbot,
|
| Component,
|
| Markdown,
|
| MultimodalTextbox,
|
| State,
|
| Textbox,
|
| get_component_instance,
|
| Dataset,
|
| )
|
| from gradio.events import Dependency, on
|
| from gradio.helpers import special_args
|
| from gradio.layouts import Accordion, Group, Row
|
| from gradio.routes import Request
|
| from gradio.themes import ThemeClass as Theme
|
| from gradio.utils import SyncToAsyncIterator, async_iteration, async_lambda
|
|
|
|
|
| @document()
|
| class ChatInterface(Blocks):
|
| """
|
| ChatInterface is Gradio's high-level abstraction for creating chatbot UIs, and allows you to create
|
| a web-based demo around a chatbot model in a few lines of code. Only one parameter is required: fn, which
|
| takes a function that governs the response of the chatbot based on the user input and chat history. Additional
|
| parameters can be used to control the appearance and behavior of the demo.
|
|
|
| Example:
|
| import gradio as gr
|
|
|
| def echo(message, history):
|
| return message
|
|
|
| demo = gr.ChatInterface(fn=echo, examples=["hello", "hola", "merhaba"], title="Echo Bot")
|
| demo.launch()
|
| Demos: chatinterface_multimodal, chatinterface_random_response, chatinterface_streaming_echo
|
| Guides: creating-a-chatbot-fast, sharing-your-app
|
| """
|
|
|
| def __init__(
|
| self,
|
| fn: Callable,
|
| post_fn: Callable,
|
| pre_fn: Callable,
|
| chatbot: Chatbot,
|
| *,
|
| post_fn_kwargs: dict = None,
|
| pre_fn_kwargs: dict = None,
|
| multimodal: bool = False,
|
| textbox: Textbox | MultimodalTextbox | None = None,
|
| additional_inputs: str | Component | list[str | Component] | None = None,
|
| additional_inputs_accordion_name: str | None = None,
|
| additional_inputs_accordion: str | Accordion | None = None,
|
| examples: Dataset = None,
|
| title: str | None = None,
|
| description: str | None = None,
|
| theme: Theme | str | None = None,
|
| css: str | None = None,
|
| js: str | None = None,
|
| head: str | None = None,
|
| analytics_enabled: bool | None = None,
|
| submit_btn: str | None | Button = "Submit",
|
| stop_btn: str | None | Button = "Stop",
|
| retry_btn: str | None | Button = "🔄 Retry",
|
| undo_btn: str | None | Button = "↩️ Undo",
|
| clear_btn: str | None | Button = "🗑️ Clear",
|
| autofocus: bool = True,
|
| concurrency_limit: int | None | Literal["default"] = "default",
|
| fill_height: bool = True,
|
| delete_cache: tuple[int, int] | None = None,
|
| ):
|
| super().__init__(
|
| analytics_enabled=analytics_enabled,
|
| mode="chat_interface",
|
| css=css,
|
| title=title or "Gradio",
|
| theme=theme,
|
| js=js,
|
| head=head,
|
| fill_height=fill_height,
|
| delete_cache=delete_cache,
|
| )
|
|
|
| if post_fn_kwargs is None:
|
| post_fn_kwargs = []
|
|
|
| self.post_fn = post_fn
|
| self.post_fn_kwargs = post_fn_kwargs
|
|
|
| self.pre_fn = pre_fn
|
| self.pre_fn_kwargs = pre_fn_kwargs
|
|
|
| self.interrupter = State(None)
|
|
|
| self.multimodal = multimodal
|
| self.concurrency_limit = concurrency_limit
|
| self.fn = fn
|
| self.is_async = inspect.iscoroutinefunction(
|
| self.fn
|
| ) or inspect.isasyncgenfunction(self.fn)
|
| self.is_generator = inspect.isgeneratorfunction(
|
| self.fn
|
| ) or inspect.isasyncgenfunction(self.fn)
|
|
|
| if additional_inputs:
|
| if not isinstance(additional_inputs, list):
|
| additional_inputs = [additional_inputs]
|
| self.additional_inputs = [
|
| get_component_instance(i)
|
| for i in additional_inputs
|
| ]
|
| else:
|
| self.additional_inputs = []
|
| if additional_inputs_accordion_name is not None:
|
| print(
|
| "The `additional_inputs_accordion_name` parameter is deprecated and will be removed in a future version of Gradio. Use the `additional_inputs_accordion` parameter instead."
|
| )
|
| self.additional_inputs_accordion_params = {
|
| "label": additional_inputs_accordion_name
|
| }
|
| if additional_inputs_accordion is None:
|
| self.additional_inputs_accordion_params = {
|
| "label": "Additional Inputs",
|
| "open": False,
|
| }
|
| elif isinstance(additional_inputs_accordion, str):
|
| self.additional_inputs_accordion_params = {
|
| "label": additional_inputs_accordion
|
| }
|
| elif isinstance(additional_inputs_accordion, Accordion):
|
| self.additional_inputs_accordion_params = (
|
| additional_inputs_accordion.recover_kwargs(
|
| additional_inputs_accordion.get_config()
|
| )
|
| )
|
| else:
|
| raise ValueError(
|
| f"The `additional_inputs_accordion` parameter must be a string or gr.Accordion, not {type(additional_inputs_accordion)}"
|
| )
|
|
|
| with self:
|
| if title:
|
| Markdown(
|
| f"<h1 style='text-align: center; margin-bottom: 1rem'>{self.title}</h1>"
|
| )
|
| if description:
|
| Markdown(description)
|
|
|
| self.chatbot = chatbot.render()
|
|
|
| self.buttons = [retry_btn, undo_btn, clear_btn]
|
|
|
| with Group():
|
| with Row():
|
| if textbox:
|
| if self.multimodal:
|
| submit_btn = None
|
| else:
|
| textbox.container = False
|
| textbox.show_label = False
|
| textbox_ = textbox.render()
|
| if not isinstance(textbox_, (Textbox, MultimodalTextbox)):
|
| raise TypeError(
|
| f"Expected a gr.Textbox or gr.MultimodalTextbox component, but got {type(textbox_)}"
|
| )
|
| self.textbox = textbox_
|
| elif self.multimodal:
|
| submit_btn = None
|
| self.textbox = MultimodalTextbox(
|
| show_label=False,
|
| label="Message",
|
| placeholder="Type a message...",
|
| scale=7,
|
| autofocus=autofocus,
|
| )
|
| else:
|
| self.textbox = Textbox(
|
| container=False,
|
| show_label=False,
|
| label="Message",
|
| placeholder="Type a message...",
|
| scale=7,
|
| autofocus=autofocus,
|
| )
|
| if submit_btn is not None and not multimodal:
|
| if isinstance(submit_btn, Button):
|
| submit_btn.render()
|
| elif isinstance(submit_btn, str):
|
| submit_btn = Button(
|
| submit_btn,
|
| variant="primary",
|
| scale=1,
|
| min_width=150,
|
| )
|
| else:
|
| raise ValueError(
|
| f"The submit_btn parameter must be a gr.Button, string, or None, not {type(submit_btn)}"
|
| )
|
| if stop_btn is not None:
|
| if isinstance(stop_btn, Button):
|
| stop_btn.visible = False
|
| stop_btn.render()
|
| elif isinstance(stop_btn, str):
|
| stop_btn = Button(
|
| stop_btn,
|
| variant="stop",
|
| visible=False,
|
| scale=1,
|
| min_width=150,
|
| )
|
| else:
|
| raise ValueError(
|
| f"The stop_btn parameter must be a gr.Button, string, or None, not {type(stop_btn)}"
|
| )
|
| self.buttons.extend([submit_btn, stop_btn])
|
|
|
| self.fake_api_btn = Button("Fake API", visible=False)
|
| self.fake_response_textbox = Textbox(label="Response", visible=False)
|
| (
|
| self.retry_btn,
|
| self.undo_btn,
|
| self.clear_btn,
|
| self.submit_btn,
|
| self.stop_btn,
|
| ) = self.buttons
|
|
|
| any_unrendered_inputs = any(
|
| not inp.is_rendered for inp in self.additional_inputs
|
| )
|
| if self.additional_inputs and any_unrendered_inputs:
|
| with Accordion(**self.additional_inputs_accordion_params):
|
| for input_component in self.additional_inputs:
|
| if not input_component.is_rendered:
|
| input_component.render()
|
|
|
| self.saved_input = State()
|
| self.chatbot_state = (
|
| State(self.chatbot.value) if self.chatbot.value else State([])
|
| )
|
|
|
| self._setup_events()
|
| self._setup_api()
|
|
|
| if examples:
|
| examples.click(lambda x: x[0], inputs=[examples], outputs=self.textbox, show_progress=False, queue=False)
|
|
|
| def _setup_events(self) -> None:
|
| submit_fn = self._stream_fn if self.is_generator else self._submit_fn
|
| submit_triggers = (
|
| [self.textbox.submit, self.submit_btn.click]
|
| if self.submit_btn
|
| else [self.textbox.submit]
|
| )
|
| submit_event = (
|
| on(
|
| submit_triggers,
|
| self._clear_and_save_textbox,
|
| [self.textbox],
|
| [self.textbox, self.saved_input],
|
| show_api=False,
|
| queue=False,
|
| )
|
| .then(
|
| self.pre_fn,
|
| **self.pre_fn_kwargs,
|
| show_api=False,
|
| queue=False,
|
| )
|
| .then(
|
| self._display_input,
|
| [self.saved_input, self.chatbot_state],
|
| [self.chatbot, self.chatbot_state],
|
| show_api=False,
|
| queue=False,
|
| )
|
| .then(
|
| submit_fn,
|
| [self.saved_input, self.chatbot_state] + self.additional_inputs,
|
| [self.chatbot, self.chatbot_state, self.interrupter],
|
| show_api=False,
|
| concurrency_limit=cast(
|
| Union[int, Literal["default"], None], self.concurrency_limit
|
| ),
|
| ).then(
|
| self.post_fn,
|
| **self.post_fn_kwargs,
|
| show_api=False,
|
| concurrency_limit=cast(
|
| Union[int, Literal["default"], None], self.concurrency_limit
|
| ),
|
| )
|
| )
|
| self._setup_stop_events(submit_triggers, submit_event)
|
|
|
| if self.retry_btn:
|
| retry_event = (
|
| self.retry_btn.click(
|
| self._delete_prev_fn,
|
| [self.saved_input, self.chatbot_state],
|
| [self.chatbot, self.saved_input, self.chatbot_state],
|
| show_api=False,
|
| queue=False,
|
| )
|
| .then(
|
| self.pre_fn,
|
| **self.pre_fn_kwargs,
|
| show_api=False,
|
| queue=False,
|
| )
|
| .then(
|
| self._display_input,
|
| [self.saved_input, self.chatbot_state],
|
| [self.chatbot, self.chatbot_state],
|
| show_api=False,
|
| queue=False,
|
| )
|
| .then(
|
| submit_fn,
|
| [self.saved_input, self.chatbot_state] + self.additional_inputs,
|
| [self.chatbot, self.chatbot_state],
|
| show_api=False,
|
| concurrency_limit=cast(
|
| Union[int, Literal["default"], None], self.concurrency_limit
|
| ),
|
| ).then(
|
| self.post_fn,
|
| **self.post_fn_kwargs,
|
| show_api=False,
|
| concurrency_limit=cast(
|
| Union[int, Literal["default"], None], self.concurrency_limit
|
| ),
|
| )
|
| )
|
| self._setup_stop_events([self.retry_btn.click], retry_event)
|
|
|
| if self.undo_btn:
|
| self.undo_btn.click(
|
| self._delete_prev_fn,
|
| [self.saved_input, self.chatbot_state],
|
| [self.chatbot, self.saved_input, self.chatbot_state],
|
| show_api=False,
|
| queue=False,
|
| ).then(
|
| self.pre_fn,
|
| **self.pre_fn_kwargs,
|
| show_api=False,
|
| queue=False,
|
| ).then(
|
| async_lambda(lambda x: x),
|
| [self.saved_input],
|
| [self.textbox],
|
| show_api=False,
|
| queue=False,
|
| ).then(
|
| self.post_fn,
|
| **self.post_fn_kwargs,
|
| show_api=False,
|
| concurrency_limit=cast(
|
| Union[int, Literal["default"], None], self.concurrency_limit
|
| ),
|
| )
|
|
|
| if self.clear_btn:
|
| self.clear_btn.click(
|
| async_lambda(lambda: ([], [], None)),
|
| None,
|
| [self.chatbot, self.chatbot_state, self.saved_input],
|
| queue=False,
|
| show_api=False,
|
| ).then(
|
| self.pre_fn,
|
| **self.pre_fn_kwargs,
|
| show_api=False,
|
| queue=False,
|
| ).then(
|
| self.post_fn,
|
| **self.post_fn_kwargs,
|
| show_api=False,
|
| concurrency_limit=cast(
|
| Union[int, Literal["default"], None], self.concurrency_limit
|
| ),
|
| )
|
|
|
| def _setup_stop_events(
|
| self, event_triggers: list[Callable], event_to_cancel: Dependency
|
| ) -> None:
|
| def perform_interrupt(ipc):
|
| if ipc is not None:
|
| ipc()
|
| return
|
|
|
| if self.stop_btn and self.is_generator:
|
| if self.submit_btn:
|
| for event_trigger in event_triggers:
|
| event_trigger(
|
| async_lambda(
|
| lambda: (
|
| Button(visible=False),
|
| Button(visible=True),
|
| )
|
| ),
|
| None,
|
| [self.submit_btn, self.stop_btn],
|
| show_api=False,
|
| queue=False,
|
| )
|
| event_to_cancel.then(
|
| async_lambda(lambda: (Button(visible=True), Button(visible=False))),
|
| None,
|
| [self.submit_btn, self.stop_btn],
|
| show_api=False,
|
| queue=False,
|
| )
|
| else:
|
| for event_trigger in event_triggers:
|
| event_trigger(
|
| async_lambda(lambda: Button(visible=True)),
|
| None,
|
| [self.stop_btn],
|
| show_api=False,
|
| queue=False,
|
| )
|
| event_to_cancel.then(
|
| async_lambda(lambda: Button(visible=False)),
|
| None,
|
| [self.stop_btn],
|
| show_api=False,
|
| queue=False,
|
| )
|
| self.stop_btn.click(
|
| fn=perform_interrupt,
|
| inputs=[self.interrupter],
|
| cancels=event_to_cancel,
|
| show_api=False,
|
| )
|
|
|
| def _setup_api(self) -> None:
|
| api_fn = self._api_stream_fn if self.is_generator else self._api_submit_fn
|
|
|
| self.fake_api_btn.click(
|
| api_fn,
|
| [self.textbox, self.chatbot_state] + self.additional_inputs,
|
| [self.textbox, self.chatbot_state],
|
| api_name="chat",
|
| concurrency_limit=cast(
|
| Union[int, Literal["default"], None], self.concurrency_limit
|
| ),
|
| )
|
|
|
| def _clear_and_save_textbox(self, message: str) -> tuple[str | dict, str]:
|
| if self.multimodal:
|
| return {"text": "", "files": []}, message
|
| else:
|
| return "", message
|
|
|
| def _append_multimodal_history(
|
| self,
|
| message: dict[str, list],
|
| response: str | None,
|
| history: list[list[str | tuple | None]],
|
| ):
|
| for x in message["files"]:
|
| history.append([(x,), None])
|
| if message["text"] is None or not isinstance(message["text"], str):
|
| return
|
| elif message["text"] == "" and message["files"] != []:
|
| history.append([None, response])
|
| else:
|
| history.append([message["text"], response])
|
|
|
| async def _display_input(
|
| self, message: str | dict[str, list], history: list[list[str | tuple | None]]
|
| ) -> tuple[list[list[str | tuple | None]], list[list[str | tuple | None]]]:
|
| if self.multimodal and isinstance(message, dict):
|
| self._append_multimodal_history(message, None, history)
|
| elif isinstance(message, str):
|
| history.append([message, None])
|
| return history, history
|
|
|
| async def _submit_fn(
|
| self,
|
| message: str | dict[str, list],
|
| history_with_input: list[list[str | tuple | None]],
|
| request: Request,
|
| *args,
|
| ) -> tuple[list[list[str | tuple | None]], list[list[str | tuple | None]]]:
|
| if self.multimodal and isinstance(message, dict):
|
| remove_input = (
|
| len(message["files"]) + 1
|
| if message["text"] is not None
|
| else len(message["files"])
|
| )
|
| history = history_with_input[:-remove_input]
|
| else:
|
| history = history_with_input[:-1]
|
| inputs, _, _ = special_args(
|
| self.fn, inputs=[message, history, *args], request=request
|
| )
|
|
|
| if self.is_async:
|
| response = await self.fn(*inputs)
|
| else:
|
| response = await anyio.to_thread.run_sync(
|
| self.fn, *inputs, limiter=self.limiter
|
| )
|
|
|
| if self.multimodal and isinstance(message, dict):
|
| self._append_multimodal_history(message, response, history)
|
| elif isinstance(message, str):
|
| history.append([message, response])
|
| return history, history
|
|
|
| async def _stream_fn(
|
| self,
|
| message: str | dict[str, list],
|
| history_with_input: list[list[str | tuple | None]],
|
| request: Request,
|
| *args,
|
| ) -> AsyncGenerator:
|
| if self.multimodal and isinstance(message, dict):
|
| remove_input = (
|
| len(message["files"]) + 1
|
| if message["text"] is not None
|
| else len(message["files"])
|
| )
|
| history = history_with_input[:-remove_input]
|
| else:
|
| history = history_with_input[:-1]
|
| inputs, _, _ = special_args(
|
| self.fn, inputs=[message, history, *args], request=request
|
| )
|
|
|
| if self.is_async:
|
| generator = self.fn(*inputs)
|
| else:
|
| generator = await anyio.to_thread.run_sync(
|
| self.fn, *inputs, limiter=self.limiter
|
| )
|
| generator = SyncToAsyncIterator(generator, self.limiter)
|
| try:
|
| first_response, first_interrupter = await async_iteration(generator)
|
| if self.multimodal and isinstance(message, dict):
|
| for x in message["files"]:
|
| history.append([(x,), None])
|
| update = history + [[message["text"], first_response]]
|
| yield update, update
|
| else:
|
| update = history + [[message, first_response]]
|
| yield update, update, first_interrupter
|
| except StopIteration:
|
| if self.multimodal and isinstance(message, dict):
|
| self._append_multimodal_history(message, None, history)
|
| yield history, history
|
| else:
|
| update = history + [[message, None]]
|
| yield update, update, first_interrupter
|
| async for response, interrupter in generator:
|
| if self.multimodal and isinstance(message, dict):
|
| update = history + [[message["text"], response]]
|
| yield update, update
|
| else:
|
| update = history + [[message, response]]
|
| yield update, update, interrupter
|
|
|
| async def _api_submit_fn(
|
| self, message: str, history: list[list[str | None]], request: Request, *args
|
| ) -> tuple[str, list[list[str | None]]]:
|
| inputs, _, _ = special_args(
|
| self.fn, inputs=[message, history, *args], request=request
|
| )
|
|
|
| if self.is_async:
|
| response = await self.fn(*inputs)
|
| else:
|
| response = await anyio.to_thread.run_sync(
|
| self.fn, *inputs, limiter=self.limiter
|
| )
|
| history.append([message, response])
|
| return response, history
|
|
|
| async def _api_stream_fn(
|
| self, message: str, history: list[list[str | None]], request: Request, *args
|
| ) -> AsyncGenerator:
|
| inputs, _, _ = special_args(
|
| self.fn, inputs=[message, history, *args], request=request
|
| )
|
|
|
| if self.is_async:
|
| generator = self.fn(*inputs)
|
| else:
|
| generator = await anyio.to_thread.run_sync(
|
| self.fn, *inputs, limiter=self.limiter
|
| )
|
| generator = SyncToAsyncIterator(generator, self.limiter)
|
| try:
|
| first_response = await async_iteration(generator)
|
| yield first_response, history + [[message, first_response]]
|
| except StopIteration:
|
| yield None, history + [[message, None]]
|
| async for response in generator:
|
| yield response, history + [[message, response]]
|
|
|
| async def _delete_prev_fn(
|
| self,
|
| message: str | dict[str, list],
|
| history: list[list[str | tuple | None]],
|
| ) -> tuple[
|
| list[list[str | tuple | None]],
|
| str | dict[str, list],
|
| list[list[str | tuple | None]],
|
| ]:
|
| if self.multimodal and isinstance(message, dict):
|
| remove_input = (
|
| len(message["files"]) + 1
|
| if message["text"] is not None
|
| else len(message["files"])
|
| )
|
| history = history[:-remove_input]
|
| else:
|
| while history:
|
| deleted_a, deleted_b = history[-1]
|
| history = history[:-1]
|
| if isinstance(deleted_a, str) and isinstance(deleted_b, str):
|
| break
|
| return history, message or "", history
|
|
|