Edit on GitHub

mitmproxy.http

   1import binascii
   2import os
   3import re
   4import time
   5import urllib.parse
   6import json
   7import warnings
   8from dataclasses import dataclass
   9from dataclasses import fields
  10from email.utils import formatdate
  11from email.utils import mktime_tz
  12from email.utils import parsedate_tz
  13from typing import Callable
  14from typing import Iterable
  15from typing import Iterator
  16from typing import Mapping
  17from typing import Optional
  18from typing import Union
  19from typing import cast
  20from typing import Any
  21
  22from mitmproxy import flow
  23from mitmproxy.websocket import WebSocketData
  24from mitmproxy.coretypes import multidict
  25from mitmproxy.coretypes import serializable
  26from mitmproxy.net import encoding
  27from mitmproxy.net.http import cookies
  28from mitmproxy.net.http import multipart
  29from mitmproxy.net.http import status_codes
  30from mitmproxy.net.http import url
  31from mitmproxy.net.http.headers import assemble_content_type
  32from mitmproxy.net.http.headers import parse_content_type
  33from mitmproxy.utils import human
  34from mitmproxy.utils import strutils
  35from mitmproxy.utils import typecheck
  36from mitmproxy.utils.strutils import always_bytes
  37from mitmproxy.utils.strutils import always_str
  38
  39
  40# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
  41def _native(x: bytes) -> str:
  42    return x.decode("utf-8", "surrogateescape")
  43
  44
def _always_bytes(x: Union[str, bytes]) -> bytes:
    """
    Convert a header name or value to bytes.

    Delegates to `strutils.always_bytes` with UTF-8 and ``surrogateescape``, the same
    codec and error handler that `_native` uses for decoding.
    """
    return strutils.always_bytes(x, "utf-8", "surrogateescape")
  47
  48
  49# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
  50class Headers(multidict.MultiDict):  # type: ignore
  51    """
  52    Header class which allows both convenient access to individual headers as well as
  53    direct access to the underlying raw data. Provides a full dictionary interface.
  54
  55    Create headers with keyword arguments:
  56    >>> h = Headers(host="example.com", content_type="application/xml")
  57
  58    Headers mostly behave like a normal dict:
  59    >>> h["Host"]
  60    "example.com"
  61
  62    Headers are case insensitive:
  63    >>> h["host"]
  64    "example.com"
  65
  66    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
  67    >>> h = Headers([
  68        (b"Host",b"example.com"),
  69        (b"Accept",b"text/html"),
  70        (b"accept",b"application/xml")
  71    ])
  72
  73    Multiple headers are folded into a single header as per RFC 7230:
  74    >>> h["Accept"]
  75    "text/html, application/xml"
  76
  77    Setting a header removes all existing headers with the same name:
  78    >>> h["Accept"] = "application/text"
  79    >>> h["Accept"]
  80    "application/text"
  81
  82    `bytes(h)` returns an HTTP/1 header block:
  83    >>> print(bytes(h))
  84    Host: example.com
  85    Accept: application/text
  86
  87    For full control, the raw header fields can be accessed:
  88    >>> h.fields
  89
  90    Caveats:
  91     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
  92    """
  93
  94    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
  95        """
  96        *Args:*
  97         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
  98           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
  99         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
 100           For convenience, underscores in header names will be transformed to dashes -
 101           this behaviour does not extend to other methods.
 102
 103        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
 104        the behavior is undefined.
 105        """
 106        super().__init__(fields)
 107
 108        for key, value in self.fields:
 109            if not isinstance(key, bytes) or not isinstance(value, bytes):
 110                raise TypeError("Header fields must be bytes.")
 111
 112        # content_type -> content-type
 113        self.update(
 114            {
 115                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
 116                for name, value in headers.items()
 117            }
 118        )
 119
 120    fields: tuple[tuple[bytes, bytes], ...]
 121
 122    @staticmethod
 123    def _reduce_values(values) -> str:
 124        # Headers can be folded
 125        return ", ".join(values)
 126
 127    @staticmethod
 128    def _kconv(key) -> str:
 129        # Headers are case-insensitive
 130        return key.lower()
 131
 132    def __bytes__(self) -> bytes:
 133        if self.fields:
 134            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
 135        else:
 136            return b""
 137
 138    def __delitem__(self, key: Union[str, bytes]) -> None:
 139        key = _always_bytes(key)
 140        super().__delitem__(key)
 141
 142    def __iter__(self) -> Iterator[str]:
 143        for x in super().__iter__():
 144            yield _native(x)
 145
 146    def get_all(self, name: Union[str, bytes]) -> list[str]:
 147        """
 148        Like `Headers.get`, but does not fold multiple headers into a single one.
 149        This is useful for Set-Cookie and Cookie headers, which do not support folding.
 150
 151        *See also:*
 152         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
 153         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
 154         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
 155        """
 156        name = _always_bytes(name)
 157        return [_native(x) for x in super().get_all(name)]
 158
 159    def set_all(self, name: Union[str, bytes], values: list[Union[str, bytes]]):
 160        """
 161        Explicitly set multiple headers for the given key.
 162        See `Headers.get_all`.
 163        """
 164        name = _always_bytes(name)
 165        values = [_always_bytes(x) for x in values]
 166        return super().set_all(name, values)
 167
 168    def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
 169        key = _always_bytes(key)
 170        value = _always_bytes(value)
 171        super().insert(index, key, value)
 172
 173    def items(self, multi=False):
 174        if multi:
 175            return ((_native(k), _native(v)) for k, v in self.fields)
 176        else:
 177            return super().items()
 178
 179
 180@dataclass
 181class MessageData(serializable.Serializable):
 182    http_version: bytes
 183    headers: Headers
 184    content: Optional[bytes]
 185    trailers: Optional[Headers]
 186    timestamp_start: float
 187    timestamp_end: Optional[float]
 188
 189    # noinspection PyUnreachableCode
 190    if __debug__:
 191
 192        def __post_init__(self):
 193            for field in fields(self):
 194                val = getattr(self, field.name)
 195                typecheck.check_option_type(field.name, val, field.type)
 196
 197    def set_state(self, state):
 198        for k, v in state.items():
 199            if k in ("headers", "trailers") and v is not None:
 200                v = Headers.from_state(v)
 201            setattr(self, k, v)
 202
 203    def get_state(self):
 204        state = vars(self).copy()
 205        state["headers"] = state["headers"].get_state()
 206        if state["trailers"] is not None:
 207            state["trailers"] = state["trailers"].get_state()
 208        return state
 209
 210    @classmethod
 211    def from_state(cls, state):
 212        state["headers"] = Headers.from_state(state["headers"])
 213        if state["trailers"] is not None:
 214            state["trailers"] = Headers.from_state(state["trailers"])
 215        return cls(**state)
 216
 217
@dataclass
class RequestData(MessageData):
    """
    Raw field storage for an HTTP request: the `MessageData` fields plus the
    request-specific ones below. `Request.__init__` fills all of them.
    """

    host: str
    port: int
    method: bytes
    scheme: bytes
    authority: bytes
    path: bytes
 226
 227
@dataclass
class ResponseData(MessageData):
    """
    Raw field storage for an HTTP response: the `MessageData` fields plus status
    code and reason.
    """

    # NOTE(review): presumably the backing store of a `Response` class that is not
    # visible in this part of the file - confirm.
    status_code: int
    reason: bytes
 232
 233
 234class Message(serializable.Serializable):
 235    """Base class for `Request` and `Response`."""
 236
 237    @classmethod
 238    def from_state(cls, state):
 239        return cls(**state)
 240
 241    def get_state(self):
 242        return self.data.get_state()
 243
 244    def set_state(self, state):
 245        self.data.set_state(state)
 246
 247    data: MessageData
 248    stream: Union[Callable[[bytes], Union[Iterable[bytes], bytes]], bool] = False
 249    """
 250    This attribute controls if the message body should be streamed.
 251
 252    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
 253    This makes it possible to perform string replacements on the entire body.
 254    If `True`, the message body will not be buffered on the proxy
 255    but immediately forwarded instead.
 256    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
 257    Please note that packet boundaries generally should not be relied upon.
 258
 259    This attribute must be set in the `requestheaders` or `responseheaders` hook.
 260    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
 261    """
 262
 263    @property
 264    def http_version(self) -> str:
 265        """
 266        HTTP version string, for example `HTTP/1.1`.
 267        """
 268        return self.data.http_version.decode("utf-8", "surrogateescape")
 269
 270    @http_version.setter
 271    def http_version(self, http_version: Union[str, bytes]) -> None:
 272        self.data.http_version = strutils.always_bytes(
 273            http_version, "utf-8", "surrogateescape"
 274        )
 275
 276    @property
 277    def is_http10(self) -> bool:
 278        return self.data.http_version == b"HTTP/1.0"
 279
 280    @property
 281    def is_http11(self) -> bool:
 282        return self.data.http_version == b"HTTP/1.1"
 283
 284    @property
 285    def is_http2(self) -> bool:
 286        return self.data.http_version == b"HTTP/2.0"
 287
 288    @property
 289    def headers(self) -> Headers:
 290        """
 291        The HTTP headers.
 292        """
 293        return self.data.headers
 294
 295    @headers.setter
 296    def headers(self, h: Headers) -> None:
 297        self.data.headers = h
 298
 299    @property
 300    def trailers(self) -> Optional[Headers]:
 301        """
 302        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
 303        """
 304        return self.data.trailers
 305
 306    @trailers.setter
 307    def trailers(self, h: Optional[Headers]) -> None:
 308        self.data.trailers = h
 309
 310    @property
 311    def raw_content(self) -> Optional[bytes]:
 312        """
 313        The raw (potentially compressed) HTTP message body.
 314
 315        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
 316
 317        *See also:* `Message.content`, `Message.text`
 318        """
 319        return self.data.content
 320
 321    @raw_content.setter
 322    def raw_content(self, content: Optional[bytes]) -> None:
 323        self.data.content = content
 324
 325    @property
 326    def content(self) -> Optional[bytes]:
 327        """
 328        The uncompressed HTTP message body as bytes.
 329
 330        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
 331
 332        *See also:* `Message.raw_content`, `Message.text`
 333        """
 334        return self.get_content()
 335
 336    @content.setter
 337    def content(self, value: Optional[bytes]) -> None:
 338        self.set_content(value)
 339
 340    @property
 341    def text(self) -> Optional[str]:
 342        """
 343        The uncompressed and decoded HTTP message body as text.
 344
 345        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
 346
 347        *See also:* `Message.raw_content`, `Message.content`
 348        """
 349        return self.get_text()
 350
 351    @text.setter
 352    def text(self, value: Optional[str]) -> None:
 353        self.set_text(value)
 354
 355    def set_content(self, value: Optional[bytes]) -> None:
 356        if value is None:
 357            self.raw_content = None
 358            return
 359        if not isinstance(value, bytes):
 360            raise TypeError(
 361                f"Message content must be bytes, not {type(value).__name__}. "
 362                "Please use .text if you want to assign a str."
 363            )
 364        ce = self.headers.get("content-encoding")
 365        try:
 366            self.raw_content = encoding.encode(value, ce or "identity")
 367        except ValueError:
 368            # So we have an invalid content-encoding?
 369            # Let's remove it!
 370            del self.headers["content-encoding"]
 371            self.raw_content = value
 372
 373        if "transfer-encoding" in self.headers:
 374            # https://httpwg.org/specs/rfc7230.html#header.content-length
 375            # don't set content-length if a transfer-encoding is provided
 376            pass
 377        else:
 378            self.headers["content-length"] = str(len(self.raw_content))
 379
 380    def get_content(self, strict: bool = True) -> Optional[bytes]:
 381        """
 382        Similar to `Message.content`, but does not raise if `strict` is `False`.
 383        Instead, the compressed message body is returned as-is.
 384        """
 385        if self.raw_content is None:
 386            return None
 387        ce = self.headers.get("content-encoding")
 388        if ce:
 389            try:
 390                content = encoding.decode(self.raw_content, ce)
 391                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
 392                if isinstance(content, str):
 393                    raise ValueError(f"Invalid Content-Encoding: {ce}")
 394                return content
 395            except ValueError:
 396                if strict:
 397                    raise
 398                return self.raw_content
 399        else:
 400            return self.raw_content
 401
 402    def _get_content_type_charset(self) -> Optional[str]:
 403        ct = parse_content_type(self.headers.get("content-type", ""))
 404        if ct:
 405            return ct[2].get("charset")
 406        return None
 407
 408    def _guess_encoding(self, content: bytes = b"") -> str:
 409        enc = self._get_content_type_charset()
 410        if not enc:
 411            if "json" in self.headers.get("content-type", ""):
 412                enc = "utf8"
 413        if not enc:
 414            if "html" in self.headers.get("content-type", ""):
 415                meta_charset = re.search(
 416                    rb"""<meta[^>]+charset=['"]?([^'">]+)""", content, re.IGNORECASE
 417                )
 418                if meta_charset:
 419                    enc = meta_charset.group(1).decode("ascii", "ignore")
 420        if not enc:
 421            if "text/css" in self.headers.get("content-type", ""):
 422                # @charset rule must be the very first thing.
 423                css_charset = re.match(
 424                    rb"""@charset "([^"]+)";""", content, re.IGNORECASE
 425                )
 426                if css_charset:
 427                    enc = css_charset.group(1).decode("ascii", "ignore")
 428        if not enc:
 429            enc = "latin-1"
 430        # Use GB 18030 as the superset of GB2312 and GBK to fix common encoding problems on Chinese websites.
 431        if enc.lower() in ("gb2312", "gbk"):
 432            enc = "gb18030"
 433
 434        return enc
 435
 436    def set_text(self, text: Optional[str]) -> None:
 437        if text is None:
 438            self.content = None
 439            return
 440        enc = self._guess_encoding()
 441
 442        try:
 443            self.content = cast(bytes, encoding.encode(text, enc))
 444        except ValueError:
 445            # Fall back to UTF-8 and update the content-type header.
 446            ct = parse_content_type(self.headers.get("content-type", "")) or (
 447                "text",
 448                "plain",
 449                {},
 450            )
 451            ct[2]["charset"] = "utf-8"
 452            self.headers["content-type"] = assemble_content_type(*ct)
 453            enc = "utf8"
 454            self.content = text.encode(enc, "surrogateescape")
 455
 456    def get_text(self, strict: bool = True) -> Optional[str]:
 457        """
 458        Similar to `Message.text`, but does not raise if `strict` is `False`.
 459        Instead, the message body is returned as surrogate-escaped UTF-8.
 460        """
 461        content = self.get_content(strict)
 462        if content is None:
 463            return None
 464        enc = self._guess_encoding(content)
 465        try:
 466            return cast(str, encoding.decode(content, enc))
 467        except ValueError:
 468            if strict:
 469                raise
 470            return content.decode("utf8", "surrogateescape")
 471
 472    @property
 473    def timestamp_start(self) -> float:
 474        """
 475        *Timestamp:* Headers received.
 476        """
 477        return self.data.timestamp_start
 478
 479    @timestamp_start.setter
 480    def timestamp_start(self, timestamp_start: float) -> None:
 481        self.data.timestamp_start = timestamp_start
 482
 483    @property
 484    def timestamp_end(self) -> Optional[float]:
 485        """
 486        *Timestamp:* Last byte received.
 487        """
 488        return self.data.timestamp_end
 489
 490    @timestamp_end.setter
 491    def timestamp_end(self, timestamp_end: Optional[float]):
 492        self.data.timestamp_end = timestamp_end
 493
 494    def decode(self, strict: bool = True) -> None:
 495        """
 496        Decodes body based on the current Content-Encoding header, then
 497        removes the header. If there is no Content-Encoding header, no
 498        action is taken.
 499
 500        *Raises:*
 501         - `ValueError`, when the content-encoding is invalid and strict is True.
 502        """
 503        decoded = self.get_content(strict)
 504        self.headers.pop("content-encoding", None)
 505        self.content = decoded
 506
 507    def encode(self, encoding: str) -> None:
 508        """
 509        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
 510        Any existing content-encodings are overwritten, the content is not decoded beforehand.
 511
 512        *Raises:*
 513         - `ValueError`, when the specified content-encoding is invalid.
 514        """
 515        self.headers["content-encoding"] = encoding
 516        self.content = self.raw_content
 517        if "content-encoding" not in self.headers:
 518            raise ValueError(f"Invalid content encoding {repr(encoding)}")
 519
 520    def json(self, **kwargs: Any) -> Any:
 521        """
 522        Returns the JSON encoded content of the response, if any.
 523        `**kwargs` are optional arguments that will be
 524        passed to `json.loads()`.
 525
 526        Will raise if the content can not be decoded and then parsed as JSON.
 527
 528        *Raises:*
 529         - `json.decoder.JSONDecodeError` if content is not valid JSON.
 530         - `TypeError` if the content is not available, for example because the response
 531            has been streamed.
 532        """
 533        content = self.get_content(strict=False)
 534        if content is None:
 535            raise TypeError("Message content is not available.")
 536        else:
 537            return json.loads(content, **kwargs)
 538
 539
 540class Request(Message):
 541    """
 542    An HTTP request.
 543    """
 544
 545    data: RequestData
 546
 547    def __init__(
 548        self,
 549        host: str,
 550        port: int,
 551        method: bytes,
 552        scheme: bytes,
 553        authority: bytes,
 554        path: bytes,
 555        http_version: bytes,
 556        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
 557        content: Optional[bytes],
 558        trailers: Union[Headers, tuple[tuple[bytes, bytes], ...], None],
 559        timestamp_start: float,
 560        timestamp_end: Optional[float],
 561    ):
 562        # auto-convert invalid types to retain compatibility with older code.
 563        if isinstance(host, bytes):
 564            host = host.decode("idna", "strict")
 565        if isinstance(method, str):
 566            method = method.encode("ascii", "strict")
 567        if isinstance(scheme, str):
 568            scheme = scheme.encode("ascii", "strict")
 569        if isinstance(authority, str):
 570            authority = authority.encode("ascii", "strict")
 571        if isinstance(path, str):
 572            path = path.encode("ascii", "strict")
 573        if isinstance(http_version, str):
 574            http_version = http_version.encode("ascii", "strict")
 575
 576        if isinstance(content, str):
 577            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 578        if not isinstance(headers, Headers):
 579            headers = Headers(headers)
 580        if trailers is not None and not isinstance(trailers, Headers):
 581            trailers = Headers(trailers)
 582
 583        self.data = RequestData(
 584            host=host,
 585            port=port,
 586            method=method,
 587            scheme=scheme,
 588            authority=authority,
 589            path=path,
 590            http_version=http_version,
 591            headers=headers,
 592            content=content,
 593            trailers=trailers,
 594            timestamp_start=timestamp_start,
 595            timestamp_end=timestamp_end,
 596        )
 597
 598    def __repr__(self) -> str:
 599        if self.host and self.port:
 600            hostport = f"{self.host}:{self.port}"
 601        else:
 602            hostport = ""
 603        path = self.path or ""
 604        return f"Request({self.method} {hostport}{path})"
 605
 606    @classmethod
 607    def make(
 608        cls,
 609        method: str,
 610        url: str,
 611        content: Union[bytes, str] = "",
 612        headers: Union[
 613            Headers,
 614            dict[Union[str, bytes], Union[str, bytes]],
 615            Iterable[tuple[bytes, bytes]],
 616        ] = (),
 617    ) -> "Request":
 618        """
 619        Simplified API for creating request objects.
 620        """
 621        # Headers can be list or dict, we differentiate here.
 622        if isinstance(headers, Headers):
 623            pass
 624        elif isinstance(headers, dict):
 625            headers = Headers(
 626                (
 627                    always_bytes(k, "utf-8", "surrogateescape"),
 628                    always_bytes(v, "utf-8", "surrogateescape"),
 629                )
 630                for k, v in headers.items()
 631            )
 632        elif isinstance(headers, Iterable):
 633            headers = Headers(headers)  # type: ignore
 634        else:
 635            raise TypeError(
 636                "Expected headers to be an iterable or dict, but is {}.".format(
 637                    type(headers).__name__
 638                )
 639            )
 640
 641        req = cls(
 642            "",
 643            0,
 644            method.encode("utf-8", "surrogateescape"),
 645            b"",
 646            b"",
 647            b"",
 648            b"HTTP/1.1",
 649            headers,
 650            b"",
 651            None,
 652            time.time(),
 653            time.time(),
 654        )
 655
 656        req.url = url
 657        # Assign this manually to update the content-length header.
 658        if isinstance(content, bytes):
 659            req.content = content
 660        elif isinstance(content, str):
 661            req.text = content
 662        else:
 663            raise TypeError(
 664                f"Expected content to be str or bytes, but is {type(content).__name__}."
 665            )
 666
 667        return req
 668
 669    @property
 670    def first_line_format(self) -> str:
 671        """
 672        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 673
 674        origin-form and asterisk-form are subsumed as "relative".
 675        """
 676        if self.method == "CONNECT":
 677            return "authority"
 678        elif self.authority:
 679            return "absolute"
 680        else:
 681            return "relative"
 682
 683    @property
 684    def method(self) -> str:
 685        """
 686        HTTP request method, e.g. "GET".
 687        """
 688        return self.data.method.decode("utf-8", "surrogateescape").upper()
 689
 690    @method.setter
 691    def method(self, val: Union[str, bytes]) -> None:
 692        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 693
 694    @property
 695    def scheme(self) -> str:
 696        """
 697        HTTP request scheme, which should be "http" or "https".
 698        """
 699        return self.data.scheme.decode("utf-8", "surrogateescape")
 700
 701    @scheme.setter
 702    def scheme(self, val: Union[str, bytes]) -> None:
 703        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 704
 705    @property
 706    def authority(self) -> str:
 707        """
 708        HTTP request authority.
 709
 710        For HTTP/1, this is the authority portion of the request target
 711        (in either absolute-form or authority-form).
 712        For origin-form and asterisk-form requests, this property is set to an empty string.
 713
 714        For HTTP/2, this is the :authority pseudo header.
 715
 716        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 717        """
 718        try:
 719            return self.data.authority.decode("idna")
 720        except UnicodeError:
 721            return self.data.authority.decode("utf8", "surrogateescape")
 722
 723    @authority.setter
 724    def authority(self, val: Union[str, bytes]) -> None:
 725        if isinstance(val, str):
 726            try:
 727                val = val.encode("idna", "strict")
 728            except UnicodeError:
 729                val = val.encode("utf8", "surrogateescape")  # type: ignore
 730        self.data.authority = val
 731
 732    @property
 733    def host(self) -> str:
 734        """
 735        Target server for this request. This may be parsed from the raw request
 736        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 737        or inferred from the proxy mode (e.g. an IP in transparent mode).
 738
 739        Setting the host attribute also updates the host header and authority information, if present.
 740
 741        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 742        """
 743        return self.data.host
 744
 745    @host.setter
 746    def host(self, val: Union[str, bytes]) -> None:
 747        self.data.host = always_str(val, "idna", "strict")
 748
 749        # Update host header
 750        if "Host" in self.data.headers:
 751            self.data.headers["Host"] = val
 752        # Update authority
 753        if self.data.authority:
 754            self.authority = url.hostport(self.scheme, self.host, self.port)
 755
 756    @property
 757    def host_header(self) -> Optional[str]:
 758        """
 759        The request's host/authority header.
 760
 761        This property maps to either ``request.headers["Host"]`` or
 762        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 763
 764        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 765        """
 766        if self.is_http2:
 767            return self.authority or self.data.headers.get("Host", None)
 768        else:
 769            return self.data.headers.get("Host", None)
 770
 771    @host_header.setter
 772    def host_header(self, val: Union[None, str, bytes]) -> None:
 773        if val is None:
 774            if self.is_http2:
 775                self.data.authority = b""
 776            self.headers.pop("Host", None)
 777        else:
 778            if self.is_http2:
 779                self.authority = val  # type: ignore
 780            if not self.is_http2 or "Host" in self.headers:
 781                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 782                self.headers["Host"] = val
 783
    @property
    def port(self) -> int:
        """
        Target port.
        """
        return self.data.port

    @port.setter
    def port(self, port: int) -> None:
        # Unlike the `host` setter, assigning the port leaves the Host header and
        # the authority untouched.
        self.data.port = port
 794
 795    @property
 796    def path(self) -> str:
 797        """
 798        HTTP request path, e.g. "/index.html".
 799        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 800        """
 801        return self.data.path.decode("utf-8", "surrogateescape")
 802
 803    @path.setter
 804    def path(self, val: Union[str, bytes]) -> None:
 805        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 806
 807    @property
 808    def url(self) -> str:
 809        """
 810        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 811
 812        Settings this property updates these attributes as well.
 813        """
 814        if self.first_line_format == "authority":
 815            return f"{self.host}:{self.port}"
 816        return url.unparse(self.scheme, self.host, self.port, self.path)
 817
 818    @url.setter
 819    def url(self, val: Union[str, bytes]) -> None:
 820        val = always_str(val, "utf-8", "surrogateescape")
 821        self.scheme, self.host, self.port, self.path = url.parse(val)
 822
 823    @property
 824    def pretty_host(self) -> str:
 825        """
 826        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 827        This is useful in transparent mode where `Request.host` is only an IP address.
 828
 829        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 830        as the Host header could be spoofed.
 831        """
 832        authority = self.host_header
 833        if authority:
 834            return url.parse_authority(authority, check=False)[0]
 835        else:
 836            return self.host
 837
 838    @property
 839    def pretty_url(self) -> str:
 840        """
 841        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 842        """
 843        if self.first_line_format == "authority":
 844            return self.authority
 845
 846        host_header = self.host_header
 847        if not host_header:
 848            return self.url
 849
 850        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 851        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 852
 853        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)
 854
 855    def _get_query(self):
 856        query = urllib.parse.urlparse(self.url).query
 857        return tuple(url.decode(query))
 858
 859    def _set_query(self, query_data):
 860        query = url.encode(query_data)
 861        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 862        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 863
 864    @property
 865    def query(self) -> multidict.MultiDictView[str, str]:
 866        """
 867        The request query as a mutable mapping view on the request's path.
 868        For the most part, this behaves like a dictionary.
 869        Modifications to the MultiDictView update `Request.path`, and vice versa.
 870        """
 871        return multidict.MultiDictView(self._get_query, self._set_query)
 872
 873    @query.setter
 874    def query(self, value):
 875        self._set_query(value)
 876
 877    def _get_cookies(self):
 878        h = self.headers.get_all("Cookie")
 879        return tuple(cookies.parse_cookie_headers(h))
 880
 881    def _set_cookies(self, value):
 882        self.headers["cookie"] = cookies.format_cookie_header(value)
 883
 884    @property
 885    def cookies(self) -> multidict.MultiDictView[str, str]:
 886        """
 887        The request cookies.
 888        For the most part, this behaves like a dictionary.
 889        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 890        """
 891        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 892
 893    @cookies.setter
 894    def cookies(self, value):
 895        self._set_cookies(value)
 896
 897    @property
 898    def path_components(self) -> tuple[str, ...]:
 899        """
 900        The URL's path components as a tuple of strings.
 901        Components are unquoted.
 902        """
 903        path = urllib.parse.urlparse(self.url).path
 904        # This needs to be a tuple so that it's immutable.
 905        # Otherwise, this would fail silently:
 906        #   request.path_components.append("foo")
 907        return tuple(url.unquote(i) for i in path.split("/") if i)
 908
 909    @path_components.setter
 910    def path_components(self, components: Iterable[str]):
 911        components = map(lambda x: url.quote(x, safe=""), components)
 912        path = "/" + "/".join(components)
 913        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 914        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 915
 916    def anticache(self) -> None:
 917        """
 918        Modifies this request to remove headers that might produce a cached response.
 919        """
 920        delheaders = (
 921            "if-modified-since",
 922            "if-none-match",
 923        )
 924        for i in delheaders:
 925            self.headers.pop(i, None)
 926
 927    def anticomp(self) -> None:
 928        """
 929        Modify the Accept-Encoding header to only accept uncompressed responses.
 930        """
 931        self.headers["accept-encoding"] = "identity"
 932
 933    def constrain_encoding(self) -> None:
 934        """
 935        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 936        """
 937        accept_encoding = self.headers.get("accept-encoding")
 938        if accept_encoding:
 939            self.headers["accept-encoding"] = ", ".join(
 940                e
 941                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 942                if e in accept_encoding
 943            )
 944
 945    def _get_urlencoded_form(self):
 946        is_valid_content_type = (
 947            "application/x-www-form-urlencoded"
 948            in self.headers.get("content-type", "").lower()
 949        )
 950        if is_valid_content_type:
 951            return tuple(url.decode(self.get_text(strict=False)))
 952        return ()
 953
 954    def _set_urlencoded_form(self, form_data):
 955        """
 956        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 957        This will overwrite the existing content if there is one.
 958        """
 959        self.headers["content-type"] = "application/x-www-form-urlencoded"
 960        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 961
 962    @property
 963    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 964        """
 965        The URL-encoded form data.
 966
 967        If the content-type indicates non-form data or the form could not be parsed, this is set to
 968        an empty `MultiDictView`.
 969
 970        Modifications to the MultiDictView update `Request.content`, and vice versa.
 971        """
 972        return multidict.MultiDictView(
 973            self._get_urlencoded_form, self._set_urlencoded_form
 974        )
 975
 976    @urlencoded_form.setter
 977    def urlencoded_form(self, value):
 978        self._set_urlencoded_form(value)
 979
 980    def _get_multipart_form(self):
 981        is_valid_content_type = (
 982            "multipart/form-data" in self.headers.get("content-type", "").lower()
 983        )
 984        if is_valid_content_type:
 985            try:
 986                return multipart.decode(self.headers.get("content-type"), self.content)
 987            except ValueError:
 988                pass
 989        return ()
 990
 991    def _set_multipart_form(self, value):
 992        is_valid_content_type = (
 993            self.headers.get("content-type", "")
 994            .lower()
 995            .startswith("multipart/form-data")
 996        )
 997        if not is_valid_content_type:
 998            """
 999            Generate a random boundary here.
1000
1001            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
1002            on generating the boundary.
1003            """
1004            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
1005            self.headers["content-type"] = f"multipart/form-data; boundary={boundary}"
1006        self.content = multipart.encode(self.headers, value)
1007
1008    @property
1009    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
1010        """
1011        The multipart form data.
1012
1013        If the content-type indicates non-form data or the form could not be parsed, this is set to
1014        an empty `MultiDictView`.
1015
1016        Modifications to the MultiDictView update `Request.content`, and vice versa.
1017        """
1018        return multidict.MultiDictView(
1019            self._get_multipart_form, self._set_multipart_form
1020        )
1021
1022    @multipart_form.setter
1023    def multipart_form(self, value):
1024        self._set_multipart_form(value)
1025
1026
class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
        content: Optional[bytes],
        trailers: Union[None, Headers, tuple[tuple[bytes, bytes], ...]],
        timestamp_start: float,
        timestamp_end: Optional[float],
    ):
        """
        Create a response from its raw parts.

        `http_version` and `reason` given as `str` are ASCII-encoded; `headers` and
        `trailers` that are not `Headers` instances are wrapped in `Headers`.

        *Raises:*
         - `ValueError`, when `content` is a `str`.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: Union[bytes, str] = b"",
        headers: Union[
            Headers, Mapping[str, Union[str, bytes]], Iterable[tuple[bytes, bytes]]
        ] = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.

        `headers` may be a `Headers` instance, any mapping of header names to values
        (`str` or `bytes`), or an iterable of `(name, value)` byte pairs.
        `content` may be `bytes` or `str`; it is assigned through `Response.content`
        or `Response.text` respectively.

        *Raises:*
         - `TypeError`, when `headers` or `content` has an unsupported type.
        """
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            # Any mapping is accepted (as the annotation promises), not only `dict`.
            # A mapping must not reach the iterable branch below: iterating it
            # yields only its keys, not (name, value) pairs.
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: Union[str, bytes]) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        """Parse all `set-cookie` headers into `(name, (value, attrs))` tuples."""
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        """Replace all `set-cookie` headers; one header is written per cookie in `value`."""
        cookie_headers = [
            cookies.format_set_cookie_header([(k, v[0], v[1])]) for k, v in value
        ]
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[
        str, tuple[str, multidict.MultiDict[str, Optional[str]]]
    ]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now=None):
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

         - It adjusts date, expires, and last-modified headers.
         - It adjusts cookie expiration.

        `now` is the reference time as a Unix timestamp; the current time is used
        when it is `None`. All adjustments shift by `now - timestamp_start`.
        """
        # `is None` rather than a truthiness test, so that an explicit timestamp
        # of 0 is honoured instead of being replaced by the current time.
        if now is None:
            now = time.time()
        delta = now - self.timestamp_start
        refresh_headers = [
            "date",
            "expires",
            "last-modified",
        ]
        for i in refresh_headers:
            if i in self.headers:
                d = parsedate_tz(self.headers[i])
                # Headers whose date cannot be parsed are left as they are.
                if d:
                    new = mktime_tz(d) + delta
                    try:
                        self.headers[i] = formatdate(new, usegmt=True)
                    except OSError:  # pragma: no cover
                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
        c = []
        for set_cookie_header in self.headers.get_all("set-cookie"):
            try:
                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
            except ValueError:
                # Keep a cookie header that cannot be refreshed unchanged.
                refreshed = set_cookie_header
            c.append(refreshed)
        if c:
            self.headers.set_all("set-cookie", c)
1224
1225
class HTTPFlow(flow.Flow):
    """
    An HTTPFlow is a collection of objects representing a single HTTP
    transaction.
    """

    request: Request
    """The client's HTTP request."""
    response: Optional[Response] = None
    """The server's HTTP response."""
    error: Optional[flow.Error] = None
    """
    A connection or protocol error affecting this flow.

    Note that it's possible for a Flow to have both a response and an error
    object. This might happen, for instance, when a response was received
    from the server, but there was an error sending it back to the client.
    """

    websocket: Optional[WebSocketData] = None
    """
    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
    """

    _stateobject_attributes = flow.Flow._stateobject_attributes.copy()
    # mypy doesn't support update with kwargs
    _stateobject_attributes.update(
        {"request": Request, "response": Response, "websocket": WebSocketData}
    )

    def __repr__(self):
        shown = (
            "request",
            "response",
            "websocket",
            "error",
            "client_conn",
            "server_conn",
        )
        # Build a format template with one line per attribute that is set (truthy),
        # then let str.format look the attributes up on this flow.
        template = "<HTTPFlow" + "".join(
            f"\r\n  {name} = {{flow.{name}}}"
            for name in shown
            if getattr(self, name, False)
        )
        return (template + ">").format(flow=self)

    @property
    def timestamp_start(self) -> float:
        """*Read-only:* An alias for `Request.timestamp_start`."""
        return self.request.timestamp_start

    @property
    def mode(self) -> str:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        return getattr(self, "_mode", "regular")

    @mode.setter
    def mode(self, val: str) -> None:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        self._mode = val

    def copy(self):
        duplicate = super().copy()
        # Request and response are copied as well, so that the new flow gets its own objects.
        if self.request:
            duplicate.request = self.request.copy()
        if self.response:
            duplicate.response = self.response.copy()
        return duplicate
1293
1294
# Names exported by `from mitmproxy.http import *`.
__all__ = [
    "HTTPFlow",
    "Message",
    "Request",
    "Response",
    "Headers",
]
class HTTPFlow(mitmproxy.flow.Flow):
class HTTPFlow(flow.Flow):
    """
    An HTTPFlow is a collection of objects representing a single HTTP
    transaction.
    """

    request: Request
    """The client's HTTP request."""
    response: Optional[Response] = None
    """The server's HTTP response."""
    error: Optional[flow.Error] = None
    """
    A connection or protocol error affecting this flow.

    Note that it's possible for a Flow to have both a response and an error
    object. This might happen, for instance, when a response was received
    from the server, but there was an error sending it back to the client.
    """

    websocket: Optional[WebSocketData] = None
    """
    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
    """

    _stateobject_attributes = flow.Flow._stateobject_attributes.copy()
    # mypy doesn't support update with kwargs
    _stateobject_attributes.update(
        {"request": Request, "response": Response, "websocket": WebSocketData}
    )

    def __repr__(self):
        shown = (
            "request",
            "response",
            "websocket",
            "error",
            "client_conn",
            "server_conn",
        )
        # Build a format template with one line per attribute that is set (truthy),
        # then let str.format look the attributes up on this flow.
        template = "<HTTPFlow" + "".join(
            f"\r\n  {name} = {{flow.{name}}}"
            for name in shown
            if getattr(self, name, False)
        )
        return (template + ">").format(flow=self)

    @property
    def timestamp_start(self) -> float:
        """*Read-only:* An alias for `Request.timestamp_start`."""
        return self.request.timestamp_start

    @property
    def mode(self) -> str:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        return getattr(self, "_mode", "regular")

    @mode.setter
    def mode(self, val: str) -> None:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        self._mode = val

    def copy(self):
        duplicate = super().copy()
        # Request and response are copied as well, so that the new flow gets its own objects.
        if self.request:
            duplicate.request = self.request.copy()
        if self.response:
            duplicate.response = self.response.copy()
        return duplicate

An HTTPFlow is a collection of objects representing a single HTTP transaction.

The client's HTTP request.

response: Optional[mitmproxy.http.Response] = None

The server's HTTP response.

error: Optional[mitmproxy.flow.Error] = None

A connection or protocol error affecting this flow.

Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.

websocket: Optional[mitmproxy.websocket.WebSocketData] = None

If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.

timestamp_start: float

Read-only: An alias for Request.timestamp_start.

def copy(self):
1287    def copy(self):
1288        f = super().copy()
1289        if self.request:
1290            f.request = self.request.copy()
1291        if self.response:
1292            f.response = self.response.copy()
1293        return f

Make a copy of this flow.

type: ClassVar[str] = 'http'

The flow type, for example http, tcp, or dns.

class Message(mitmproxy.coretypes.serializable.Serializable):
class Message(serializable.Serializable):
    """Base class for `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        return cls(**state)

    def get_state(self):
        return self.data.get_state()

    def set_state(self, state):
        self.data.set_state(state)

    data: MessageData
    stream: Union[Callable[[bytes], Union[Iterable[bytes], bytes]], bool] = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        HTTP version string, for example `HTTP/1.1`.
        """
        raw_version = self.data.http_version
        return raw_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: Union[str, bytes]) -> None:
        as_bytes = strutils.always_bytes(http_version, "utf-8", "surrogateescape")
        self.data.http_version = as_bytes

    @property
    def is_http10(self) -> bool:
        return b"HTTP/1.0" == self.data.http_version

    @property
    def is_http11(self) -> bool:
        return b"HTTP/1.1" == self.data.http_version

    @property
    def is_http2(self) -> bool:
        return b"HTTP/2.0" == self.data.http_version

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Optional[Headers]:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Optional[Headers]) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> Optional[bytes]:
        """
        The raw (potentially compressed) HTTP message body.

        In contrast to `Message.content` and `Message.text`, accessing this property never raises.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: Optional[bytes]) -> None:
        self.data.content = content

    @property
    def content(self) -> Optional[bytes]:
        """
        The uncompressed HTTP message body as bytes.

        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: Optional[bytes]) -> None:
        self.set_content(value)

    @property
    def text(self) -> Optional[str]:
        """
        The uncompressed and decoded HTTP message body as text.

        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: Optional[str]) -> None:
        self.set_text(value)

    def set_content(self, value: Optional[bytes]) -> None:
        """
        Store `value` as the message body, encoded according to the current
        content-encoding header. `None` clears the body without touching the headers.

        *Raises:*
         - `TypeError`, when `value` is neither `None` nor `bytes`.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )
        content_encoding = self.headers.get("content-encoding") or "identity"
        try:
            self.raw_content = encoding.encode(value, content_encoding)
        except ValueError:
            # The content-encoding header is invalid: drop it and store the body unencoded.
            del self.headers["content-encoding"]
            self.raw_content = value

        # https://httpwg.org/specs/rfc7230.html#header.content-length
        # don't set content-length if a transfer-encoding is provided
        if "transfer-encoding" not in self.headers:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> Optional[bytes]:
        """
        Similar to `Message.content`, but does not raise if `strict` is `False`.
        Instead, the compressed message body is returned as-is.
        """
        if self.raw_content is None:
            return None
        content_encoding = self.headers.get("content-encoding")
        if not content_encoding:
            return self.raw_content
        try:
            decoded = encoding.decode(self.raw_content, content_encoding)
            # A client may illegally specify a byte -> str encoding here (e.g. utf8)
            if isinstance(decoded, str):
                raise ValueError(f"Invalid Content-Encoding: {content_encoding}")
        except ValueError:
            if strict:
                raise
            return self.raw_content
        return decoded

    def _get_content_type_charset(self) -> Optional[str]:
        parsed = parse_content_type(self.headers.get("content-type", ""))
        return parsed[2].get("charset") if parsed else None

    def _guess_encoding(self, content: bytes = b"") -> str:
        """
        Pick the text encoding for the body: the content-type charset if present,
        otherwise heuristics for JSON, HTML (`<meta>` charset) and CSS (`@charset`),
        and finally latin-1.
        """
        enc = self._get_content_type_charset()
        content_type = self.headers.get("content-type", "")
        if not enc and "json" in content_type:
            enc = "utf8"
        if not enc and "html" in content_type:
            meta_match = re.search(
                rb"""<meta[^>]+charset=['"]?([^'">]+)""", content, re.IGNORECASE
            )
            if meta_match:
                enc = meta_match.group(1).decode("ascii", "ignore")
        if not enc and "text/css" in content_type:
            # @charset rule must be the very first thing.
            css_match = re.match(rb"""@charset "([^"]+)";""", content, re.IGNORECASE)
            if css_match:
                enc = css_match.group(1).decode("ascii", "ignore")
        if not enc:
            enc = "latin-1"
        # Use GB 18030 as the superset of GB2312 and GBK to fix common encoding problems on Chinese websites.
        if enc.lower() in ("gb2312", "gbk"):
            enc = "gb18030"

        return enc

    def set_text(self, text: Optional[str]) -> None:
        """
        Encode `text` with the guessed charset and store it as the message body.
        If that fails, the body is stored as UTF-8 and the content-type header is updated.
        """
        if text is None:
            self.content = None
            return
        guessed = self._guess_encoding()

        try:
            self.content = cast(bytes, encoding.encode(text, guessed))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            fallback_type = ("text", "plain", {})
            parsed = (
                parse_content_type(self.headers.get("content-type", ""))
                or fallback_type
            )
            parsed[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*parsed)
            self.content = text.encode("utf8", "surrogateescape")

    def get_text(self, strict: bool = True) -> Optional[str]:
        """
        Similar to `Message.text`, but does not raise if `strict` is `False`.
        Instead, the message body is returned as surrogate-escaped UTF-8.
        """
        body = self.get_content(strict)
        if body is None:
            return None
        guessed = self._guess_encoding(body)
        try:
            return cast(str, encoding.decode(body, guessed))
        except ValueError:
            if not strict:
                return body.decode("utf8", "surrogateescape")
            raise

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> Optional[float]:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: Optional[float]):
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decodes body based on the current Content-Encoding header, then
        removes the header. If there is no Content-Encoding header, no
        action is taken.

        *Raises:*
         - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        plain = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = plain

    def encode(self, encoding: str) -> None:
        """
        Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encodings are overwritten, the content is not decoded beforehand.

        *Raises:*
         - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        # Re-assigning the body runs set_content, which removes the header again
        # if the encoding turns out to be invalid.
        self.content = self.raw_content
        if "content-encoding" not in self.headers:
            raise ValueError(f"Invalid content encoding {encoding!r}")

    def json(self, **kwargs: Any) -> Any:
        """
        Returns the JSON encoded content of the response, if any.
        `**kwargs` are optional arguments that will be
        passed to `json.loads()`.

        Will raise if the content can not be decoded and then parsed as JSON.

        *Raises:*
         - `json.decoder.JSONDecodeError` if content is not valid JSON.
         - `TypeError` if the content is not available, for example because the response
            has been streamed.
        """
        body = self.get_content(strict=False)
        if body is None:
            raise TypeError("Message content is not available.")
        return json.loads(body, **kwargs)

Base class for Request and Response.

stream: Union[Callable[[bytes], Union[Iterable[bytes], bytes]], bool] = False

This attribute controls if the message body should be streamed.

If False, mitmproxy will buffer the entire body before forwarding it to the destination. This makes it possible to perform string replacements on the entire body. If True, the message body will not be buffered on the proxy but immediately forwarded instead. Alternatively, a transformation function can be specified, which will be called for each chunk of data. Please note that packet boundaries generally should not be relied upon.

This attribute must be set in the requestheaders or responseheaders hook. Setting it in request or response is already too late, mitmproxy has buffered the message body already.

http_version: str

HTTP version string, for example HTTP/1.1.

The HTTP headers.

trailers: Optional[mitmproxy.http.Headers]
raw_content: Optional[bytes]

The raw (potentially compressed) HTTP message body.

In contrast to Message.content and Message.text, accessing this property never raises.

See also: Message.content, Message.text

content: Optional[bytes]

The uncompressed HTTP message body as bytes.

Accessing this attribute may raise a ValueError when the HTTP content-encoding is invalid.

See also: Message.raw_content, Message.text

text: Optional[str]

The uncompressed and decoded HTTP message body as text.

Accessing this attribute may raise a ValueError when either content-encoding or charset is invalid.

See also: Message.raw_content, Message.content

def set_content(self, value: Optional[bytes]) -> None:
356    def set_content(self, value: Optional[bytes]) -> None:
357        if value is None:
358            self.raw_content = None
359            return
360        if not isinstance(value, bytes):
361            raise TypeError(
362                f"Message content must be bytes, not {type(value).__name__}. "
363                "Please use .text if you want to assign a str."
364            )
365        ce = self.headers.get("content-encoding")
366        try:
367            self.raw_content = encoding.encode(value, ce or "identity")
368        except ValueError:
369            # So we have an invalid content-encoding?
370            # Let's remove it!
371            del self.headers["content-encoding"]
372            self.raw_content = value
373
374        if "transfer-encoding" in self.headers:
375            # https://httpwg.org/specs/rfc7230.html#header.content-length
376            # don't set content-length if a transfer-encoding is provided
377            pass
378        else:
379            self.headers["content-length"] = str(len(self.raw_content))
def get_content(self, strict: bool = True) -> Optional[bytes]:
381    def get_content(self, strict: bool = True) -> Optional[bytes]:
382        """
383        Similar to `Message.content`, but does not raise if `strict` is `False`.
384        Instead, the compressed message body is returned as-is.
385        """
386        if self.raw_content is None:
387            return None
388        ce = self.headers.get("content-encoding")
389        if ce:
390            try:
391                content = encoding.decode(self.raw_content, ce)
392                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
393                if isinstance(content, str):
394                    raise ValueError(f"Invalid Content-Encoding: {ce}")
395                return content
396            except ValueError:
397                if strict:
398                    raise
399                return self.raw_content
400        else:
401            return self.raw_content

Similar to Message.content, but does not raise if strict is False. Instead, the compressed message body is returned as-is.

def set_text(self, text: Optional[str]) -> None:
437    def set_text(self, text: Optional[str]) -> None:
438        if text is None:
439            self.content = None
440            return
441        enc = self._guess_encoding()
442
443        try:
444            self.content = cast(bytes, encoding.encode(text, enc))
445        except ValueError:
446            # Fall back to UTF-8 and update the content-type header.
447            ct = parse_content_type(self.headers.get("content-type", "")) or (
448                "text",
449                "plain",
450                {},
451            )
452            ct[2]["charset"] = "utf-8"
453            self.headers["content-type"] = assemble_content_type(*ct)
454            enc = "utf8"
455            self.content = text.encode(enc, "surrogateescape")
def get_text(self, strict: bool = True) -> Optional[str]:
457    def get_text(self, strict: bool = True) -> Optional[str]:
458        """
459        Similar to `Message.text`, but does not raise if `strict` is `False`.
460        Instead, the message body is returned as surrogate-escaped UTF-8.
461        """
462        content = self.get_content(strict)
463        if content is None:
464            return None
465        enc = self._guess_encoding(content)
466        try:
467            return cast(str, encoding.decode(content, enc))
468        except ValueError:
469            if strict:
470                raise
471            return content.decode("utf8", "surrogateescape")

Similar to Message.text, but does not raise if strict is False. Instead, the message body is returned as surrogate-escaped UTF-8.

timestamp_start: float

Timestamp: Headers received.

timestamp_end: Optional[float]

Timestamp: Last byte received.

def decode(self, strict: bool = True) -> None:
495    def decode(self, strict: bool = True) -> None:
496        """
497        Decodes body based on the current Content-Encoding header, then
498        removes the header. If there is no Content-Encoding header, no
499        action is taken.
500
501        *Raises:*
502         - `ValueError`, when the content-encoding is invalid and strict is True.
503        """
504        decoded = self.get_content(strict)
505        self.headers.pop("content-encoding", None)
506        self.content = decoded

Decodes body based on the current Content-Encoding header, then removes the header. If there is no Content-Encoding header, no action is taken.

Raises:

  • ValueError, when the content-encoding is invalid and strict is True.
def encode(self, encoding: str) -> None:
508    def encode(self, encoding: str) -> None:
509        """
510        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
511        Any existing content-encodings are overwritten, the content is not decoded beforehand.
512
513        *Raises:*
514         - `ValueError`, when the specified content-encoding is invalid.
515        """
516        self.headers["content-encoding"] = encoding
517        self.content = self.raw_content
518        if "content-encoding" not in self.headers:
519            raise ValueError(f"Invalid content encoding {repr(encoding)}")

Encodes body with the given encoding, which is one of "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten; the content is not decoded beforehand.

Raises:

  • ValueError, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
521    def json(self, **kwargs: Any) -> Any:
522        """
523        Returns the JSON encoded content of the response, if any.
524        `**kwargs` are optional arguments that will be
525        passed to `json.loads()`.
526
527        Will raise if the content can not be decoded and then parsed as JSON.
528
529        *Raises:*
530         - `json.decoder.JSONDecodeError` if content is not valid JSON.
531         - `TypeError` if the content is not available, for example because the response
532            has been streamed.
533        """
534        content = self.get_content(strict=False)
535        if content is None:
536            raise TypeError("Message content is not available.")
537        else:
538            return json.loads(content, **kwargs)

Returns the JSON encoded content of the response, if any. **kwargs are optional arguments that will be passed to json.loads().

Will raise if the content can not be decoded and then parsed as JSON.

Raises:

  • json.decoder.JSONDecodeError if content is not valid JSON.
  • TypeError if the content is not available, for example because the response has been streamed.
Inherited Members
mitmproxy.coretypes.serializable.Serializable
copy
class Request(Message):
 541class Request(Message):
 542    """
 543    An HTTP request.
 544    """
 545
 546    data: RequestData
 547
 548    def __init__(
 549        self,
 550        host: str,
 551        port: int,
 552        method: bytes,
 553        scheme: bytes,
 554        authority: bytes,
 555        path: bytes,
 556        http_version: bytes,
 557        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
 558        content: Optional[bytes],
 559        trailers: Union[Headers, tuple[tuple[bytes, bytes], ...], None],
 560        timestamp_start: float,
 561        timestamp_end: Optional[float],
 562    ):
 563        # auto-convert invalid types to retain compatibility with older code.
 564        if isinstance(host, bytes):
 565            host = host.decode("idna", "strict")
 566        if isinstance(method, str):
 567            method = method.encode("ascii", "strict")
 568        if isinstance(scheme, str):
 569            scheme = scheme.encode("ascii", "strict")
 570        if isinstance(authority, str):
 571            authority = authority.encode("ascii", "strict")
 572        if isinstance(path, str):
 573            path = path.encode("ascii", "strict")
 574        if isinstance(http_version, str):
 575            http_version = http_version.encode("ascii", "strict")
 576
 577        if isinstance(content, str):
 578            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 579        if not isinstance(headers, Headers):
 580            headers = Headers(headers)
 581        if trailers is not None and not isinstance(trailers, Headers):
 582            trailers = Headers(trailers)
 583
 584        self.data = RequestData(
 585            host=host,
 586            port=port,
 587            method=method,
 588            scheme=scheme,
 589            authority=authority,
 590            path=path,
 591            http_version=http_version,
 592            headers=headers,
 593            content=content,
 594            trailers=trailers,
 595            timestamp_start=timestamp_start,
 596            timestamp_end=timestamp_end,
 597        )
 598
 599    def __repr__(self) -> str:
 600        if self.host and self.port:
 601            hostport = f"{self.host}:{self.port}"
 602        else:
 603            hostport = ""
 604        path = self.path or ""
 605        return f"Request({self.method} {hostport}{path})"
 606
 607    @classmethod
 608    def make(
 609        cls,
 610        method: str,
 611        url: str,
 612        content: Union[bytes, str] = "",
 613        headers: Union[
 614            Headers,
 615            dict[Union[str, bytes], Union[str, bytes]],
 616            Iterable[tuple[bytes, bytes]],
 617        ] = (),
 618    ) -> "Request":
 619        """
 620        Simplified API for creating request objects.
 621        """
 622        # Headers can be list or dict, we differentiate here.
 623        if isinstance(headers, Headers):
 624            pass
 625        elif isinstance(headers, dict):
 626            headers = Headers(
 627                (
 628                    always_bytes(k, "utf-8", "surrogateescape"),
 629                    always_bytes(v, "utf-8", "surrogateescape"),
 630                )
 631                for k, v in headers.items()
 632            )
 633        elif isinstance(headers, Iterable):
 634            headers = Headers(headers)  # type: ignore
 635        else:
 636            raise TypeError(
 637                "Expected headers to be an iterable or dict, but is {}.".format(
 638                    type(headers).__name__
 639                )
 640            )
 641
 642        req = cls(
 643            "",
 644            0,
 645            method.encode("utf-8", "surrogateescape"),
 646            b"",
 647            b"",
 648            b"",
 649            b"HTTP/1.1",
 650            headers,
 651            b"",
 652            None,
 653            time.time(),
 654            time.time(),
 655        )
 656
 657        req.url = url
 658        # Assign this manually to update the content-length header.
 659        if isinstance(content, bytes):
 660            req.content = content
 661        elif isinstance(content, str):
 662            req.text = content
 663        else:
 664            raise TypeError(
 665                f"Expected content to be str or bytes, but is {type(content).__name__}."
 666            )
 667
 668        return req
 669
 670    @property
 671    def first_line_format(self) -> str:
 672        """
 673        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 674
 675        origin-form and asterisk-form are subsumed as "relative".
 676        """
 677        if self.method == "CONNECT":
 678            return "authority"
 679        elif self.authority:
 680            return "absolute"
 681        else:
 682            return "relative"
 683
 684    @property
 685    def method(self) -> str:
 686        """
 687        HTTP request method, e.g. "GET".
 688        """
 689        return self.data.method.decode("utf-8", "surrogateescape").upper()
 690
 691    @method.setter
 692    def method(self, val: Union[str, bytes]) -> None:
 693        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 694
 695    @property
 696    def scheme(self) -> str:
 697        """
 698        HTTP request scheme, which should be "http" or "https".
 699        """
 700        return self.data.scheme.decode("utf-8", "surrogateescape")
 701
 702    @scheme.setter
 703    def scheme(self, val: Union[str, bytes]) -> None:
 704        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 705
 706    @property
 707    def authority(self) -> str:
 708        """
 709        HTTP request authority.
 710
 711        For HTTP/1, this is the authority portion of the request target
 712        (in either absolute-form or authority-form).
 713        For origin-form and asterisk-form requests, this property is set to an empty string.
 714
 715        For HTTP/2, this is the :authority pseudo header.
 716
 717        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 718        """
 719        try:
 720            return self.data.authority.decode("idna")
 721        except UnicodeError:
 722            return self.data.authority.decode("utf8", "surrogateescape")
 723
 724    @authority.setter
 725    def authority(self, val: Union[str, bytes]) -> None:
 726        if isinstance(val, str):
 727            try:
 728                val = val.encode("idna", "strict")
 729            except UnicodeError:
 730                val = val.encode("utf8", "surrogateescape")  # type: ignore
 731        self.data.authority = val
 732
 733    @property
 734    def host(self) -> str:
 735        """
 736        Target server for this request. This may be parsed from the raw request
 737        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 738        or inferred from the proxy mode (e.g. an IP in transparent mode).
 739
 740        Setting the host attribute also updates the host header and authority information, if present.
 741
 742        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 743        """
 744        return self.data.host
 745
 746    @host.setter
 747    def host(self, val: Union[str, bytes]) -> None:
 748        self.data.host = always_str(val, "idna", "strict")
 749
 750        # Update host header
 751        if "Host" in self.data.headers:
 752            self.data.headers["Host"] = val
 753        # Update authority
 754        if self.data.authority:
 755            self.authority = url.hostport(self.scheme, self.host, self.port)
 756
 757    @property
 758    def host_header(self) -> Optional[str]:
 759        """
 760        The request's host/authority header.
 761
 762        This property maps to either ``request.headers["Host"]`` or
 763        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 764
 765        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 766        """
 767        if self.is_http2:
 768            return self.authority or self.data.headers.get("Host", None)
 769        else:
 770            return self.data.headers.get("Host", None)
 771
 772    @host_header.setter
 773    def host_header(self, val: Union[None, str, bytes]) -> None:
 774        if val is None:
 775            if self.is_http2:
 776                self.data.authority = b""
 777            self.headers.pop("Host", None)
 778        else:
 779            if self.is_http2:
 780                self.authority = val  # type: ignore
 781            if not self.is_http2 or "Host" in self.headers:
 782                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 783                self.headers["Host"] = val
 784
 785    @property
 786    def port(self) -> int:
 787        """
 788        Target port.
 789        """
 790        return self.data.port
 791
 792    @port.setter
 793    def port(self, port: int) -> None:
 794        self.data.port = port
 795
 796    @property
 797    def path(self) -> str:
 798        """
 799        HTTP request path, e.g. "/index.html".
 800        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 801        """
 802        return self.data.path.decode("utf-8", "surrogateescape")
 803
 804    @path.setter
 805    def path(self, val: Union[str, bytes]) -> None:
 806        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 807
 808    @property
 809    def url(self) -> str:
 810        """
 811        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 812
 813        Settings this property updates these attributes as well.
 814        """
 815        if self.first_line_format == "authority":
 816            return f"{self.host}:{self.port}"
 817        return url.unparse(self.scheme, self.host, self.port, self.path)
 818
 819    @url.setter
 820    def url(self, val: Union[str, bytes]) -> None:
 821        val = always_str(val, "utf-8", "surrogateescape")
 822        self.scheme, self.host, self.port, self.path = url.parse(val)
 823
 824    @property
 825    def pretty_host(self) -> str:
 826        """
 827        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 828        This is useful in transparent mode where `Request.host` is only an IP address.
 829
 830        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 831        as the Host header could be spoofed.
 832        """
 833        authority = self.host_header
 834        if authority:
 835            return url.parse_authority(authority, check=False)[0]
 836        else:
 837            return self.host
 838
 839    @property
 840    def pretty_url(self) -> str:
 841        """
 842        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 843        """
 844        if self.first_line_format == "authority":
 845            return self.authority
 846
 847        host_header = self.host_header
 848        if not host_header:
 849            return self.url
 850
 851        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 852        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 853
 854        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)
 855
 856    def _get_query(self):
 857        query = urllib.parse.urlparse(self.url).query
 858        return tuple(url.decode(query))
 859
 860    def _set_query(self, query_data):
 861        query = url.encode(query_data)
 862        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 863        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 864
 865    @property
 866    def query(self) -> multidict.MultiDictView[str, str]:
 867        """
 868        The request query as a mutable mapping view on the request's path.
 869        For the most part, this behaves like a dictionary.
 870        Modifications to the MultiDictView update `Request.path`, and vice versa.
 871        """
 872        return multidict.MultiDictView(self._get_query, self._set_query)
 873
 874    @query.setter
 875    def query(self, value):
 876        self._set_query(value)
 877
 878    def _get_cookies(self):
 879        h = self.headers.get_all("Cookie")
 880        return tuple(cookies.parse_cookie_headers(h))
 881
 882    def _set_cookies(self, value):
 883        self.headers["cookie"] = cookies.format_cookie_header(value)
 884
 885    @property
 886    def cookies(self) -> multidict.MultiDictView[str, str]:
 887        """
 888        The request cookies.
 889        For the most part, this behaves like a dictionary.
 890        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 891        """
 892        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 893
 894    @cookies.setter
 895    def cookies(self, value):
 896        self._set_cookies(value)
 897
 898    @property
 899    def path_components(self) -> tuple[str, ...]:
 900        """
 901        The URL's path components as a tuple of strings.
 902        Components are unquoted.
 903        """
 904        path = urllib.parse.urlparse(self.url).path
 905        # This needs to be a tuple so that it's immutable.
 906        # Otherwise, this would fail silently:
 907        #   request.path_components.append("foo")
 908        return tuple(url.unquote(i) for i in path.split("/") if i)
 909
 910    @path_components.setter
 911    def path_components(self, components: Iterable[str]):
 912        components = map(lambda x: url.quote(x, safe=""), components)
 913        path = "/" + "/".join(components)
 914        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 915        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 916
 917    def anticache(self) -> None:
 918        """
 919        Modifies this request to remove headers that might produce a cached response.
 920        """
 921        delheaders = (
 922            "if-modified-since",
 923            "if-none-match",
 924        )
 925        for i in delheaders:
 926            self.headers.pop(i, None)
 927
 928    def anticomp(self) -> None:
 929        """
 930        Modify the Accept-Encoding header to only accept uncompressed responses.
 931        """
 932        self.headers["accept-encoding"] = "identity"
 933
 934    def constrain_encoding(self) -> None:
 935        """
 936        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 937        """
 938        accept_encoding = self.headers.get("accept-encoding")
 939        if accept_encoding:
 940            self.headers["accept-encoding"] = ", ".join(
 941                e
 942                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 943                if e in accept_encoding
 944            )
 945
 946    def _get_urlencoded_form(self):
 947        is_valid_content_type = (
 948            "application/x-www-form-urlencoded"
 949            in self.headers.get("content-type", "").lower()
 950        )
 951        if is_valid_content_type:
 952            return tuple(url.decode(self.get_text(strict=False)))
 953        return ()
 954
 955    def _set_urlencoded_form(self, form_data):
 956        """
 957        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 958        This will overwrite the existing content if there is one.
 959        """
 960        self.headers["content-type"] = "application/x-www-form-urlencoded"
 961        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 962
 963    @property
 964    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 965        """
 966        The URL-encoded form data.
 967
 968        If the content-type indicates non-form data or the form could not be parsed, this is set to
 969        an empty `MultiDictView`.
 970
 971        Modifications to the MultiDictView update `Request.content`, and vice versa.
 972        """
 973        return multidict.MultiDictView(
 974            self._get_urlencoded_form, self._set_urlencoded_form
 975        )
 976
 977    @urlencoded_form.setter
 978    def urlencoded_form(self, value):
 979        self._set_urlencoded_form(value)
 980
 981    def _get_multipart_form(self):
 982        is_valid_content_type = (
 983            "multipart/form-data" in self.headers.get("content-type", "").lower()
 984        )
 985        if is_valid_content_type:
 986            try:
 987                return multipart.decode(self.headers.get("content-type"), self.content)
 988            except ValueError:
 989                pass
 990        return ()
 991
 992    def _set_multipart_form(self, value):
 993        is_valid_content_type = (
 994            self.headers.get("content-type", "")
 995            .lower()
 996            .startswith("multipart/form-data")
 997        )
 998        if not is_valid_content_type:
 999            """
1000            Generate a random boundary here.
1001
1002            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
1003            on generating the boundary.
1004            """
1005            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
1006            self.headers["content-type"] = f"multipart/form-data; boundary={boundary}"
1007        self.content = multipart.encode(self.headers, value)
1008
1009    @property
1010    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
1011        """
1012        The multipart form data.
1013
1014        If the content-type indicates non-form data or the form could not be parsed, this is set to
1015        an empty `MultiDictView`.
1016
1017        Modifications to the MultiDictView update `Request.content`, and vice versa.
1018        """
1019        return multidict.MultiDictView(
1020            self._get_multipart_form, self._set_multipart_form
1021        )
1022
1023    @multipart_form.setter
1024    def multipart_form(self, value):
1025        self._set_multipart_form(value)

An HTTP request.

Request( host: str, port: int, method: bytes, scheme: bytes, authority: bytes, path: bytes, http_version: bytes, headers: Union[mitmproxy.http.Headers, tuple[tuple[bytes, bytes], ...]], content: Optional[bytes], trailers: Union[mitmproxy.http.Headers, tuple[tuple[bytes, bytes], ...], NoneType], timestamp_start: float, timestamp_end: Optional[float])
548    def __init__(
549        self,
550        host: str,
551        port: int,
552        method: bytes,
553        scheme: bytes,
554        authority: bytes,
555        path: bytes,
556        http_version: bytes,
557        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
558        content: Optional[bytes],
559        trailers: Union[Headers, tuple[tuple[bytes, bytes], ...], None],
560        timestamp_start: float,
561        timestamp_end: Optional[float],
562    ):
563        # auto-convert invalid types to retain compatibility with older code.
564        if isinstance(host, bytes):
565            host = host.decode("idna", "strict")
566        if isinstance(method, str):
567            method = method.encode("ascii", "strict")
568        if isinstance(scheme, str):
569            scheme = scheme.encode("ascii", "strict")
570        if isinstance(authority, str):
571            authority = authority.encode("ascii", "strict")
572        if isinstance(path, str):
573            path = path.encode("ascii", "strict")
574        if isinstance(http_version, str):
575            http_version = http_version.encode("ascii", "strict")
576
577        if isinstance(content, str):
578            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
579        if not isinstance(headers, Headers):
580            headers = Headers(headers)
581        if trailers is not None and not isinstance(trailers, Headers):
582            trailers = Headers(trailers)
583
584        self.data = RequestData(
585            host=host,
586            port=port,
587            method=method,
588            scheme=scheme,
589            authority=authority,
590            path=path,
591            http_version=http_version,
592            headers=headers,
593            content=content,
594            trailers=trailers,
595            timestamp_start=timestamp_start,
596            timestamp_end=timestamp_end,
597        )
@classmethod
def make( cls, method: str, url: str, content: Union[bytes, str] = '', headers: Union[mitmproxy.http.Headers, dict[Union[str, bytes], Union[str, bytes]], Iterable[tuple[bytes, bytes]]] = ()) -> mitmproxy.http.Request:
607    @classmethod
608    def make(
609        cls,
610        method: str,
611        url: str,
612        content: Union[bytes, str] = "",
613        headers: Union[
614            Headers,
615            dict[Union[str, bytes], Union[str, bytes]],
616            Iterable[tuple[bytes, bytes]],
617        ] = (),
618    ) -> "Request":
619        """
620        Simplified API for creating request objects.
621        """
622        # Headers can be list or dict, we differentiate here.
623        if isinstance(headers, Headers):
624            pass
625        elif isinstance(headers, dict):
626            headers = Headers(
627                (
628                    always_bytes(k, "utf-8", "surrogateescape"),
629                    always_bytes(v, "utf-8", "surrogateescape"),
630                )
631                for k, v in headers.items()
632            )
633        elif isinstance(headers, Iterable):
634            headers = Headers(headers)  # type: ignore
635        else:
636            raise TypeError(
637                "Expected headers to be an iterable or dict, but is {}.".format(
638                    type(headers).__name__
639                )
640            )
641
642        req = cls(
643            "",
644            0,
645            method.encode("utf-8", "surrogateescape"),
646            b"",
647            b"",
648            b"",
649            b"HTTP/1.1",
650            headers,
651            b"",
652            None,
653            time.time(),
654            time.time(),
655        )
656
657        req.url = url
658        # Assign this manually to update the content-length header.
659        if isinstance(content, bytes):
660            req.content = content
661        elif isinstance(content, str):
662            req.text = content
663        else:
664            raise TypeError(
665                f"Expected content to be str or bytes, but is {type(content).__name__}."
666            )
667
668        return req

Simplified API for creating request objects.

first_line_format: str

Read-only: HTTP request form as defined in RFC 7230.

origin-form and asterisk-form are subsumed as "relative".

method: str

HTTP request method, e.g. "GET".

scheme: str

HTTP request scheme, which should be "http" or "https".

authority: str

HTTP request authority.

For HTTP/1, this is the authority portion of the request target (in either absolute-form or authority-form). For origin-form and asterisk-form requests, this property is set to an empty string.

For HTTP/2, this is the :authority pseudo header.

See also: Request.host, Request.host_header, Request.pretty_host

host: str

Target server for this request. This may be parsed from the raw request (e.g. from a GET http://example.com/ HTTP/1.1 request line) or inferred from the proxy mode (e.g. an IP in transparent mode).

Setting the host attribute also updates the host header and authority information, if present.

See also: Request.authority, Request.host_header, Request.pretty_host

host_header: Optional[str]

The request's host/authority header.

This property maps to either request.headers["Host"] or request.authority, depending on whether it's HTTP/1.x or HTTP/2.0.

See also: Request.authority, Request.host, Request.pretty_host

port: int

Target port.

path: str

HTTP request path, e.g. "/index.html". Usually starts with a slash, except for OPTIONS requests, which may just be "*".

url: str

The full URL string, constructed from Request.scheme, Request.host, Request.port and Request.path.

Setting this property updates these attributes as well.

pretty_host: str

Read-only: Like Request.host, but using Request.host_header header as an additional (preferred) data source. This is useful in transparent mode where Request.host is only an IP address.

Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.

pretty_url: str

Read-only: Like Request.url, but using Request.pretty_host instead of Request.host.

The request query as a mutable mapping view on the request's path. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.path, and vice versa.

The request cookies. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.headers, and vice versa.

path_components: tuple[str, ...]

The URL's path components as a tuple of strings. Components are unquoted.

def anticache(self) -> None:
917    def anticache(self) -> None:
918        """
919        Modifies this request to remove headers that might produce a cached response.
920        """
921        delheaders = (
922            "if-modified-since",
923            "if-none-match",
924        )
925        for i in delheaders:
926            self.headers.pop(i, None)

Modifies this request to remove headers that might produce a cached response.

def anticomp(self) -> None:
928    def anticomp(self) -> None:
929        """
930        Modify the Accept-Encoding header to only accept uncompressed responses.
931        """
932        self.headers["accept-encoding"] = "identity"

Modify the Accept-Encoding header to only accept uncompressed responses.

def constrain_encoding(self) -> None:
934    def constrain_encoding(self) -> None:
935        """
936        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
937        """
938        accept_encoding = self.headers.get("accept-encoding")
939        if accept_encoding:
940            self.headers["accept-encoding"] = ", ".join(
941                e
942                for e in {"gzip", "identity", "deflate", "br", "zstd"}
943                if e in accept_encoding
944            )

Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

urlencoded_form: mitmproxy.coretypes.multidict.MultiDictView[str, str]

The URL-encoded form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

multipart_form: mitmproxy.coretypes.multidict.MultiDictView[bytes, bytes]

The multipart form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

class Response(Message):
1028class Response(Message):
1029    """
1030    An HTTP response.
1031    """
1032
1033    data: ResponseData
1034
1035    def __init__(
1036        self,
1037        http_version: bytes,
1038        status_code: int,
1039        reason: bytes,
1040        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
1041        content: Optional[bytes],
1042        trailers: Union[None, Headers, tuple[tuple[bytes, bytes], ...]],
1043        timestamp_start: float,
1044        timestamp_end: Optional[float],
1045    ):
1046        # auto-convert invalid types to retain compatibility with older code.
1047        if isinstance(http_version, str):
1048            http_version = http_version.encode("ascii", "strict")
1049        if isinstance(reason, str):
1050            reason = reason.encode("ascii", "strict")
1051
1052        if isinstance(content, str):
1053            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1054        if not isinstance(headers, Headers):
1055            headers = Headers(headers)
1056        if trailers is not None and not isinstance(trailers, Headers):
1057            trailers = Headers(trailers)
1058
1059        self.data = ResponseData(
1060            http_version=http_version,
1061            status_code=status_code,
1062            reason=reason,
1063            headers=headers,
1064            content=content,
1065            trailers=trailers,
1066            timestamp_start=timestamp_start,
1067            timestamp_end=timestamp_end,
1068        )
1069
1070    def __repr__(self) -> str:
1071        if self.raw_content:
1072            ct = self.headers.get("content-type", "unknown content type")
1073            size = human.pretty_size(len(self.raw_content))
1074            details = f"{ct}, {size}"
1075        else:
1076            details = "no content"
1077        return f"Response({self.status_code}, {details})"
1078
1079    @classmethod
1080    def make(
1081        cls,
1082        status_code: int = 200,
1083        content: Union[bytes, str] = b"",
1084        headers: Union[
1085            Headers, Mapping[str, Union[str, bytes]], Iterable[tuple[bytes, bytes]]
1086        ] = (),
1087    ) -> "Response":
1088        """
1089        Simplified API for creating response objects.
1090        """
1091        if isinstance(headers, Headers):
1092            headers = headers
1093        elif isinstance(headers, dict):
1094            headers = Headers(
1095                (
1096                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1097                    always_bytes(v, "utf-8", "surrogateescape"),
1098                )
1099                for k, v in headers.items()
1100            )
1101        elif isinstance(headers, Iterable):
1102            headers = Headers(headers)  # type: ignore
1103        else:
1104            raise TypeError(
1105                "Expected headers to be an iterable or dict, but is {}.".format(
1106                    type(headers).__name__
1107                )
1108            )
1109
1110        resp = cls(
1111            b"HTTP/1.1",
1112            status_code,
1113            status_codes.RESPONSES.get(status_code, "").encode(),
1114            headers,
1115            None,
1116            None,
1117            time.time(),
1118            time.time(),
1119        )
1120
1121        # Assign this manually to update the content-length header.
1122        if isinstance(content, bytes):
1123            resp.content = content
1124        elif isinstance(content, str):
1125            resp.text = content
1126        else:
1127            raise TypeError(
1128                f"Expected content to be str or bytes, but is {type(content).__name__}."
1129            )
1130
1131        return resp
1132
1133    @property
1134    def status_code(self) -> int:
1135        """
1136        HTTP Status Code, e.g. ``200``.
1137        """
1138        return self.data.status_code
1139
1140    @status_code.setter
1141    def status_code(self, status_code: int) -> None:
1142        self.data.status_code = status_code
1143
1144    @property
1145    def reason(self) -> str:
1146        """
1147        HTTP reason phrase, for example "Not Found".
1148
1149        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1150        """
1151        # Encoding: http://stackoverflow.com/a/16674906/934719
1152        return self.data.reason.decode("ISO-8859-1")
1153
1154    @reason.setter
1155    def reason(self, reason: Union[str, bytes]) -> None:
1156        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
1157
1158    def _get_cookies(self):
1159        h = self.headers.get_all("set-cookie")
1160        all_cookies = cookies.parse_set_cookie_headers(h)
1161        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)
1162
1163    def _set_cookies(self, value):
1164        cookie_headers = []
1165        for k, v in value:
1166            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
1167            cookie_headers.append(header)
1168        self.headers.set_all("set-cookie", cookie_headers)
1169
1170    @property
1171    def cookies(
1172        self,
1173    ) -> multidict.MultiDictView[
1174        str, tuple[str, multidict.MultiDict[str, Optional[str]]]
1175    ]:
1176        """
1177        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1178        name strings, and values are `(cookie value, attributes)` tuples. Within
1179        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1180        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1181
1182        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1183        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1184        """
1185        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
1186
1187    @cookies.setter
1188    def cookies(self, value):
1189        self._set_cookies(value)
1190
1191    def refresh(self, now=None):
1192        """
1193        This fairly complex and heuristic function refreshes a server
1194        response for replay.
1195
1196         - It adjusts date, expires, and last-modified headers.
1197         - It adjusts cookie expiration.
1198        """
1199        if not now:
1200            now = time.time()
1201        delta = now - self.timestamp_start
1202        refresh_headers = [
1203            "date",
1204            "expires",
1205            "last-modified",
1206        ]
1207        for i in refresh_headers:
1208            if i in self.headers:
1209                d = parsedate_tz(self.headers[i])
1210                if d:
1211                    new = mktime_tz(d) + delta
1212                    try:
1213                        self.headers[i] = formatdate(new, usegmt=True)
1214                    except OSError:  # pragma: no cover
1215                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1216        c = []
1217        for set_cookie_header in self.headers.get_all("set-cookie"):
1218            try:
1219                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1220            except ValueError:
1221                refreshed = set_cookie_header
1222            c.append(refreshed)
1223        if c:
1224            self.headers.set_all("set-cookie", c)

An HTTP response.

Response( http_version: bytes, status_code: int, reason: bytes, headers: Union[mitmproxy.http.Headers, tuple[tuple[bytes, bytes], ...]], content: Optional[bytes], trailers: Union[NoneType, mitmproxy.http.Headers, tuple[tuple[bytes, bytes], ...]], timestamp_start: float, timestamp_end: Optional[float])
1035    def __init__(
1036        self,
1037        http_version: bytes,
1038        status_code: int,
1039        reason: bytes,
1040        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
1041        content: Optional[bytes],
1042        trailers: Union[None, Headers, tuple[tuple[bytes, bytes], ...]],
1043        timestamp_start: float,
1044        timestamp_end: Optional[float],
1045    ):
1046        # auto-convert invalid types to retain compatibility with older code.
1047        if isinstance(http_version, str):
1048            http_version = http_version.encode("ascii", "strict")
1049        if isinstance(reason, str):
1050            reason = reason.encode("ascii", "strict")
1051
1052        if isinstance(content, str):
1053            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1054        if not isinstance(headers, Headers):
1055            headers = Headers(headers)
1056        if trailers is not None and not isinstance(trailers, Headers):
1057            trailers = Headers(trailers)
1058
1059        self.data = ResponseData(
1060            http_version=http_version,
1061            status_code=status_code,
1062            reason=reason,
1063            headers=headers,
1064            content=content,
1065            trailers=trailers,
1066            timestamp_start=timestamp_start,
1067            timestamp_end=timestamp_end,
1068        )
@classmethod
def make( cls, status_code: int = 200, content: Union[bytes, str] = b'', headers: Union[mitmproxy.http.Headers, Mapping[str, Union[str, bytes]], Iterable[tuple[bytes, bytes]]] = ()) -> mitmproxy.http.Response:
1079    @classmethod
1080    def make(
1081        cls,
1082        status_code: int = 200,
1083        content: Union[bytes, str] = b"",
1084        headers: Union[
1085            Headers, Mapping[str, Union[str, bytes]], Iterable[tuple[bytes, bytes]]
1086        ] = (),
1087    ) -> "Response":
1088        """
1089        Simplified API for creating response objects.
1090        """
1091        if isinstance(headers, Headers):
1092            headers = headers
1093        elif isinstance(headers, dict):
1094            headers = Headers(
1095                (
1096                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1097                    always_bytes(v, "utf-8", "surrogateescape"),
1098                )
1099                for k, v in headers.items()
1100            )
1101        elif isinstance(headers, Iterable):
1102            headers = Headers(headers)  # type: ignore
1103        else:
1104            raise TypeError(
1105                "Expected headers to be an iterable or dict, but is {}.".format(
1106                    type(headers).__name__
1107                )
1108            )
1109
1110        resp = cls(
1111            b"HTTP/1.1",
1112            status_code,
1113            status_codes.RESPONSES.get(status_code, "").encode(),
1114            headers,
1115            None,
1116            None,
1117            time.time(),
1118            time.time(),
1119        )
1120
1121        # Assign this manually to update the content-length header.
1122        if isinstance(content, bytes):
1123            resp.content = content
1124        elif isinstance(content, str):
1125            resp.text = content
1126        else:
1127            raise TypeError(
1128                f"Expected content to be str or bytes, but is {type(content).__name__}."
1129            )
1130
1131        return resp

Simplified API for creating response objects.

status_code: int

HTTP Status Code, e.g. 200.

reason: str

HTTP reason phrase, for example "Not Found".

HTTP/2 responses do not contain a reason phrase; an empty string is returned instead.

cookies: mitmproxy.coretypes.multidict.MultiDictView[str, tuple[str, mitmproxy.coretypes.multidict.MultiDict[str, typing.Optional[str]]]]

The response cookies. A possibly empty MultiDictView, where the keys are cookie name strings, and values are (cookie value, attributes) tuples. Within attributes, unary attributes (e.g. HTTPOnly) are indicated by a None value. Modifications to the MultiDictView update Response.headers, and vice versa.

Warning: Changes to attributes will not be picked up unless you also reassign the (cookie value, attributes) tuple directly in the MultiDictView.

def refresh(self, now=None):
1191    def refresh(self, now=None):
1192        """
1193        This fairly complex and heuristic function refreshes a server
1194        response for replay.
1195
1196         - It adjusts date, expires, and last-modified headers.
1197         - It adjusts cookie expiration.
1198        """
1199        if not now:
1200            now = time.time()
1201        delta = now - self.timestamp_start
1202        refresh_headers = [
1203            "date",
1204            "expires",
1205            "last-modified",
1206        ]
1207        for i in refresh_headers:
1208            if i in self.headers:
1209                d = parsedate_tz(self.headers[i])
1210                if d:
1211                    new = mktime_tz(d) + delta
1212                    try:
1213                        self.headers[i] = formatdate(new, usegmt=True)
1214                    except OSError:  # pragma: no cover
1215                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1216        c = []
1217        for set_cookie_header in self.headers.get_all("set-cookie"):
1218            try:
1219                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1220            except ValueError:
1221                refreshed = set_cookie_header
1222            c.append(refreshed)
1223        if c:
1224            self.headers.set_all("set-cookie", c)

This fairly complex and heuristic function refreshes a server response for replay.

  • It adjusts date, expires, and last-modified headers.
  • It adjusts cookie expiration.
class Headers(mitmproxy.coretypes.multidict._MultiDict[~KT, ~VT], mitmproxy.coretypes.serializable.Serializable):
 51class Headers(multidict.MultiDict):  # type: ignore
 52    """
 53    Header class which allows both convenient access to individual headers as well as
 54    direct access to the underlying raw data. Provides a full dictionary interface.
 55
 56    Create headers with keyword arguments:
 57    >>> h = Headers(host="example.com", content_type="application/xml")
 58
 59    Headers mostly behave like a normal dict:
 60    >>> h["Host"]
 61    "example.com"
 62
 63    Headers are case insensitive:
 64    >>> h["host"]
 65    "example.com"
 66
 67    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
 68    >>> h = Headers([
 69        (b"Host",b"example.com"),
 70        (b"Accept",b"text/html"),
 71        (b"accept",b"application/xml")
 72    ])
 73
 74    Multiple headers are folded into a single header as per RFC 7230:
 75    >>> h["Accept"]
 76    "text/html, application/xml"
 77
 78    Setting a header removes all existing headers with the same name:
 79    >>> h["Accept"] = "application/text"
 80    >>> h["Accept"]
 81    "application/text"
 82
 83    `bytes(h)` returns an HTTP/1 header block:
 84    >>> print(bytes(h))
 85    Host: example.com
 86    Accept: application/text
 87
 88    For full control, the raw header fields can be accessed:
 89    >>> h.fields
 90
 91    Caveats:
 92     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
 93    """
 94
 95    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 96        """
 97        *Args:*
 98         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 99           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
100         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
101           For convenience, underscores in header names will be transformed to dashes -
102           this behaviour does not extend to other methods.
103
104        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
105        the behavior is undefined.
106        """
107        super().__init__(fields)
108
109        for key, value in self.fields:
110            if not isinstance(key, bytes) or not isinstance(value, bytes):
111                raise TypeError("Header fields must be bytes.")
112
113        # content_type -> content-type
114        self.update(
115            {
116                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
117                for name, value in headers.items()
118            }
119        )
120
121    fields: tuple[tuple[bytes, bytes], ...]
122
123    @staticmethod
124    def _reduce_values(values) -> str:
125        # Headers can be folded
126        return ", ".join(values)
127
128    @staticmethod
129    def _kconv(key) -> str:
130        # Headers are case-insensitive
131        return key.lower()
132
133    def __bytes__(self) -> bytes:
134        if self.fields:
135            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
136        else:
137            return b""
138
139    def __delitem__(self, key: Union[str, bytes]) -> None:
140        key = _always_bytes(key)
141        super().__delitem__(key)
142
143    def __iter__(self) -> Iterator[str]:
144        for x in super().__iter__():
145            yield _native(x)
146
147    def get_all(self, name: Union[str, bytes]) -> list[str]:
148        """
149        Like `Headers.get`, but does not fold multiple headers into a single one.
150        This is useful for Set-Cookie and Cookie headers, which do not support folding.
151
152        *See also:*
153         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
154         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
155         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
156        """
157        name = _always_bytes(name)
158        return [_native(x) for x in super().get_all(name)]
159
160    def set_all(self, name: Union[str, bytes], values: list[Union[str, bytes]]):
161        """
162        Explicitly set multiple headers for the given key.
163        See `Headers.get_all`.
164        """
165        name = _always_bytes(name)
166        values = [_always_bytes(x) for x in values]
167        return super().set_all(name, values)
168
169    def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
170        key = _always_bytes(key)
171        value = _always_bytes(value)
172        super().insert(index, key, value)
173
174    def items(self, multi=False):
175        if multi:
176            return ((_native(k), _native(v)) for k, v in self.fields)
177        else:
178            return super().items()

Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.

Create headers with keyword arguments:

>>> h = Headers(host="example.com", content_type="application/xml")

Headers mostly behave like a normal dict:

>>> h["Host"]
"example.com"

Headers are case insensitive:

>>> h["host"]
"example.com"

Headers can also be created from a list of raw (header_name, header_value) byte tuples:

>>> h = Headers([
    (b"Host",b"example.com"),
    (b"Accept",b"text/html"),
    (b"accept",b"application/xml")
])

Multiple headers are folded into a single header as per RFC 7230:

>>> h["Accept"]
"text/html, application/xml"

Setting a header removes all existing headers with the same name:

>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"

bytes(h) returns an HTTP/1 header block:

>>> print(bytes(h))
Host: example.com
Accept: application/text

For full control, the raw header fields can be accessed:

>>> h.fields

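Caveats:

  • For use with the "Set-Cookie" and "Cookie" headers, either use Response.cookies or see Headers.get_all.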

Headers(fields: Iterable[tuple[bytes, bytes]] = (), **headers)
 95    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 96        """
 97        *Args:*
 98         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 99           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
100         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
101           For convenience, underscores in header names will be transformed to dashes -
102           this behaviour does not extend to other methods.
103
104        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
105        the behavior is undefined.
106        """
107        super().__init__(fields)
108
109        for key, value in self.fields:
110            if not isinstance(key, bytes) or not isinstance(value, bytes):
111                raise TypeError("Header fields must be bytes.")
112
113        # content_type -> content-type
114        self.update(
115            {
116                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
117                for name, value in headers.items()
118            }
119        )

Args:

  • fields: (optional) list of (name, value) header byte tuples, e.g. [(b"Host", b"example.com")]. All names and values must be bytes.
  • **headers: Additional headers to set. Will overwrite existing values from fields. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.

If **headers contains multiple keys that have equal .lower() representations, the behavior is undefined.

fields: tuple[tuple[bytes, bytes], ...]

The underlying raw datastructure.

def get_all(self, name: Union[str, bytes]) -> list[str]:
147    def get_all(self, name: Union[str, bytes]) -> list[str]:
148        """
149        Like `Headers.get`, but does not fold multiple headers into a single one.
150        This is useful for Set-Cookie and Cookie headers, which do not support folding.
151
152        *See also:*
153         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
154         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
155         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
156        """
157        name = _always_bytes(name)
158        return [_native(x) for x in super().get_all(name)]

Like Headers.get, but does not fold multiple headers into a single one. This is useful for Set-Cookie and Cookie headers, which do not support folding.

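See also:

  • https://tools.ietf.org/html/rfc7230#section-3.2.2
  • https://datatracker.ietf.org/doc/html/rfc6265#section-5.4
  • https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5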

def set_all( self, name: Union[str, bytes], values: list[typing.Union[str, bytes]]):
160    def set_all(self, name: Union[str, bytes], values: list[Union[str, bytes]]):
161        """
162        Explicitly set multiple headers for the given key.
163        See `Headers.get_all`.
164        """
165        name = _always_bytes(name)
166        values = [_always_bytes(x) for x in values]
167        return super().set_all(name, values)

Explicitly set multiple headers for the given key. See Headers.get_all.

def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
169    def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
170        key = _always_bytes(key)
171        value = _always_bytes(value)
172        super().insert(index, key, value)

Insert an additional value for the given key at the specified position.

def items(self, multi=False):
174    def items(self, multi=False):
175        if multi:
176            return ((_native(k), _native(v)) for k, v in self.fields)
177        else:
178            return super().items()

Get all (key, value) tuples.

If multi is True, all (key, value) pairs will be returned. If False, only one tuple per key is returned.

Inherited Members
mitmproxy.coretypes.serializable.Serializable
copy
collections.abc.MutableMapping
pop
popitem
clear
update
setdefault
collections.abc.Mapping
get