| | from __future__ import annotations |
| |
|
| | import functools |
| | import json |
| | import sys |
| | import typing |
| |
|
| | import click |
| | import pygments.lexers |
| | import pygments.util |
| | import rich.console |
| | import rich.markup |
| | import rich.progress |
| | import rich.syntax |
| | import rich.table |
| |
|
| | from ._client import Client |
| | from ._exceptions import RequestError |
| | from ._models import Response |
| | from ._status_codes import codes |
| |
|
| | if typing.TYPE_CHECKING: |
| | import httpcore |
| |
|
| |
|
| | def print_help() -> None: |
| | console = rich.console.Console() |
| |
|
| | console.print("[bold]HTTPX :butterfly:", justify="center") |
| | console.print() |
| | console.print("A next generation HTTP client.", justify="center") |
| | console.print() |
| | console.print( |
| | "Usage: [bold]httpx[/bold] [cyan]<URL> [OPTIONS][/cyan] ", justify="left" |
| | ) |
| | console.print() |
| |
|
| | table = rich.table.Table.grid(padding=1, pad_edge=True) |
| | table.add_column("Parameter", no_wrap=True, justify="left", style="bold") |
| | table.add_column("Description") |
| | table.add_row( |
| | "-m, --method [cyan]METHOD", |
| | "Request method, such as GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD.\n" |
| | "[Default: GET, or POST if a request body is included]", |
| | ) |
| | table.add_row( |
| | "-p, --params [cyan]<NAME VALUE> ...", |
| | "Query parameters to include in the request URL.", |
| | ) |
| | table.add_row( |
| | "-c, --content [cyan]TEXT", "Byte content to include in the request body." |
| | ) |
| | table.add_row( |
| | "-d, --data [cyan]<NAME VALUE> ...", "Form data to include in the request body." |
| | ) |
| | table.add_row( |
| | "-f, --files [cyan]<NAME FILENAME> ...", |
| | "Form files to include in the request body.", |
| | ) |
| | table.add_row("-j, --json [cyan]TEXT", "JSON data to include in the request body.") |
| | table.add_row( |
| | "-h, --headers [cyan]<NAME VALUE> ...", |
| | "Include additional HTTP headers in the request.", |
| | ) |
| | table.add_row( |
| | "--cookies [cyan]<NAME VALUE> ...", "Cookies to include in the request." |
| | ) |
| | table.add_row( |
| | "--auth [cyan]<USER PASS>", |
| | "Username and password to include in the request. Specify '-' for the password" |
| | " to use a password prompt. Note that using --verbose/-v will expose" |
| | " the Authorization header, including the password encoding" |
| | " in a trivially reversible format.", |
| | ) |
| |
|
| | table.add_row( |
| | "--proxy [cyan]URL", |
| | "Send the request via a proxy. Should be the URL giving the proxy address.", |
| | ) |
| |
|
| | table.add_row( |
| | "--timeout [cyan]FLOAT", |
| | "Timeout value to use for network operations, such as establishing the" |
| | " connection, reading some data, etc... [Default: 5.0]", |
| | ) |
| |
|
| | table.add_row("--follow-redirects", "Automatically follow redirects.") |
| | table.add_row("--no-verify", "Disable SSL verification.") |
| | table.add_row( |
| | "--http2", "Send the request using HTTP/2, if the remote server supports it." |
| | ) |
| |
|
| | table.add_row( |
| | "--download [cyan]FILE", |
| | "Save the response content as a file, rather than displaying it.", |
| | ) |
| |
|
| | table.add_row("-v, --verbose", "Verbose output. Show request as well as response.") |
| | table.add_row("--help", "Show this message and exit.") |
| | console.print(table) |
| |
|
| |
|
| | def get_lexer_for_response(response: Response) -> str: |
| | content_type = response.headers.get("Content-Type") |
| | if content_type is not None: |
| | mime_type, _, _ = content_type.partition(";") |
| | try: |
| | return typing.cast( |
| | str, pygments.lexers.get_lexer_for_mimetype(mime_type.strip()).name |
| | ) |
| | except pygments.util.ClassNotFound: |
| | pass |
| | return "" |
| |
|
| |
|
| | def format_request_headers(request: httpcore.Request, http2: bool = False) -> str: |
| | version = "HTTP/2" if http2 else "HTTP/1.1" |
| | headers = [ |
| | (name.lower() if http2 else name, value) for name, value in request.headers |
| | ] |
| | method = request.method.decode("ascii") |
| | target = request.url.target.decode("ascii") |
| | lines = [f"{method} {target} {version}"] + [ |
| | f"{name.decode('ascii')}: {value.decode('ascii')}" for name, value in headers |
| | ] |
| | return "\n".join(lines) |
| |
|
| |
|
| | def format_response_headers( |
| | http_version: bytes, |
| | status: int, |
| | reason_phrase: bytes | None, |
| | headers: list[tuple[bytes, bytes]], |
| | ) -> str: |
| | version = http_version.decode("ascii") |
| | reason = ( |
| | codes.get_reason_phrase(status) |
| | if reason_phrase is None |
| | else reason_phrase.decode("ascii") |
| | ) |
| | lines = [f"{version} {status} {reason}"] + [ |
| | f"{name.decode('ascii')}: {value.decode('ascii')}" for name, value in headers |
| | ] |
| | return "\n".join(lines) |
| |
|
| |
|
| | def print_request_headers(request: httpcore.Request, http2: bool = False) -> None: |
| | console = rich.console.Console() |
| | http_text = format_request_headers(request, http2=http2) |
| | syntax = rich.syntax.Syntax(http_text, "http", theme="ansi_dark", word_wrap=True) |
| | console.print(syntax) |
| | syntax = rich.syntax.Syntax("", "http", theme="ansi_dark", word_wrap=True) |
| | console.print(syntax) |
| |
|
| |
|
| | def print_response_headers( |
| | http_version: bytes, |
| | status: int, |
| | reason_phrase: bytes | None, |
| | headers: list[tuple[bytes, bytes]], |
| | ) -> None: |
| | console = rich.console.Console() |
| | http_text = format_response_headers(http_version, status, reason_phrase, headers) |
| | syntax = rich.syntax.Syntax(http_text, "http", theme="ansi_dark", word_wrap=True) |
| | console.print(syntax) |
| | syntax = rich.syntax.Syntax("", "http", theme="ansi_dark", word_wrap=True) |
| | console.print(syntax) |
| |
|
| |
|
| | def print_response(response: Response) -> None: |
| | console = rich.console.Console() |
| | lexer_name = get_lexer_for_response(response) |
| | if lexer_name: |
| | if lexer_name.lower() == "json": |
| | try: |
| | data = response.json() |
| | text = json.dumps(data, indent=4) |
| | except ValueError: |
| | text = response.text |
| | else: |
| | text = response.text |
| |
|
| | syntax = rich.syntax.Syntax(text, lexer_name, theme="ansi_dark", word_wrap=True) |
| | console.print(syntax) |
| | else: |
| | console.print(f"<{len(response.content)} bytes of binary data>") |
| |
|
| |
|
| | _PCTRTT = typing.Tuple[typing.Tuple[str, str], ...] |
| | _PCTRTTT = typing.Tuple[_PCTRTT, ...] |
| | _PeerCertRetDictType = typing.Dict[str, typing.Union[str, _PCTRTTT, _PCTRTT]] |
| |
|
| |
|
| | def format_certificate(cert: _PeerCertRetDictType) -> str: |
| | lines = [] |
| | for key, value in cert.items(): |
| | if isinstance(value, (list, tuple)): |
| | lines.append(f"* {key}:") |
| | for item in value: |
| | if key in ("subject", "issuer"): |
| | for sub_item in item: |
| | lines.append(f"* {sub_item[0]}: {sub_item[1]!r}") |
| | elif isinstance(item, tuple) and len(item) == 2: |
| | lines.append(f"* {item[0]}: {item[1]!r}") |
| | else: |
| | lines.append(f"* {item!r}") |
| | else: |
| | lines.append(f"* {key}: {value!r}") |
| | return "\n".join(lines) |
| |
|
| |
|
| | def trace( |
| | name: str, info: typing.Mapping[str, typing.Any], verbose: bool = False |
| | ) -> None: |
| | console = rich.console.Console() |
| | if name == "connection.connect_tcp.started" and verbose: |
| | host = info["host"] |
| | console.print(f"* Connecting to {host!r}") |
| | elif name == "connection.connect_tcp.complete" and verbose: |
| | stream = info["return_value"] |
| | server_addr = stream.get_extra_info("server_addr") |
| | console.print(f"* Connected to {server_addr[0]!r} on port {server_addr[1]}") |
| | elif name == "connection.start_tls.complete" and verbose: |
| | stream = info["return_value"] |
| | ssl_object = stream.get_extra_info("ssl_object") |
| | version = ssl_object.version() |
| | cipher = ssl_object.cipher() |
| | server_cert = ssl_object.getpeercert() |
| | alpn = ssl_object.selected_alpn_protocol() |
| | console.print(f"* SSL established using {version!r} / {cipher[0]!r}") |
| | console.print(f"* Selected ALPN protocol: {alpn!r}") |
| | if server_cert: |
| | console.print("* Server certificate:") |
| | console.print(format_certificate(server_cert)) |
| | elif name == "http11.send_request_headers.started" and verbose: |
| | request = info["request"] |
| | print_request_headers(request, http2=False) |
| | elif name == "http2.send_request_headers.started" and verbose: |
| | request = info["request"] |
| | print_request_headers(request, http2=True) |
| | elif name == "http11.receive_response_headers.complete": |
| | http_version, status, reason_phrase, headers = info["return_value"] |
| | print_response_headers(http_version, status, reason_phrase, headers) |
| | elif name == "http2.receive_response_headers.complete": |
| | status, headers = info["return_value"] |
| | http_version = b"HTTP/2" |
| | reason_phrase = None |
| | print_response_headers(http_version, status, reason_phrase, headers) |
| |
|
| |
|
| | def download_response(response: Response, download: typing.BinaryIO) -> None: |
| | console = rich.console.Console() |
| | console.print() |
| | content_length = response.headers.get("Content-Length") |
| | with rich.progress.Progress( |
| | "[progress.description]{task.description}", |
| | "[progress.percentage]{task.percentage:>3.0f}%", |
| | rich.progress.BarColumn(bar_width=None), |
| | rich.progress.DownloadColumn(), |
| | rich.progress.TransferSpeedColumn(), |
| | ) as progress: |
| | description = f"Downloading [bold]{rich.markup.escape(download.name)}" |
| | download_task = progress.add_task( |
| | description, |
| | total=int(content_length or 0), |
| | start=content_length is not None, |
| | ) |
| | for chunk in response.iter_bytes(): |
| | download.write(chunk) |
| | progress.update(download_task, completed=response.num_bytes_downloaded) |
| |
|
| |
|
| | def validate_json( |
| | ctx: click.Context, |
| | param: click.Option | click.Parameter, |
| | value: typing.Any, |
| | ) -> typing.Any: |
| | if value is None: |
| | return None |
| |
|
| | try: |
| | return json.loads(value) |
| | except json.JSONDecodeError: |
| | raise click.BadParameter("Not valid JSON") |
| |
|
| |
|
| | def validate_auth( |
| | ctx: click.Context, |
| | param: click.Option | click.Parameter, |
| | value: typing.Any, |
| | ) -> typing.Any: |
| | if value == (None, None): |
| | return None |
| |
|
| | username, password = value |
| | if password == "-": |
| | password = click.prompt("Password", hide_input=True) |
| | return (username, password) |
| |
|
| |
|
| | def handle_help( |
| | ctx: click.Context, |
| | param: click.Option | click.Parameter, |
| | value: typing.Any, |
| | ) -> None: |
| | if not value or ctx.resilient_parsing: |
| | return |
| |
|
| | print_help() |
| | ctx.exit() |
| |
|
| |
|
| | @click.command(add_help_option=False) |
| | @click.argument("url", type=str) |
| | @click.option( |
| | "--method", |
| | "-m", |
| | "method", |
| | type=str, |
| | help=( |
| | "Request method, such as GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD. " |
| | "[Default: GET, or POST if a request body is included]" |
| | ), |
| | ) |
| | @click.option( |
| | "--params", |
| | "-p", |
| | "params", |
| | type=(str, str), |
| | multiple=True, |
| | help="Query parameters to include in the request URL.", |
| | ) |
| | @click.option( |
| | "--content", |
| | "-c", |
| | "content", |
| | type=str, |
| | help="Byte content to include in the request body.", |
| | ) |
| | @click.option( |
| | "--data", |
| | "-d", |
| | "data", |
| | type=(str, str), |
| | multiple=True, |
| | help="Form data to include in the request body.", |
| | ) |
| | @click.option( |
| | "--files", |
| | "-f", |
| | "files", |
| | type=(str, click.File(mode="rb")), |
| | multiple=True, |
| | help="Form files to include in the request body.", |
| | ) |
| | @click.option( |
| | "--json", |
| | "-j", |
| | "json", |
| | type=str, |
| | callback=validate_json, |
| | help="JSON data to include in the request body.", |
| | ) |
| | @click.option( |
| | "--headers", |
| | "-h", |
| | "headers", |
| | type=(str, str), |
| | multiple=True, |
| | help="Include additional HTTP headers in the request.", |
| | ) |
| | @click.option( |
| | "--cookies", |
| | "cookies", |
| | type=(str, str), |
| | multiple=True, |
| | help="Cookies to include in the request.", |
| | ) |
| | @click.option( |
| | "--auth", |
| | "auth", |
| | type=(str, str), |
| | default=(None, None), |
| | callback=validate_auth, |
| | help=( |
| | "Username and password to include in the request. " |
| | "Specify '-' for the password to use a password prompt. " |
| | "Note that using --verbose/-v will expose the Authorization header, " |
| | "including the password encoding in a trivially reversible format." |
| | ), |
| | ) |
| | @click.option( |
| | "--proxy", |
| | "proxy", |
| | type=str, |
| | default=None, |
| | help="Send the request via a proxy. Should be the URL giving the proxy address.", |
| | ) |
| | @click.option( |
| | "--timeout", |
| | "timeout", |
| | type=float, |
| | default=5.0, |
| | help=( |
| | "Timeout value to use for network operations, such as establishing the " |
| | "connection, reading some data, etc... [Default: 5.0]" |
| | ), |
| | ) |
| | @click.option( |
| | "--follow-redirects", |
| | "follow_redirects", |
| | is_flag=True, |
| | default=False, |
| | help="Automatically follow redirects.", |
| | ) |
| | @click.option( |
| | "--no-verify", |
| | "verify", |
| | is_flag=True, |
| | default=True, |
| | help="Disable SSL verification.", |
| | ) |
| | @click.option( |
| | "--http2", |
| | "http2", |
| | type=bool, |
| | is_flag=True, |
| | default=False, |
| | help="Send the request using HTTP/2, if the remote server supports it.", |
| | ) |
| | @click.option( |
| | "--download", |
| | type=click.File("wb"), |
| | help="Save the response content as a file, rather than displaying it.", |
| | ) |
| | @click.option( |
| | "--verbose", |
| | "-v", |
| | type=bool, |
| | is_flag=True, |
| | default=False, |
| | help="Verbose. Show request as well as response.", |
| | ) |
| | @click.option( |
| | "--help", |
| | is_flag=True, |
| | is_eager=True, |
| | expose_value=False, |
| | callback=handle_help, |
| | help="Show this message and exit.", |
| | ) |
| | def main( |
| | url: str, |
| | method: str, |
| | params: list[tuple[str, str]], |
| | content: str, |
| | data: list[tuple[str, str]], |
| | files: list[tuple[str, click.File]], |
| | json: str, |
| | headers: list[tuple[str, str]], |
| | cookies: list[tuple[str, str]], |
| | auth: tuple[str, str] | None, |
| | proxy: str, |
| | timeout: float, |
| | follow_redirects: bool, |
| | verify: bool, |
| | http2: bool, |
| | download: typing.BinaryIO | None, |
| | verbose: bool, |
| | ) -> None: |
| | """ |
| | An HTTP command line client. |
| | Sends a request and displays the response. |
| | """ |
| | if not method: |
| | method = "POST" if content or data or files or json else "GET" |
| |
|
| | try: |
| | with Client(proxy=proxy, timeout=timeout, http2=http2, verify=verify) as client: |
| | with client.stream( |
| | method, |
| | url, |
| | params=list(params), |
| | content=content, |
| | data=dict(data), |
| | files=files, |
| | json=json, |
| | headers=headers, |
| | cookies=dict(cookies), |
| | auth=auth, |
| | follow_redirects=follow_redirects, |
| | extensions={"trace": functools.partial(trace, verbose=verbose)}, |
| | ) as response: |
| | if download is not None: |
| | download_response(response, download) |
| | else: |
| | response.read() |
| | if response.content: |
| | print_response(response) |
| |
|
| | except RequestError as exc: |
| | console = rich.console.Console() |
| | console.print(f"[red]{type(exc).__name__}[/red]: {exc}") |
| | sys.exit(1) |
| |
|
| | sys.exit(0 if response.is_success else 1) |
| |
|