# Source code for scrapy.core.downloader.handlers._httpx

"""``httpx``-based HTTP(S) download handler. Currently not recommended for production use."""

from __future__ import annotations

import ipaddress
import logging
import ssl
from http.cookiejar import Cookie, CookieJar
from io import BytesIO
from typing import TYPE_CHECKING, Any, NoReturn, TypedDict

from scrapy import Request, signals
from scrapy.exceptions import (
    CannotResolveHostError,
    DownloadCancelledError,
    DownloadConnectionRefusedError,
    DownloadFailedError,
    DownloadTimeoutError,
    NotConfigured,
    ResponseDataLossError,
    UnsupportedURLSchemeError,
)
from scrapy.http import Headers, Response
from scrapy.utils._download_handlers import (
    BaseHttpDownloadHandler,
    check_stop_download,
    get_dataloss_msg,
    get_maxsize_msg,
    get_warnsize_msg,
    make_response,
    normalize_bind_address,
)
from scrapy.utils.asyncio import is_asyncio_available
from scrapy.utils.ssl import _log_sslobj_debug_info, _make_ssl_context

if TYPE_CHECKING:
    from contextlib import AbstractAsyncContextManager
    from http.client import HTTPResponse
    from ipaddress import IPv4Address, IPv6Address
    from urllib.request import Request as ULRequest

    from httpcore import AsyncNetworkStream

    from scrapy.crawler import Crawler


try:
    import httpx
except ImportError:
    httpx = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)


class _BaseResponseArgs(TypedDict):
    """Keyword arguments shared by every ``make_response()`` call made by
    :class:`HttpxDownloadHandler` for a single response."""

    status: int  # HTTP status code (``httpx.Response.status_code``)
    url: str  # URL of the Scrapy request being downloaded
    headers: Headers  # response headers converted to Scrapy's Headers
    ip_address: IPv4Address | IPv6Address  # remote peer address from the network stream
    protocol: str  # HTTP version string reported by httpx (``http_version``)


# workaround for (and from) https://github.com/encode/httpx/issues/2992
class _NullCookieJar(CookieJar):  # pragma: no cover
    """A CookieJar that rejects all cookies."""

    def extract_cookies(self, response: HTTPResponse, request: ULRequest) -> None:
        pass

    def set_cookie(self, cookie: Cookie) -> None:
        pass


