| """ |
| Output for vt100 terminals. |
| |
| A lot of thanks, regarding outputting of colors, goes to the Pygments project: |
| (We don't rely on Pygments anymore, because many things are very custom, and |
| everything has been highly optimized.) |
| http://pygments.org/ |
| """ |
| from __future__ import annotations |
|
|
| import io |
| import os |
| import sys |
| from typing import Callable, Dict, Hashable, Iterable, Sequence, TextIO, Tuple |
|
|
| from prompt_toolkit.cursor_shapes import CursorShape |
| from prompt_toolkit.data_structures import Size |
| from prompt_toolkit.output import Output |
| from prompt_toolkit.styles import ANSI_COLOR_NAMES, Attrs |
| from prompt_toolkit.utils import is_dumb_terminal |
|
|
| from .color_depth import ColorDepth |
| from .flush_stdout import flush_stdout |
|
|
# Names exported by ``from <this module> import *``.
__all__ = ["Vt100_Output"]
|
|
|
|
# SGR parameter that selects each named ANSI color as the foreground.
FG_ANSI_COLORS = {
    "ansidefault": 39,
    # Low intensity.
    "ansiblack": 30,
    "ansired": 31,
    "ansigreen": 32,
    "ansiyellow": 33,
    "ansiblue": 34,
    "ansimagenta": 35,
    "ansicyan": 36,
    "ansigray": 37,
    # High intensity.
    "ansibrightblack": 90,
    "ansibrightred": 91,
    "ansibrightgreen": 92,
    "ansibrightyellow": 93,
    "ansibrightblue": 94,
    "ansibrightmagenta": 95,
    "ansibrightcyan": 96,
    "ansiwhite": 97,
}

# SGR parameter that selects each named ANSI color as the background. Every
# background code is the matching foreground code plus 10 (39 -> 49,
# 30..37 -> 40..47, 90..97 -> 100..107); key order follows FG_ANSI_COLORS.
BG_ANSI_COLORS = {name: code + 10 for name, code in FG_ANSI_COLORS.items()}

# Reference (r, g, b) value for each named ANSI color, used for
# nearest-color matching. The key order matters: on equal distance the
# entry that comes first wins in `_get_closest_ansi_color`.
ANSI_COLORS_TO_RGB = {
    # Placeholder only: `_get_closest_ansi_color` never matches against it.
    "ansidefault": (0x00, 0x00, 0x00),
    # Grayscale.
    "ansiblack": (0x00, 0x00, 0x00),
    "ansigray": (0xE5, 0xE5, 0xE5),
    "ansibrightblack": (0x7F, 0x7F, 0x7F),
    "ansiwhite": (0xFF, 0xFF, 0xFF),
    # Low intensity.
    "ansired": (0xCD, 0x00, 0x00),
    "ansigreen": (0x00, 0xCD, 0x00),
    "ansiyellow": (0xCD, 0xCD, 0x00),
    "ansiblue": (0x00, 0x00, 0xCD),
    "ansimagenta": (0xCD, 0x00, 0xCD),
    "ansicyan": (0x00, 0xCD, 0xCD),
    # High intensity.
    "ansibrightred": (0xFF, 0x00, 0x00),
    "ansibrightgreen": (0x00, 0xFF, 0x00),
    "ansibrightyellow": (0xFF, 0xFF, 0x00),
    "ansibrightblue": (0x00, 0x00, 0xFF),
    "ansibrightmagenta": (0xFF, 0x00, 0xFF),
    "ansibrightcyan": (0x00, 0xFF, 0xFF),
}
|
|
|
|
# Import-time sanity check: each lookup table must define exactly the color
# names listed in `ANSI_COLOR_NAMES`.
assert all(
    set(table) == set(ANSI_COLOR_NAMES)
    for table in (FG_ANSI_COLORS, BG_ANSI_COLORS, ANSI_COLORS_TO_RGB)
)
|
|
|
|
def _get_closest_ansi_color(r: int, g: int, b: int, exclude: Sequence[str] = ()) -> str:
    """
    Find closest ANSI color. Return it by name.

    The match is the entry of `ANSI_COLORS_TO_RGB` with the smallest squared
    Euclidean distance to ``(r, g, b)``. ``"ansidefault"`` is only returned
    when every other candidate has been excluded.

    :param r: Red (Between 0 and 255.)
    :param g: Green (Between 0 and 255.)
    :param b: Blue (Between 0 and 255.)
    :param exclude: A tuple of color names to exclude. (E.g. ``('ansired', )``.)
    """
    excluded = set(exclude)

    # When the color has a bit of saturation, avoid the gray-like colors;
    # otherwise the distance to one of the grays is too often the smallest.
    saturation = abs(r - g) + abs(g - b) + abs(b - r)  # Between 0..510

    if saturation > 30:
        # These must be the names actually used as keys in
        # `ANSI_COLORS_TO_RGB`. The former "ansilightgray"/"ansidarkgray"
        # spellings are not keys of that table, so listing them excluded
        # nothing and grays were still matched.
        excluded.update(("ansigray", "ansibrightblack", "ansiwhite", "ansiblack"))

    # Start above the largest possible squared distance (3 * 255 ** 2), so
    # that the first candidate always becomes the initial match.
    distance = 257 * 257 * 3
    match = "ansidefault"

    for name, (r2, g2, b2) in ANSI_COLORS_TO_RGB.items():
        if name == "ansidefault" or name in excluded:
            continue

        d = (r - r2) ** 2 + (g - g2) ** 2 + (b - b2) ** 2

        # Strict comparison: on a tie, the earliest table entry wins.
        if d < distance:
            match = name
            distance = d

    return match