class HttpxDownloadHandler(BaseHttpDownloadHandler):
    """HTTP(S) download handler backed by :mod:`httpx`.

    Experimental: logs a warning on instantiation and is not recommended
    for production use.  Raises :exc:`~scrapy.exceptions.NotConfigured`
    when asyncio support is unavailable or ``httpx`` is not installed.
    """

    # Fallback timeout (seconds) used when a request has no
    # "download_timeout" meta key.
    # NOTE(review): despite the name, this value is passed to httpx as the
    # timeout for the whole request (see _get_httpx_response), not only the
    # connect phase — confirm whether the name should change.
    _DEFAULT_CONNECT_TIMEOUT = 10

    def __init__(self, crawler: Crawler):
        """Validate prerequisites and build the shared ``httpx.AsyncClient``.

        Raises :exc:`~scrapy.exceptions.NotConfigured` when the asyncio
        reactor support is unavailable or ``httpx`` failed to import.
        """
        # we skip HttpxDownloadHandler tests with the non-asyncio reactor
        if not is_asyncio_available():  # pragma: no cover
            raise NotConfigured(
                f"{type(self).__name__} requires the asyncio support. Make"
                f" sure that you have either enabled the asyncio Twisted"
                f" reactor in the TWISTED_REACTOR setting or disabled the"
                f" TWISTED_REACTOR_ENABLED setting. See the asyncio"
                f" documentation of Scrapy for more information."
            )
        if httpx is None:  # pragma: no cover
            raise NotConfigured(
                f"{type(self).__name__} requires the httpx library to be installed."
            )
        super().__init__(crawler)
        logger.warning(
            "HttpxDownloadHandler is experimental and is not recommended for production use."
        )
        bind_address = crawler.settings.get("DOWNLOAD_BIND_ADDRESS")
        bind_address = normalize_bind_address(bind_address)
        # httpx can bind to a local host but not a local port, so only the
        # host part of DOWNLOAD_BIND_ADDRESS is honored (warned about below).
        self._bind_address: str | None = None
        if bind_address is not None:
            host, port = bind_address
            if port != 0:
                logger.warning(
                    "DOWNLOAD_BIND_ADDRESS specifies a port (%s), but %s does not "
                    "support binding to a specific local port. Ignoring the port "
                    "and binding only to %r.",
                    port,
                    type(self).__name__,
                    host,
                )
            self._bind_address = host
        # One client shared by all requests; cookies are disabled via
        # _NullCookieJar (workaround for httpx issue #2992) and TLS
        # verification uses Scrapy's SSL context.
        self._client = httpx.AsyncClient(
            cookies=_NullCookieJar(),
            transport=httpx.AsyncHTTPTransport(
                verify=_make_ssl_context(crawler.settings),
                local_address=self._bind_address,
            ),
        )

    async def download_request(self, request: Request) -> Response:
        """Download *request* and return a Scrapy :class:`Response`.

        httpx exceptions are translated into the corresponding Scrapy
        download exceptions: timeouts -> DownloadTimeoutError, unsupported
        schemes -> UnsupportedURLSchemeError, DNS failures ->
        CannotResolveHostError, other connect errors ->
        DownloadConnectionRefusedError, and remaining network/protocol
        errors -> DownloadFailedError.
        """
        self._warn_unsupported_meta(request.meta)
        timeout: float = request.meta.get(
            "download_timeout", self._DEFAULT_CONNECT_TIMEOUT
        )
        try:
            async with self._get_httpx_response(request, timeout) as httpx_response:
                return await self._read_response(httpx_response, request)
        except httpx.TimeoutException as e:
            raise DownloadTimeoutError(
                f"Getting {request.url} took longer than {timeout} seconds."
            ) from e
        except httpx.UnsupportedProtocol as e:
            raise UnsupportedURLSchemeError(str(e)) from e
        except httpx.ConnectError as e:
            # DNS failures surface as ConnectError; distinguish them by the
            # platform-specific resolver messages embedded in the error text.
            error_message = str(e)
            if (
                "Name or service not known" in error_message
                or "getaddrinfo failed" in error_message
                or "nodename nor servname" in error_message
                or "Temporary failure in name resolution" in error_message
            ):
                raise CannotResolveHostError(error_message) from e
            raise DownloadConnectionRefusedError(str(e)) from e
        except httpx.NetworkError as e:
            raise DownloadFailedError(str(e)) from e
        except httpx.RemoteProtocolError as e:
            # Note: mid-body RemoteProtocolError (dataloss) is handled
            # separately inside _read_response; this catches the rest.
            raise DownloadFailedError(str(e)) from e

    def _warn_unsupported_meta(self, meta: dict[str, Any]) -> None:
        """Log an error for request meta keys this handler cannot honor."""
        if meta.get("bindaddress"):
            # configurable only per-client:
            # https://github.com/encode/httpx/issues/755#issuecomment-2746121794
            logger.error(
                f"The 'bindaddress' request meta key is not supported by"
                f" {type(self).__name__} and will be ignored."
            )
        if meta.get("proxy"):
            # configurable only per-client:
            # https://github.com/encode/httpx/issues/486
            logger.error(
                f"The 'proxy' request meta key is not supported by"
                f" {type(self).__name__} and will be ignored."
            )

    def _get_httpx_response(
        self, request: Request, timeout: float
    ) -> AbstractAsyncContextManager[httpx.Response]:
        """Return the ``httpx`` streaming context manager for *request*.

        Streaming (rather than eager reading) lets _read_response enforce
        size limits and stop-download signals chunk by chunk.
        """
        return self._client.stream(
            request.method,
            request.url,
            content=request.body,
            headers=request.headers.to_tuple_list(),
            timeout=timeout,
        )

    async def _read_response(
        self, httpx_response: httpx.Response, request: Request
    ) -> Response:
        """Stream the body of *httpx_response* into a Scrapy Response.

        Enforces ``download_maxsize`` / ``download_warnsize`` (both from the
        declared Content-Length up front and from the actual bytes received),
        fires the ``headers_received`` / ``bytes_received`` signals (either of
        which may stop the download early), and applies the
        ``download_fail_on_dataloss`` policy on truncated bodies.

        Raises DownloadCancelledError when maxsize is exceeded and
        ResponseDataLossError on truncation when dataloss is fatal.
        """
        maxsize: int = request.meta.get("download_maxsize", self._default_maxsize)
        warnsize: int = request.meta.get("download_warnsize", self._default_warnsize)
        # Pre-check the declared size so oversized responses can be cancelled
        # before any body bytes are read.
        content_length = httpx_response.headers.get("Content-Length")
        expected_size = int(content_length) if content_length is not None else None
        if maxsize and expected_size and expected_size > maxsize:
            self._cancel_maxsize(expected_size, maxsize, request, expected=True)
        reached_warnsize = False
        if warnsize and expected_size and expected_size > warnsize:
            reached_warnsize = True
            logger.warning(
                get_warnsize_msg(expected_size, warnsize, request, expected=True)
            )
        headers = Headers(httpx_response.headers.multi_items())
        network_stream: AsyncNetworkStream = httpx_response.extensions["network_stream"]
        make_response_base_args: _BaseResponseArgs = {
            "status": httpx_response.status_code,
            "url": request.url,
            "headers": headers,
            "ip_address": self._get_server_ip(network_stream),
            "protocol": httpx_response.http_version,
        }
        self._log_tls_info(network_stream)
        # A headers_received handler may stop the download before any body
        # bytes are read; the resulting response then has no body.
        if stop_download := check_stop_download(
            signals.headers_received,
            self.crawler,
            request,
            headers=headers,
            body_length=expected_size,
        ):
            return make_response(
                **make_response_base_args,
                stop_download=stop_download,
            )
        response_body = BytesIO()
        bytes_received = 0
        try:
            async for chunk in httpx_response.aiter_raw():
                response_body.write(chunk)
                bytes_received += len(chunk)
                # bytes_received handlers may stop mid-body; the partial body
                # read so far is returned.
                if stop_download := check_stop_download(
                    signals.bytes_received, self.crawler, request, data=chunk
                ):
                    return make_response(
                        **make_response_base_args,
                        body=response_body.getvalue(),
                        stop_download=stop_download,
                    )
                if maxsize and bytes_received > maxsize:
                    # Drop the buffered body before cancelling so the
                    # oversized data is not kept in memory.
                    response_body.truncate(0)
                    self._cancel_maxsize(
                        bytes_received, maxsize, request, expected=False
                    )
                if warnsize and bytes_received > warnsize and not reached_warnsize:
                    reached_warnsize = True
                    logger.warning(
                        get_warnsize_msg(
                            bytes_received, warnsize, request, expected=False
                        )
                    )
        except httpx.RemoteProtocolError as e:
            # special handling of the dataloss case
            if (
                "peer closed connection without sending complete message body"
                not in str(e)
            ):
                raise
            fail_on_dataloss: bool = request.meta.get(
                "download_fail_on_dataloss", self._fail_on_dataloss
            )
            if not fail_on_dataloss:
                # Tolerated truncation: return the partial body flagged as
                # "dataloss".
                return make_response(
                    **make_response_base_args,
                    body=response_body.getvalue(),
                    flags=["dataloss"],
                )
            self._log_dataloss_warning(request.url)
            raise ResponseDataLossError(str(e)) from e
        return make_response(
            **make_response_base_args,
            body=response_body.getvalue(),
        )

    @staticmethod
    def _get_server_ip(network_stream: AsyncNetworkStream) -> IPv4Address | IPv6Address:
        """Return the remote peer's IP address taken from the network
        stream's ``server_addr`` extra info (host part of the address pair)."""
        extra_server_addr = network_stream.get_extra_info("server_addr")
        return ipaddress.ip_address(extra_server_addr[0])

    def _log_tls_info(self, network_stream: AsyncNetworkStream) -> None:
        """Log TLS connection details when DOWNLOADER_CLIENT_TLS_VERBOSE_LOGGING
        is enabled and the stream exposes an ``ssl.SSLObject``."""
        if not self._tls_verbose_logging:
            return
        extra_ssl_object = network_stream.get_extra_info("ssl_object")
        if isinstance(extra_ssl_object, ssl.SSLObject):
            _log_sslobj_debug_info(extra_ssl_object)

    def _log_dataloss_warning(self, url: str) -> None:
        """Warn about a dataloss failure for *url*, at most once per handler."""
        if self._fail_on_dataloss_warned:
            return
        logger.warning(get_dataloss_msg(url))
        self._fail_on_dataloss_warned = True

    @staticmethod
    def _cancel_maxsize(
        size: int, limit: int, request: Request, *, expected: bool
    ) -> NoReturn:
        """Log the maxsize violation and cancel the download.

        *expected* selects the message wording: True when *size* came from
        Content-Length, False when it is the actual byte count received.
        Always raises DownloadCancelledError.
        """
        warning_msg = get_maxsize_msg(size, limit, request, expected=expected)
        logger.warning(warning_msg)
        raise DownloadCancelledError(warning_msg)

    async def close(self) -> None:
        """Close the shared httpx client and its connection pool."""
        await self._client.aclose()