|
|
|
|
| _ColorCodeAndName = Tuple[int, str] |
|
|
|
|
| class _16ColorCache: |
| """ |
| Cache which maps (r, g, b) tuples to 16 ansi colors. |
| |
| :param bg: Cache for background colors, instead of foreground. |
| """ |
|
|
| def __init__(self, bg: bool = False) -> None: |
| self.bg = bg |
| self._cache: dict[Hashable, _ColorCodeAndName] = {} |
|
|
| def get_code( |
| self, value: tuple[int, int, int], exclude: Sequence[str] = () |
| ) -> _ColorCodeAndName: |
| """ |
| Return a (ansi_code, ansi_name) tuple. (E.g. ``(44, 'ansiblue')``.) for |
| a given (r,g,b) value. |
| """ |
| key: Hashable = (value, tuple(exclude)) |
| cache = self._cache |
|
|
| if key not in cache: |
| cache[key] = self._get(value, exclude) |
|
|
| return cache[key] |
|
|
| def _get( |
| self, value: tuple[int, int, int], exclude: Sequence[str] = () |
| ) -> _ColorCodeAndName: |
| r, g, b = value |
| match = _get_closest_ansi_color(r, g, b, exclude=exclude) |
|
|
| |
| if self.bg: |
| code = BG_ANSI_COLORS[match] |
| else: |
| code = FG_ANSI_COLORS[match] |
|
|
| return code, match |
|
|
|
|
| class _256ColorCache(Dict[Tuple[int, int, int], int]): |
| """ |
| Cache which maps (r, g, b) tuples to 256 colors. |
| """ |
|
|
| def __init__(self) -> None: |
| |
| colors: list[tuple[int, int, int]] = [] |
|
|
| |
| colors.append((0x00, 0x00, 0x00)) |
| colors.append((0xCD, 0x00, 0x00)) |
| colors.append((0x00, 0xCD, 0x00)) |
| colors.append((0xCD, 0xCD, 0x00)) |
| colors.append((0x00, 0x00, 0xEE)) |
| colors.append((0xCD, 0x00, 0xCD)) |
| colors.append((0x00, 0xCD, 0xCD)) |
| colors.append((0xE5, 0xE5, 0xE5)) |
| colors.append((0x7F, 0x7F, 0x7F)) |
| colors.append((0xFF, 0x00, 0x00)) |
| colors.append((0x00, 0xFF, 0x00)) |
| colors.append((0xFF, 0xFF, 0x00)) |
| colors.append((0x5C, 0x5C, 0xFF)) |
| colors.append((0xFF, 0x00, 0xFF)) |
| colors.append((0x00, 0xFF, 0xFF)) |
| colors.append((0xFF, 0xFF, 0xFF)) |
|
|
| |
| valuerange = (0x00, 0x5F, 0x87, 0xAF, 0xD7, 0xFF) |
|
|
| for i in range(217): |
| r = valuerange[(i // 36) % 6] |
| g = valuerange[(i // 6) % 6] |
| b = valuerange[i % 6] |
| colors.append((r, g, b)) |
|
|
| |
| for i in range(1, 22): |
| v = 8 + i * 10 |
| colors.append((v, v, v)) |
|
|
| self.colors = colors |
|
|
| def __missing__(self, value: tuple[int, int, int]) -> int: |
| r, g, b = value |
|
|
| |
| |
| distance = 257 * 257 * 3 |
| match = 0 |
|
|
| for i, (r2, g2, b2) in enumerate(self.colors): |
| if i >= 16: |
| |
| |
| d = (r - r2) ** 2 + (g - g2) ** 2 + (b - b2) ** 2 |
|
|
| if d < distance: |
| match = i |
| distance = d |
|
|
| |
| self[value] = match |
| return match |
|
|
|
|
# Module-level color caches, shared by every `_EscapeCodeCache`.
_16_fg_colors = _16ColorCache()
_16_bg_colors = _16ColorCache(bg=True)
_256_colors = _256ColorCache()
|
|
|
|
class _EscapeCodeCache(Dict[Attrs, str]):
    """
    Cache for VT100 escape codes. It maps `Attrs` tuples
    (fgcolor, bgcolor, bold, underline, strike, italic, blink, reverse,
    hidden) to the VT100 escape sequence that selects those attributes.

    :param color_depth: The `ColorDepth` for which the color part of the
        escape sequences is generated.
    """

    def __init__(self, color_depth: ColorDepth) -> None:
        self.color_depth = color_depth

    def __missing__(self, attrs: Attrs) -> str:
        "Build, store and return the escape sequence for ``attrs``."
        (
            fgcolor,
            bgcolor,
            bold,
            underline,
            strike,
            italic,
            blink,
            reverse,
            hidden,
        ) = attrs

        # Color parameters come first, followed by one parameter per enabled
        # text attribute, in the order listed here.
        codes: list[str] = list(self._colors_to_code(fgcolor or "", bgcolor or ""))
        for enabled, parameter in (
            (bold, "1"),
            (italic, "3"),
            (blink, "5"),
            (underline, "4"),
            (reverse, "7"),
            (hidden, "8"),
            (strike, "9"),
        ):
            if enabled:
                codes.append(parameter)

        # The sequence always starts with "0", which resets the attributes
        # that were active before.
        escape = ("\x1b[0;" + ";".join(codes) + "m") if codes else "\x1b[0m"

        self[attrs] = escape
        return escape

    def _color_name_to_rgb(self, color: str) -> tuple[int, int, int]:
        """
        Turn 'ffffff', into (0xff, 0xff, 0xff).

        :raises ValueError: When ``color`` is not a hexadecimal number.
        """
        rgb = int(color, 16)
        return (rgb >> 16) & 0xFF, (rgb >> 8) & 0xFF, rgb & 0xFF

    def _colors_to_code(self, fg_color: str, bg_color: str) -> Iterable[str]:
        """
        Return an iterable with the vt100 parameter strings that select the
        given foreground and background color at this cache's color depth.

        :param fg_color: ANSI color name, hexadecimal RGB string, or ``""``.
        :param bg_color: Same as ``fg_color``, for the background.
        """
        depth = self.color_depth

        # Name of the ANSI color the foreground was converted to (4 bit
        # depth only). When both colors are converted and they were not
        # requested as the same color, this name is excluded from the
        # candidates for the background.
        fg_ansi = ""

        def get(color: str, bg: bool) -> list[int]:
            nonlocal fg_ansi

            # No color requested, or a depth without colors.
            if not color or depth == ColorDepth.DEPTH_1_BIT:
                return []

            # Named ANSI colors map directly onto a code.
            table = BG_ANSI_COLORS if bg else FG_ANSI_COLORS
            if color in table:
                return [table[color]]

            # Anything else has to be a hexadecimal RGB value; unparsable
            # colors are silently ignored.
            try:
                rgb = self._color_name_to_rgb(color)
            except ValueError:
                return []

            # 4 bit depth: convert to the closest of the 16 ANSI colors.
            if depth == ColorDepth.DEPTH_4_BIT:
                if not bg:
                    code, fg_ansi = _16_fg_colors.get_code(rgb)
                    return [code]

                exclude = [fg_ansi] if fg_color != bg_color else []
                code, _ = _16_bg_colors.get_code(rgb, exclude=exclude)
                return [code]

            selector = 48 if bg else 38

            # 24 bit depth: pass the RGB components through unchanged.
            if depth == ColorDepth.DEPTH_24_BIT:
                return [selector, 2, *rgb]

            # Every other depth: index into the 256 color palette.
            return [selector, 5, _256_colors[rgb]]

        # The foreground has to be resolved first: `fg_ansi` must be known
        # by the time the background is resolved.
        codes = get(fg_color, False) + get(bg_color, True)
        return map(str, codes)
|
|
|
|
| def _get_size(fileno: int) -> tuple[int, int]: |
| """ |
| Get the size of this pseudo terminal. |
| |
| :param fileno: stdout.fileno() |
| :returns: A (rows, cols) tuple. |
| """ |
| size = os.get_terminal_size(fileno) |
| return size.lines, size.columns |
|
|
|
|
class Vt100_Output(Output):
    """
    `Output` implementation that emits VT100 escape sequences.

    Everything is collected in an internal buffer and only sent to
    ``stdout`` when `flush` is called.

    :param get_size: A callable which returns the `Size` of the output terminal.
    :param stdout: Any object with has a `write` and `flush` method + an 'encoding' property.
    :param term: The terminal environment variable. (xterm, xterm-256color, linux, ...)
    :param default_color_depth: When given, `get_default_color_depth` always
        returns this value instead of deriving one from ``term``.
    :param enable_bell: When `False`, `bell` does nothing.
    :param enable_cpr: When `True` (the default), send "cursor position
        request" escape sequences to the output in order to detect the cursor
        position. That way, we can properly determine how much space there is
        available for the UI (especially for drop down menus) to render. The
        `Renderer` will still try to figure out whether the current terminal
        does respond to CPR escapes. When `False`, never attempt to send CPR
        requests.
    """

    # File descriptors for which `from_pty` already printed the "Output is
    # not a terminal" warning, so that it is shown only once per descriptor.
    _fds_not_a_terminal: set[int] = set()

    def __init__(
        self,
        stdout: TextIO,
        get_size: Callable[[], Size],
        term: str | None = None,
        default_color_depth: ColorDepth | None = None,
        enable_bell: bool = True,
        enable_cpr: bool = True,
    ) -> None:
        assert hasattr(stdout, "write") and hasattr(stdout, "flush")

        self._buffer: list[str] = []
        self.stdout: TextIO = stdout
        self.default_color_depth = default_color_depth
        self._get_size = get_size
        self.term = term
        self.enable_bell = enable_bell
        self.enable_cpr = enable_cpr

        # One escape code cache per color depth.
        self._escape_code_caches: dict[ColorDepth, _EscapeCodeCache] = {
            depth: _EscapeCodeCache(depth)
            for depth in (
                ColorDepth.DEPTH_1_BIT,
                ColorDepth.DEPTH_4_BIT,
                ColorDepth.DEPTH_8_BIT,
                ColorDepth.DEPTH_24_BIT,
            )
        }

        # Remember whether `set_cursor_shape` ever emitted a change, so that
        # `reset_cursor_shape` only resets a shape we changed ourselves.
        self._cursor_shape_changed = False

    @classmethod
    def from_pty(
        cls,
        stdout: TextIO,
        term: str | None = None,
        default_color_depth: ColorDepth | None = None,
        enable_bell: bool = True,
    ) -> Vt100_Output:
        """
        Create an Output class from a pseudo terminal.
        (This will take the dimensions by reading the pseudo
        terminal attributes.)

        When ``stdout`` is not a terminal, a warning is written to stderr
        (once per file descriptor) and the instance is created anyway.
        """
        fd: int | None
        try:
            fd = stdout.fileno()
        except io.UnsupportedOperation:
            # Streams without a file descriptor (e.g. in-memory streams).
            fd = None

        already_warned = fd is not None and fd in cls._fds_not_a_terminal
        if not stdout.isatty() and not already_warned:
            sys.stderr.write("Warning: Output is not a terminal (fd=%r).\n" % fd)
            sys.stderr.flush()
            if fd is not None:
                cls._fds_not_a_terminal.add(fd)

        def get_size() -> Size:
            # Fall back to 24 rows and 80 columns when the size cannot be
            # determined, and also for each dimension reported as 0.
            try:
                rows, columns = _get_size(stdout.fileno())
            except OSError:
                return Size(rows=24, columns=80)
            return Size(rows=rows or 24, columns=columns or 80)

        return cls(
            stdout,
            get_size,
            term=term,
            default_color_depth=default_color_depth,
            enable_bell=enable_bell,
        )

    def get_size(self) -> Size:
        "Return the terminal size, as reported by the `get_size` callable."
        return self._get_size()

    def fileno(self) -> int:
        "Return file descriptor."
        return self.stdout.fileno()

    def encoding(self) -> str:
        "Return encoding used for stdout."
        return self.stdout.encoding

    def write_raw(self, data: str) -> None:
        """
        Append raw data (escape sequences included) to the output buffer.
        """
        self._buffer.append(data)

    def write(self, data: str) -> None:
        """
        Append text to the output buffer.
        (Every escape character is replaced by "?", so that the text cannot
        contain vt100 escape codes. -- used for safely writing text.)
        """
        self._buffer.append(data.replace("\x1b", "?"))

    def set_title(self, title: str) -> None:
        """
        Set terminal title.

        Nothing is written for the "linux" and "eterm-color" terminals.
        Escape and bell characters are removed from ``title``.
        """
        if self.term in ("linux", "eterm-color"):
            return

        cleaned = title.replace("\x1b", "").replace("\x07", "")
        self.write_raw("\x1b]2;%s\x07" % cleaned)

    def clear_title(self) -> None:
        "Set an empty terminal title."
        self.set_title("")

    def erase_screen(self) -> None:
        """
        Write the "erase entire display" sequence.
        """
        self.write_raw("\x1b[2J")

    def enter_alternate_screen(self) -> None:
        "Switch to the alternate screen buffer and move the cursor home."
        self.write_raw("\x1b[?1049h\x1b[H")

    def quit_alternate_screen(self) -> None:
        "Leave the alternate screen buffer."
        self.write_raw("\x1b[?1049l")

    def enable_mouse_support(self) -> None:
        "Turn on the terminal's mouse reporting modes."
        for sequence in (
            "\x1b[?1000h",  # Basic mouse reporting.
            "\x1b[?1003h",  # Mouse-drag support.
            "\x1b[?1015h",  # urxvt mouse mode.
            "\x1b[?1006h",  # Xterm SGR mouse mode.
        ):
            self.write_raw(sequence)

    def disable_mouse_support(self) -> None:
        "Turn off the modes that `enable_mouse_support` turns on."
        for sequence in ("\x1b[?1000l", "\x1b[?1015l", "\x1b[?1006l", "\x1b[?1003l"):
            self.write_raw(sequence)

    def erase_end_of_line(self) -> None:
        """
        Erases from the current cursor position to the end of the current line.
        """
        self.write_raw("\x1b[K")

    def erase_down(self) -> None:
        """
        Erases the screen from the current line down to the bottom of the
        screen.
        """
        self.write_raw("\x1b[J")

    def reset_attributes(self) -> None:
        "Reset all text attributes and colors."
        self.write_raw("\x1b[0m")

    def set_attributes(self, attrs: Attrs, color_depth: ColorDepth) -> None:
        """
        Write the escape sequence that selects the given style.

        :param attrs: `Attrs` instance.
        :param color_depth: Selects which escape code cache is used.
        """
        # The cache creates the escape sequence on first use.
        self.write_raw(self._escape_code_caches[color_depth][attrs])

    def disable_autowrap(self) -> None:
        "Turn off automatic line wrapping."
        self.write_raw("\x1b[?7l")

    def enable_autowrap(self) -> None:
        "Turn on automatic line wrapping."
        self.write_raw("\x1b[?7h")

    def enable_bracketed_paste(self) -> None:
        "Turn on bracketed paste mode."
        self.write_raw("\x1b[?2004h")

    def disable_bracketed_paste(self) -> None:
        "Turn off bracketed paste mode."
        self.write_raw("\x1b[?2004l")

    def reset_cursor_key_mode(self) -> None:
        """
        For vt100 only.
        Put the terminal in cursor mode (instead of application mode).
        """
        self.write_raw("\x1b[?1l")

    def cursor_goto(self, row: int = 0, column: int = 0) -> None:
        """
        Move the cursor to the given row and column.
        """
        self.write_raw("\x1b[%i;%iH" % (row, column))

    def cursor_up(self, amount: int) -> None:
        "Move the cursor up; an amount of 0 writes nothing."
        if amount == 0:
            return
        self.write_raw("\x1b[A" if amount == 1 else "\x1b[%iA" % amount)

    def cursor_down(self, amount: int) -> None:
        "Move the cursor down; an amount of 0 writes nothing."
        if amount == 0:
            return
        self.write_raw("\x1b[B" if amount == 1 else "\x1b[%iB" % amount)

    def cursor_forward(self, amount: int) -> None:
        "Move the cursor to the right; an amount of 0 writes nothing."
        if amount == 0:
            return
        self.write_raw("\x1b[C" if amount == 1 else "\x1b[%iC" % amount)

    def cursor_backward(self, amount: int) -> None:
        "Move the cursor to the left; an amount of 0 writes nothing."
        if amount == 0:
            return
        # A single step is written as a backspace character.
        self.write_raw("\b" if amount == 1 else "\x1b[%iD" % amount)

    def hide_cursor(self) -> None:
        "Make the cursor invisible."
        self.write_raw("\x1b[?25l")

    def show_cursor(self) -> None:
        "Stop the cursor from blinking and make it visible."
        self.write_raw("\x1b[?12l\x1b[?25h")

    def set_cursor_shape(self, cursor_shape: CursorShape) -> None:
        """
        Write the escape sequence for the given cursor shape.

        `CursorShape._NEVER_CHANGE` is ignored entirely. Any other shape
        without a known sequence results in an empty write.
        """
        if cursor_shape == CursorShape._NEVER_CHANGE:
            return

        self._cursor_shape_changed = True

        sequences = {
            CursorShape.BLOCK: "\x1b[2 q",
            CursorShape.BEAM: "\x1b[6 q",
            CursorShape.UNDERLINE: "\x1b[4 q",
            CursorShape.BLINKING_BLOCK: "\x1b[1 q",
            CursorShape.BLINKING_BEAM: "\x1b[5 q",
            CursorShape.BLINKING_UNDERLINE: "\x1b[3 q",
        }
        self.write_raw(sequences.get(cursor_shape, ""))

    def reset_cursor_shape(self) -> None:
        "Reset cursor shape."
        # Only reset the cursor shape if we ever changed it.
        if not self._cursor_shape_changed:
            return

        self._cursor_shape_changed = False
        self.write_raw("\x1b[0 q")

    def flush(self) -> None:
        """
        Send everything that was buffered to the output stream and flush it.
        Does nothing when the buffer is empty.
        """
        if not self._buffer:
            return

        data = "".join(self._buffer)
        self._buffer = []

        flush_stdout(self.stdout, data)

    def ask_for_cpr(self) -> None:
        """
        Asks for a cursor position report (CPR).
        """
        self.write_raw("\x1b[6n")
        self.flush()

    @property
    def responds_to_cpr(self) -> bool:
        """
        Whether cursor position requests should be sent to this output.

        `False` when CPR was disabled in the constructor, when the
        ``PROMPT_TOOLKIT_NO_CPR`` environment variable equals "1", for dumb
        terminals, and when ``stdout`` is not a terminal.
        """
        if (
            not self.enable_cpr
            or os.environ.get("PROMPT_TOOLKIT_NO_CPR", "") == "1"
            or is_dumb_terminal(self.term)
        ):
            return False

        try:
            return self.stdout.isatty()
        except ValueError:
            # `isatty` raised; report that CPR is not supported.
            return False

    def bell(self) -> None:
        "Sound bell."
        if not self.enable_bell:
            return

        self.write_raw("\a")
        self.flush()

    def get_default_color_depth(self) -> ColorDepth:
        """
        Return the default color depth for a vt100 terminal, according to
        our term value.

        An explicitly configured ``default_color_depth`` takes precedence.
        Otherwise dumb terminals get 1 bit, the "linux" and "eterm-color"
        terminals get 4 bit, and everything else (an unknown ``term``
        included) gets `ColorDepth.DEFAULT`.
        """
        configured = self.default_color_depth
        if configured is not None:
            return configured

        term = self.term
        if term is not None:
            if is_dumb_terminal(term):
                return ColorDepth.DEPTH_1_BIT

            if term in ("linux", "eterm-color"):
                return ColorDepth.DEPTH_4_BIT

        return ColorDepth.DEFAULT
|
|