mitmproxy.http
import binascii
import os
import re
import time
import urllib.parse
import json
import warnings
from dataclasses import dataclass
from dataclasses import fields
from email.utils import formatdate
from email.utils import mktime_tz
from email.utils import parsedate_tz
from typing import Callable
from typing import Iterable
from typing import Iterator
from typing import Mapping
from typing import Optional
from typing import Union
from typing import cast
from typing import Any

from mitmproxy import flow
from mitmproxy.websocket import WebSocketData
from mitmproxy.coretypes import multidict
from mitmproxy.coretypes import serializable
from mitmproxy.net import encoding
from mitmproxy.net.http import cookies
from mitmproxy.net.http import multipart
from mitmproxy.net.http import status_codes
from mitmproxy.net.http import url
from mitmproxy.net.http.headers import assemble_content_type
from mitmproxy.net.http.headers import parse_content_type
from mitmproxy.utils import human
from mitmproxy.utils import strutils
from mitmproxy.utils import typecheck
from mitmproxy.utils.strutils import always_bytes
from mitmproxy.utils.strutils import always_str


# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
def _native(x: bytes) -> str:
    """Decode raw header bytes as UTF-8, keeping undecodable bytes via `surrogateescape`."""
    return x.decode("utf-8", "surrogateescape")


def _always_bytes(x: Union[str, bytes]) -> bytes:
    """Convert a header name or value to bytes (UTF-8, `surrogateescape`) via `strutils.always_bytes`."""
    return strutils.always_bytes(x, "utf-8", "surrogateescape")


# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
class Headers(multidict.MultiDict):  # type: ignore
    """
    Header class which allows both convenient access to individual headers as well as
    direct access to the underlying raw data. Provides a full dictionary interface.

    Create headers with keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    Headers mostly behave like a normal dict:
    >>> h["Host"]
    "example.com"

    Headers are case insensitive:
    >>> h["host"]
    "example.com"

    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Multiple headers are folded into a single header as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Setting a header removes all existing headers with the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` returns an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    For full control, the raw header fields can be accessed:
    >>> h.fields

    Caveats:
     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
        """
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
           For convenience, underscores in header names will be transformed to dashes -
           this behaviour does not extend to other methods.

        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
        the behavior is undefined.

        *Raises:*
         - `TypeError`, if a name or value in `fields` is not bytes.
        """
        super().__init__(fields)

        # Validate the stored fields (not the argument), so one-shot iterables are handled too.
        for key, value in self.fields:
            if not isinstance(key, bytes) or not isinstance(value, bytes):
                raise TypeError("Header fields must be bytes.")

        # content_type -> content-type
        self.update(
            {
                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
                for name, value in headers.items()
            }
        )

    fields: tuple[tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        """Join multiple values of one header into a single comma-separated string."""
        # Headers can be folded
        return ", ".join(values)

    @staticmethod
    def _kconv(key) -> str:
        """Return the lowercased form of a header name."""
        # Headers are case-insensitive
        return key.lower()

    def __bytes__(self) -> bytes:
        """Serialize all fields as CRLF-terminated ``name: value`` lines; empty headers give ``b""``."""
        if self.fields:
            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
        else:
            return b""

    def __delitem__(self, key: Union[str, bytes]) -> None:
        """Delete a header; the key is converted to bytes before delegating to the base class."""
        key = _always_bytes(key)
        super().__delitem__(key)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the base class' keys, decoded to `str`."""
        for x in super().__iter__():
            yield _native(x)

    def get_all(self, name: Union[str, bytes]) -> list[str]:
        """
        Like `Headers.get`, but does not fold multiple headers into a single one.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        name = _always_bytes(name)
        return [_native(x) for x in super().get_all(name)]

    def set_all(self, name: Union[str, bytes], values: list[Union[str, bytes]]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        name = _always_bytes(name)
        values = [_always_bytes(x) for x in values]
        return super().set_all(name, values)

    def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
        """Insert a header at `index`; key and value are converted to bytes first."""
        key = _always_bytes(key)
        value = _always_bytes(value)
        super().insert(index, key, value)

    def items(self, multi=False):
        """
        Return header items.

        With `multi=True`, one ``(name, value)`` pair of `str` is produced per raw field.
        Otherwise the call is delegated to the base class' `items()`.
        """
        if multi:
            return ((_native(k), _native(v)) for k, v in self.fields)
        else:
            return super().items()
@dataclass
class MessageData(serializable.Serializable):
    """Raw state shared by requests and responses; text-like fields are stored as bytes."""

    http_version: bytes
    headers: Headers
    content: Optional[bytes]
    trailers: Optional[Headers]
    timestamp_start: float
    timestamp_end: Optional[float]

    # noinspection PyUnreachableCode
    if __debug__:

        def __post_init__(self):
            # Only defined when __debug__ is true: check every field against its annotation.
            for field in fields(self):
                val = getattr(self, field.name)
                typecheck.check_option_type(field.name, val, field.type)

    def set_state(self, state):
        """Assign every item of `state` as an attribute, rebuilding `Headers` where needed."""
        for k, v in state.items():
            if k in ("headers", "trailers") and v is not None:
                v = Headers.from_state(v)
            setattr(self, k, v)

    def get_state(self):
        """Return a copy of the instance attributes with headers/trailers replaced by their state."""
        state = vars(self).copy()
        state["headers"] = state["headers"].get_state()
        if state["trailers"] is not None:
            state["trailers"] = state["trailers"].get_state()
        return state

    @classmethod
    def from_state(cls, state):
        """Build an instance from `state` without modifying the mapping passed in."""
        # Work on a shallow copy: the header entries are replaced below and
        # the caller's dict must not change as a side effect.
        state = dict(state)
        state["headers"] = Headers.from_state(state["headers"])
        if state["trailers"] is not None:
            state["trailers"] = Headers.from_state(state["trailers"])
        return cls(**state)


@dataclass
class RequestData(MessageData):
    """`MessageData` plus the request-specific fields."""

    host: str
    port: int
    method: bytes
    scheme: bytes
    authority: bytes
    path: bytes


@dataclass
class ResponseData(MessageData):
    """`MessageData` plus the response-specific fields."""

    status_code: int
    reason: bytes


class Message(serializable.Serializable):
    """Base class for `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        """Create a message by passing the items of `state` as keyword arguments to the constructor."""
        return cls(**state)

    def get_state(self):
        """Return the state of the underlying data object."""
        return self.data.get_state()

    def set_state(self, state):
        """Apply `state` to the underlying data object."""
        self.data.set_state(state)

    data: MessageData
    stream: Union[Callable[[bytes], Union[Iterable[bytes], bytes]], bool] = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        HTTP version string, for example `HTTP/1.1`.
        """
        return self.data.http_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: Union[str, bytes]) -> None:
        self.data.http_version = strutils.always_bytes(
            http_version, "utf-8", "surrogateescape"
        )

    @property
    def is_http10(self) -> bool:
        """`True` if the stored HTTP version is exactly ``HTTP/1.0``."""
        return self.data.http_version == b"HTTP/1.0"

    @property
    def is_http11(self) -> bool:
        """`True` if the stored HTTP version is exactly ``HTTP/1.1``."""
        return self.data.http_version == b"HTTP/1.1"

    @property
    def is_http2(self) -> bool:
        """`True` if the stored HTTP version is exactly ``HTTP/2.0``."""
        return self.data.http_version == b"HTTP/2.0"

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Optional[Headers]:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Optional[Headers]) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> Optional[bytes]:
        """
        The raw (potentially compressed) HTTP message body.

        In contrast to `Message.content` and `Message.text`, accessing this property never raises.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: Optional[bytes]) -> None:
        self.data.content = content

    @property
    def content(self) -> Optional[bytes]:
        """
        The uncompressed HTTP message body as bytes.

        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: Optional[bytes]) -> None:
        self.set_content(value)

    @property
    def text(self) -> Optional[str]:
        """
        The uncompressed and decoded HTTP message body as text.

        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: Optional[str]) -> None:
        self.set_text(value)

    def set_content(self, value: Optional[bytes]) -> None:
        """
        Set the message body from uncompressed bytes.

        The value is encoded according to the current content-encoding header and stored as
        `Message.raw_content`. `None` clears the body without touching any header.

        *Raises:*
         - `TypeError`, if `value` is neither `None` nor bytes.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )
        ce = self.headers.get("content-encoding")
        try:
            self.raw_content = encoding.encode(value, ce or "identity")
        except ValueError:
            # So we have an invalid content-encoding?
            # Let's remove it!
            del self.headers["content-encoding"]
            self.raw_content = value

        # https://httpwg.org/specs/rfc7230.html#header.content-length
        # don't set content-length if a transfer-encoding is provided
        if "transfer-encoding" not in self.headers:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> Optional[bytes]:
        """
        Similar to `Message.content`, but does not raise if `strict` is `False`.
        Instead, the compressed message body is returned as-is.
        """
        if self.raw_content is None:
            return None
        ce = self.headers.get("content-encoding")
        if ce:
            try:
                content = encoding.decode(self.raw_content, ce)
                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
                if isinstance(content, str):
                    raise ValueError(f"Invalid Content-Encoding: {ce}")
                return content
            except ValueError:
                if strict:
                    raise
                return self.raw_content
        else:
            return self.raw_content

    def _get_content_type_charset(self) -> Optional[str]:
        """Return the `charset` parameter of the content-type header, or `None` if unavailable."""
        ct = parse_content_type(self.headers.get("content-type", ""))
        if ct:
            return ct[2].get("charset")
        return None

    def _guess_encoding(self, content: bytes = b"") -> str:
        """
        Pick a text encoding for the body.

        Checked in order: the content-type charset, ``utf8`` for JSON content types,
        a ``<meta ... charset=...>`` declaration for HTML, a leading ``@charset`` rule for CSS,
        and finally ``latin-1``.
        """
        enc = self._get_content_type_charset()
        if not enc:
            if "json" in self.headers.get("content-type", ""):
                enc = "utf8"
        if not enc:
            if "html" in self.headers.get("content-type", ""):
                meta_charset = re.search(
                    rb"""<meta[^>]+charset=['"]?([^'">]+)""", content, re.IGNORECASE
                )
                if meta_charset:
                    enc = meta_charset.group(1).decode("ascii", "ignore")
        if not enc:
            if "text/css" in self.headers.get("content-type", ""):
                # @charset rule must be the very first thing.
                css_charset = re.match(
                    rb"""@charset "([^"]+)";""", content, re.IGNORECASE
                )
                if css_charset:
                    enc = css_charset.group(1).decode("ascii", "ignore")
        if not enc:
            enc = "latin-1"
        # Use GB 18030 as the superset of GB2312 and GBK to fix common encoding problems on Chinese websites.
        if enc.lower() in ("gb2312", "gbk"):
            enc = "gb18030"

        return enc

    def set_text(self, text: Optional[str]) -> None:
        """
        Set the message body from text.

        The text is encoded with the guessed encoding. If that fails with a `ValueError`,
        the content-type charset is set to ``utf-8`` and the text is encoded as UTF-8 instead.
        `None` clears the body.
        """
        if text is None:
            self.content = None
            return
        enc = self._guess_encoding()

        try:
            self.content = cast(bytes, encoding.encode(text, enc))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            ct = parse_content_type(self.headers.get("content-type", "")) or (
                "text",
                "plain",
                {},
            )
            ct[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*ct)
            enc = "utf8"
            self.content = text.encode(enc, "surrogateescape")

    def get_text(self, strict: bool = True) -> Optional[str]:
        """
        Similar to `Message.text`, but does not raise if `strict` is `False`.
        Instead, the message body is returned as surrogate-escaped UTF-8.
        """
        content = self.get_content(strict)
        if content is None:
            return None
        enc = self._guess_encoding(content)
        try:
            return cast(str, encoding.decode(content, enc))
        except ValueError:
            if strict:
                raise
            return content.decode("utf8", "surrogateescape")

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> Optional[float]:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: Optional[float]):
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decodes body based on the current Content-Encoding header, then
        removes the header. If there is no Content-Encoding header, no
        action is taken.

        *Raises:*
         - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        decoded = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = decoded

    def encode(self, encoding: str) -> None:
        """
        Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encodings are overwritten, the content is not decoded beforehand.

        *Raises:*
         - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        # Re-assigning the content runs set_content(), which encodes the body and
        # drops the header again if the encoding turns out to be invalid.
        self.content = self.raw_content
        if "content-encoding" not in self.headers:
            raise ValueError(f"Invalid content encoding {repr(encoding)}")

    def json(self, **kwargs: Any) -> Any:
        """
        Returns the JSON encoded content of the message, if any.
        `**kwargs` are optional arguments that will be
        passed to `json.loads()`.

        Will raise if the content can not be decoded and then parsed as JSON.

        *Raises:*
         - `json.decoder.JSONDecodeError` if content is not valid JSON.
         - `TypeError` if the content is not available, for example because the response
           has been streamed.
        """
        content = self.get_content(strict=False)
        if content is None:
            raise TypeError("Message content is not available.")
        else:
            return json.loads(content, **kwargs)
class Request(Message):
    """
    An HTTP request.
    """

    data: RequestData

    def __init__(
        self,
        host: str,
        port: int,
        method: bytes,
        scheme: bytes,
        authority: bytes,
        path: bytes,
        http_version: bytes,
        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
        content: Optional[bytes],
        trailers: Union[Headers, tuple[tuple[bytes, bytes], ...], None],
        timestamp_start: float,
        timestamp_end: Optional[float],
    ):
        """
        Create a request from raw values.

        `headers` and `trailers` may be given as `Headers` or as raw field tuples.

        *Raises:*
         - `ValueError`, if `content` is a `str`.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(host, bytes):
            host = host.decode("idna", "strict")
        if isinstance(method, str):
            method = method.encode("ascii", "strict")
        if isinstance(scheme, str):
            scheme = scheme.encode("ascii", "strict")
        if isinstance(authority, str):
            authority = authority.encode("ascii", "strict")
        if isinstance(path, str):
            path = path.encode("ascii", "strict")
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = RequestData(
            host=host,
            port=port,
            method=method,
            scheme=scheme,
            authority=authority,
            path=path,
            http_version=http_version,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.host and self.port:
            hostport = f"{self.host}:{self.port}"
        else:
            hostport = ""
        path = self.path or ""
        return f"Request({self.method} {hostport}{path})"

    @classmethod
    def make(
        cls,
        method: str,
        url: str,
        content: Union[bytes, str] = "",
        headers: Union[
            Headers,
            dict[Union[str, bytes], Union[str, bytes]],
            Iterable[tuple[bytes, bytes]],
        ] = (),
    ) -> "Request":
        """
        Simplified API for creating request objects.

        *Raises:*
         - `TypeError`, if `headers` is not a `Headers`, dict or iterable,
           or if `content` is neither str nor bytes.
        """
        # Headers can be list or dict, we differentiate here.
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, dict):
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        req = cls(
            "",
            0,
            method.encode("utf-8", "surrogateescape"),
            b"",
            b"",
            b"",
            b"HTTP/1.1",
            headers,
            b"",
            None,
            time.time(),
            time.time(),
        )

        req.url = url
        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            req.content = content
        elif isinstance(content, str):
            req.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return req

    @property
    def first_line_format(self) -> str:
        """
        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).

        origin-form and asterisk-form are subsumed as "relative".
        """
        if self.method == "CONNECT":
            return "authority"
        elif self.authority:
            return "absolute"
        else:
            return "relative"

    @property
    def method(self) -> str:
        """
        HTTP request method, e.g. "GET".
        """
        return self.data.method.decode("utf-8", "surrogateescape").upper()

    @method.setter
    def method(self, val: Union[str, bytes]) -> None:
        self.data.method = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def scheme(self) -> str:
        """
        HTTP request scheme, which should be "http" or "https".
        """
        return self.data.scheme.decode("utf-8", "surrogateescape")

    @scheme.setter
    def scheme(self, val: Union[str, bytes]) -> None:
        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def authority(self) -> str:
        """
        HTTP request authority.

        For HTTP/1, this is the authority portion of the request target
        (in either absolute-form or authority-form).
        For origin-form and asterisk-form requests, this property is set to an empty string.

        For HTTP/2, this is the :authority pseudo header.

        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
        """
        try:
            return self.data.authority.decode("idna")
        except UnicodeError:
            return self.data.authority.decode("utf8", "surrogateescape")

    @authority.setter
    def authority(self, val: Union[str, bytes]) -> None:
        if isinstance(val, str):
            try:
                val = val.encode("idna", "strict")
            except UnicodeError:
                val = val.encode("utf8", "surrogateescape")  # type: ignore
        self.data.authority = val

    @property
    def host(self) -> str:
        """
        Target server for this request. This may be parsed from the raw request
        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
        or inferred from the proxy mode (e.g. an IP in transparent mode).

        Setting the host attribute also updates the host header and authority information, if present.

        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
        """
        return self.data.host

    @host.setter
    def host(self, val: Union[str, bytes]) -> None:
        self.data.host = always_str(val, "idna", "strict")

        # Update host header
        if "Host" in self.data.headers:
            self.data.headers["Host"] = val
        # Update authority
        if self.data.authority:
            self.authority = url.hostport(self.scheme, self.host, self.port)

    @property
    def host_header(self) -> Optional[str]:
        """
        The request's host/authority header.

        This property maps to either ``request.headers["Host"]`` or
        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.

        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
        """
        if self.is_http2:
            return self.authority or self.data.headers.get("Host", None)
        else:
            return self.data.headers.get("Host", None)

    @host_header.setter
    def host_header(self, val: Union[None, str, bytes]) -> None:
        if val is None:
            if self.is_http2:
                self.data.authority = b""
            self.headers.pop("Host", None)
        else:
            if self.is_http2:
                self.authority = val  # type: ignore
            if not self.is_http2 or "Host" in self.headers:
                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
                self.headers["Host"] = val

    @property
    def port(self) -> int:
        """
        Target port.
        """
        return self.data.port

    @port.setter
    def port(self, port: int) -> None:
        self.data.port = port

    @property
    def path(self) -> str:
        """
        HTTP request path, e.g. "/index.html".
        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
        """
        return self.data.path.decode("utf-8", "surrogateescape")

    @path.setter
    def path(self, val: Union[str, bytes]) -> None:
        self.data.path = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def url(self) -> str:
        """
        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.

        Setting this property updates these attributes as well.
        """
        if self.first_line_format == "authority":
            return f"{self.host}:{self.port}"
        return url.unparse(self.scheme, self.host, self.port, self.path)

    @url.setter
    def url(self, val: Union[str, bytes]) -> None:
        val = always_str(val, "utf-8", "surrogateescape")
        self.scheme, self.host, self.port, self.path = url.parse(val)

    @property
    def pretty_host(self) -> str:
        """
        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
        This is useful in transparent mode where `Request.host` is only an IP address.

        *Warning:* When working in adversarial environments, this may not reflect the actual destination
        as the Host header could be spoofed.
        """
        authority = self.host_header
        if authority:
            return url.parse_authority(authority, check=False)[0]
        else:
            return self.host

    @property
    def pretty_url(self) -> str:
        """
        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
        """
        if self.first_line_format == "authority":
            return self.authority

        host_header = self.host_header
        if not host_header:
            return self.url

        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
        pretty_port = pretty_port or url.default_port(self.scheme) or 443

        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)

    def _get_query(self):
        """Return the decoded query part of `Request.url` as a tuple."""
        query = urllib.parse.urlparse(self.url).query
        return tuple(url.decode(query))

    def _set_query(self, query_data):
        """Replace the query part of `Request.path`, keeping path, params and fragment."""
        query = url.encode(query_data)
        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    @property
    def query(self) -> multidict.MultiDictView[str, str]:
        """
        The request query as a mutable mapping view on the request's path.
        For the most part, this behaves like a dictionary.
        Modifications to the MultiDictView update `Request.path`, and vice versa.
        """
        return multidict.MultiDictView(self._get_query, self._set_query)

    @query.setter
    def query(self, value):
        self._set_query(value)

    def _get_cookies(self):
        """Return the parsed contents of all Cookie headers as a tuple."""
        h = self.headers.get_all("Cookie")
        return tuple(cookies.parse_cookie_headers(h))

    def _set_cookies(self, value):
        """Replace the cookie header with `value` formatted as a single header."""
        self.headers["cookie"] = cookies.format_cookie_header(value)

    @property
    def cookies(self) -> multidict.MultiDictView[str, str]:
        """
        The request cookies.
        For the most part, this behaves like a dictionary.
        Modifications to the MultiDictView update `Request.headers`, and vice versa.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    @property
    def path_components(self) -> tuple[str, ...]:
        """
        The URL's path components as a tuple of strings.
        Components are unquoted.
        """
        path = urllib.parse.urlparse(self.url).path
        # This needs to be a tuple so that it's immutable.
        # Otherwise, this would fail silently:
        #   request.path_components.append("foo")
        return tuple(url.unquote(i) for i in path.split("/") if i)

    @path_components.setter
    def path_components(self, components: Iterable[str]):
        # Quote every component fully (safe="") so that "/" inside a component is escaped.
        quoted = (url.quote(x, safe="") for x in components)
        path = "/" + "/".join(quoted)
        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    def anticache(self) -> None:
        """
        Modifies this request to remove headers that might produce a cached response.
        """
        delheaders = (
            "if-modified-since",
            "if-none-match",
        )
        for i in delheaders:
            self.headers.pop(i, None)

    def anticomp(self) -> None:
        """
        Modify the Accept-Encoding header to only accept uncompressed responses.
        """
        self.headers["accept-encoding"] = "identity"

    def constrain_encoding(self) -> None:
        """
        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

        The header is only rewritten if it is present and non-empty. The rewritten value lists
        the supported encodings found in the old value, in a fixed order.
        """
        accept_encoding = self.headers.get("accept-encoding")
        if accept_encoding:
            # A tuple (rather than a set) keeps the order of the emitted header
            # stable across interpreter runs.
            supported = ("gzip", "identity", "deflate", "br", "zstd")
            self.headers["accept-encoding"] = ", ".join(
                e for e in supported if e in accept_encoding
            )

    def _get_urlencoded_form(self):
        """Return the decoded form fields, or an empty tuple if the content type is not URL-encoded form data."""
        is_valid_content_type = (
            "application/x-www-form-urlencoded"
            in self.headers.get("content-type", "").lower()
        )
        if is_valid_content_type:
            return tuple(url.decode(self.get_text(strict=False)))
        return ()

    def _set_urlencoded_form(self, form_data):
        """
        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
        This will overwrite the existing content if there is one.
        """
        self.headers["content-type"] = "application/x-www-form-urlencoded"
        self.content = url.encode(form_data, self.get_text(strict=False)).encode()

    @property
    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
        """
        The URL-encoded form data.

        If the content-type indicates non-form data or the form could not be parsed, this is set to
        an empty `MultiDictView`.

        Modifications to the MultiDictView update `Request.content`, and vice versa.
        """
        return multidict.MultiDictView(
            self._get_urlencoded_form, self._set_urlencoded_form
        )

    @urlencoded_form.setter
    def urlencoded_form(self, value):
        self._set_urlencoded_form(value)

    def _get_multipart_form(self):
        """Return the decoded multipart fields, or an empty tuple if the body is not (valid) multipart form data."""
        is_valid_content_type = (
            "multipart/form-data" in self.headers.get("content-type", "").lower()
        )
        if is_valid_content_type:
            try:
                return multipart.decode(self.headers.get("content-type"), self.content)
            except ValueError:
                # Deliberate best-effort: an undecodable body is reported as an empty form.
                pass
        return ()

    def _set_multipart_form(self, value):
        """
        Set the body to the multipart-encoded `value`.

        If the content type is not already multipart/form-data, it is replaced by one
        with a freshly generated boundary.
        """
        is_valid_content_type = (
            self.headers.get("content-type", "")
            .lower()
            .startswith("multipart/form-data")
        )
        if not is_valid_content_type:
            # Generate a random boundary here.
            #
            # See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
            # on generating the boundary.
            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
            self.headers["content-type"] = f"multipart/form-data; boundary={boundary}"
        self.content = multipart.encode(self.headers, value)

    @property
    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
        """
        The multipart form data.

        If the content-type indicates non-form data or the form could not be parsed, this is set to
        an empty `MultiDictView`.

        Modifications to the MultiDictView update `Request.content`, and vice versa.
        """
        return multidict.MultiDictView(
            self._get_multipart_form, self._set_multipart_form
        )

    @multipart_form.setter
    def multipart_form(self, value):
        self._set_multipart_form(value)
class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
        content: Optional[bytes],
        trailers: Union[None, Headers, tuple[tuple[bytes, bytes], ...]],
        timestamp_start: float,
        timestamp_end: Optional[float],
    ):
        """
        Create a response from raw values.

        `headers` and `trailers` may be given as `Headers` or as raw field tuples.

        *Raises:*
         - `ValueError`, if `content` is a `str`.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: Union[bytes, str] = b"",
        headers: Union[
            Headers, Mapping[str, Union[str, bytes]], Iterable[tuple[bytes, bytes]]
        ] = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.

        *Raises:*
         - `TypeError`, if `headers` is not a `Headers`, mapping or iterable,
           or if `content` is neither str nor bytes.
        """
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            # Accept any mapping, as the annotation promises, not just `dict`;
            # otherwise the iterable branch below would iterate over the keys only.
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: Union[str, bytes]) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        """Return all Set-Cookie headers parsed into ``(name, (value, attrs))`` tuples."""
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        """Replace all Set-Cookie headers, emitting one header per ``(name, (value, attrs))`` item."""
        cookie_headers = [
            cookies.format_set_cookie_header([(k, v[0], v[1])]) for k, v in value
        ]
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[
        str, tuple[str, multidict.MultiDict[str, Optional[str]]]
    ]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)
1197 """ 1198 if not now: 1199 now = time.time() 1200 delta = now - self.timestamp_start 1201 refresh_headers = [ 1202 "date", 1203 "expires", 1204 "last-modified", 1205 ] 1206 for i in refresh_headers: 1207 if i in self.headers: 1208 d = parsedate_tz(self.headers[i]) 1209 if d: 1210 new = mktime_tz(d) + delta 1211 try: 1212 self.headers[i] = formatdate(new, usegmt=True) 1213 except OSError: # pragma: no cover 1214 pass # value out of bounds on Windows only (which is why we exclude it from coverage). 1215 c = [] 1216 for set_cookie_header in self.headers.get_all("set-cookie"): 1217 try: 1218 refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta) 1219 except ValueError: 1220 refreshed = set_cookie_header 1221 c.append(refreshed) 1222 if c: 1223 self.headers.set_all("set-cookie", c) 1224 1225 1226class HTTPFlow(flow.Flow): 1227 """ 1228 An HTTPFlow is a collection of objects representing a single HTTP 1229 transaction. 1230 """ 1231 1232 request: Request 1233 """The client's HTTP request.""" 1234 response: Optional[Response] = None 1235 """The server's HTTP response.""" 1236 error: Optional[flow.Error] = None 1237 """ 1238 A connection or protocol error affecting this flow. 1239 1240 Note that it's possible for a Flow to have both a response and an error 1241 object. This might happen, for instance, when a response was received 1242 from the server, but there was an error sending it back to the client. 1243 """ 1244 1245 websocket: Optional[WebSocketData] = None 1246 """ 1247 If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data. 
1248 """ 1249 1250 _stateobject_attributes = flow.Flow._stateobject_attributes.copy() 1251 # mypy doesn't support update with kwargs 1252 _stateobject_attributes.update( 1253 dict(request=Request, response=Response, websocket=WebSocketData) 1254 ) 1255 1256 def __repr__(self): 1257 s = "<HTTPFlow" 1258 for a in ( 1259 "request", 1260 "response", 1261 "websocket", 1262 "error", 1263 "client_conn", 1264 "server_conn", 1265 ): 1266 if getattr(self, a, False): 1267 s += f"\r\n {a} = {{flow.{a}}}" 1268 s += ">" 1269 return s.format(flow=self) 1270 1271 @property 1272 def timestamp_start(self) -> float: 1273 """*Read-only:* An alias for `Request.timestamp_start`.""" 1274 return self.request.timestamp_start 1275 1276 @property 1277 def mode(self) -> str: # pragma: no cover 1278 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1279 return getattr(self, "_mode", "regular") 1280 1281 @mode.setter 1282 def mode(self, val: str) -> None: # pragma: no cover 1283 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1284 self._mode = val 1285 1286 def copy(self): 1287 f = super().copy() 1288 if self.request: 1289 f.request = self.request.copy() 1290 if self.response: 1291 f.response = self.response.copy() 1292 return f 1293 1294 1295__all__ = [ 1296 "HTTPFlow", 1297 "Message", 1298 "Request", 1299 "Response", 1300 "Headers", 1301]
class HTTPFlow(flow.Flow):
    """
    A single HTTP transaction, bundling the request with the optional
    response, error and WebSocket data that belong to it.
    """

    request: Request
    """The client's HTTP request."""
    response: Optional[Response] = None
    """The server's HTTP response."""
    error: Optional[flow.Error] = None
    """
    A connection or protocol error affecting this flow.

    Note that it's possible for a Flow to have both a response and an error
    object. This might happen, for instance, when a response was received
    from the server, but there was an error sending it back to the client.
    """

    websocket: Optional[WebSocketData] = None
    """
    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
    """

    # Start from a copy of the parent's mapping so that the additions below
    # do not modify `flow.Flow._stateobject_attributes` itself.
    _stateobject_attributes = flow.Flow._stateobject_attributes.copy()
    # mypy doesn't support update with kwargs
    _stateobject_attributes.update(
        {"request": Request, "response": Response, "websocket": WebSocketData}
    )

    def __repr__(self):
        """Render every truthy component of the flow, one per line."""
        shown = (
            "request",
            "response",
            "websocket",
            "error",
            "client_conn",
            "server_conn",
        )
        # Build a str.format template first; the attributes themselves are
        # only formatted in the final `.format(flow=self)` call.
        pieces = [
            f"\r\n {name} = {{flow.{name}}}"
            for name in shown
            if getattr(self, name, False)
        ]
        template = "<HTTPFlow" + "".join(pieces) + ">"
        return template.format(flow=self)

    @property
    def timestamp_start(self) -> float:
        """*Read-only:* An alias for `Request.timestamp_start`."""
        return self.request.timestamp_start

    @property
    def mode(self) -> str:  # pragma: no cover
        # Deprecated accessor: warns, then falls back to "regular" when no
        # mode has been assigned.
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        return getattr(self, "_mode", "regular")

    @mode.setter
    def mode(self, val: str) -> None:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        self._mode = val

    def copy(self):
        """Return a copy of this flow whose request and response are copied too."""
        duplicate = super().copy()
        if self.request:
            duplicate.request = self.request.copy()
        if self.response:
            duplicate.response = self.response.copy()
        return duplicate
An HTTPFlow is a collection of objects representing a single HTTP transaction.
A connection or protocol error affecting this flow.
Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.
If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
class Message(serializable.Serializable):
    """Functionality shared by `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        return cls(**state)

    def get_state(self):
        return self.data.get_state()

    def set_state(self, state):
        self.data.set_state(state)

    data: MessageData
    stream: Union[Callable[[bytes], Union[Iterable[bytes], bytes]], bool] = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        The HTTP version as a string, e.g. `HTTP/1.1`.
        """
        raw_version = self.data.http_version
        return raw_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: Union[str, bytes]) -> None:
        as_bytes = strutils.always_bytes(http_version, "utf-8", "surrogateescape")
        self.data.http_version = as_bytes

    @property
    def is_http10(self) -> bool:
        return self.data.http_version == b"HTTP/1.0"

    @property
    def is_http11(self) -> bool:
        return self.data.http_version == b"HTTP/1.1"

    @property
    def is_http2(self) -> bool:
        return self.data.http_version == b"HTTP/2.0"

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers of this message.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Optional[Headers]:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Optional[Headers]) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> Optional[bytes]:
        """
        The message body exactly as stored (potentially compressed).

        Unlike `Message.content` and `Message.text`, reading this property never raises.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: Optional[bytes]) -> None:
        self.data.content = content

    @property
    def content(self) -> Optional[bytes]:
        """
        The uncompressed message body as bytes.

        Reading this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: Optional[bytes]) -> None:
        self.set_content(value)

    @property
    def text(self) -> Optional[str]:
        """
        The uncompressed and decoded message body as text.

        Reading this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: Optional[str]) -> None:
        self.set_text(value)

    def set_content(self, value: Optional[bytes]) -> None:
        """
        Store `value` as the message body, compressing it according to the
        current content-encoding header and updating content-length.

        *Raises:*
        - `TypeError`, when `value` is neither `None` nor `bytes`.
        """
        if value is None:
            # Only the body is cleared; headers are left untouched.
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )

        content_encoding = self.headers.get("content-encoding")
        try:
            encoded = encoding.encode(value, content_encoding or "identity")
        except ValueError:
            # The content-encoding header is unusable: drop it and store the
            # body as-is.
            del self.headers["content-encoding"]
            encoded = value
        self.raw_content = encoded

        # https://httpwg.org/specs/rfc7230.html#header.content-length
        # don't set content-length if a transfer-encoding is provided
        if "transfer-encoding" not in self.headers:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> Optional[bytes]:
        """
        Like `Message.content`, except that with `strict` set to `False`
        a decoding failure does not raise; the stored body is returned as-is.
        """
        raw = self.raw_content
        if raw is None:
            return None
        content_encoding = self.headers.get("content-encoding")
        if not content_encoding:
            return raw
        try:
            decoded = encoding.decode(raw, content_encoding)
            # A client may illegally specify a byte -> str encoding here (e.g. utf8)
            if isinstance(decoded, str):
                raise ValueError(f"Invalid Content-Encoding: {content_encoding}")
            return decoded
        except ValueError:
            if strict:
                raise
            return raw

    def _get_content_type_charset(self) -> Optional[str]:
        parsed = parse_content_type(self.headers.get("content-type", ""))
        return parsed[2].get("charset") if parsed else None

    def _guess_encoding(self, content: bytes = b"") -> str:
        # Candidates in priority order: explicit charset parameter, JSON
        # default, HTML <meta> charset, CSS @charset rule, latin-1.
        enc = self._get_content_type_charset()
        content_type = self.headers.get("content-type", "")

        if not enc and "json" in content_type:
            enc = "utf8"
        if not enc and "html" in content_type:
            meta_charset = re.search(
                rb"""<meta[^>]+charset=['"]?([^'">]+)""", content, re.IGNORECASE
            )
            if meta_charset:
                enc = meta_charset.group(1).decode("ascii", "ignore")
        if not enc and "text/css" in content_type:
            # @charset rule must be the very first thing.
            css_charset = re.match(rb"""@charset "([^"]+)";""", content, re.IGNORECASE)
            if css_charset:
                enc = css_charset.group(1).decode("ascii", "ignore")
        if not enc:
            enc = "latin-1"

        # Use GB 18030 as the superset of GB2312 and GBK to fix common encoding problems on Chinese websites.
        if enc.lower() in ("gb2312", "gbk"):
            enc = "gb18030"
        return enc

    def set_text(self, text: Optional[str]) -> None:
        """
        Encode `text` with the guessed charset and store it as the body.

        If that charset cannot encode the text, the body is stored as UTF-8
        and the content-type header is updated to say so.
        """
        if text is None:
            self.content = None
            return
        enc = self._guess_encoding()

        try:
            self.content = cast(bytes, encoding.encode(text, enc))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            fallback = ("text", "plain", {})
            ct = parse_content_type(self.headers.get("content-type", "")) or fallback
            ct[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*ct)
            self.content = text.encode("utf8", "surrogateescape")

    def get_text(self, strict: bool = True) -> Optional[str]:
        """
        Like `Message.text`, except that with `strict` set to `False`
        a decoding failure does not raise; the body is returned as
        surrogate-escaped UTF-8 instead.
        """
        content = self.get_content(strict)
        if content is None:
            return None
        enc = self._guess_encoding(content)
        try:
            return cast(str, encoding.decode(content, enc))
        except ValueError:
            if not strict:
                return content.decode("utf8", "surrogateescape")
            raise

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> Optional[float]:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: Optional[float]):
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Replace the body with its decoded form and remove the
        Content-Encoding header.

        *Raises:*
        - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        plain = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = plain

    def encode(self, encoding: str) -> None:
        """
        Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encodings are overwritten, the content is not decoded beforehand.

        *Raises:*
        - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        # Re-assigning the body runs the content setter with the new header.
        self.content = self.raw_content
        # If the header is gone after the assignment, report the encoding as
        # invalid.
        if "content-encoding" not in self.headers:
            raise ValueError(f"Invalid content encoding {encoding!r}")

    def json(self, **kwargs: Any) -> Any:
        """
        Parse the message body as JSON and return the result.
        `**kwargs` are passed through to `json.loads()`.

        *Raises:*
        - `json.decoder.JSONDecodeError` if content is not valid JSON.
        - `TypeError` if the content is not available, for example because the response
          has been streamed.
        """
        body = self.get_content(strict=False)
        if body is None:
            raise TypeError("Message content is not available.")
        return json.loads(body, **kwargs)
This attribute controls if the message body should be streamed.
If `False`, mitmproxy will buffer the entire body before forwarding it to the destination. This makes it possible to perform string replacements on the entire body.
If `True`, the message body will not be buffered on the proxy but immediately forwarded instead.
Alternatively, a transformation function can be specified, which will be called for each chunk of data.
Please note that packet boundaries generally should not be relied upon.
This attribute must be set in the `requestheaders` or `responseheaders` hook.
Setting it in `request` or `response` is already too late: mitmproxy has buffered the message body by then.
The raw (potentially compressed) HTTP message body.
In contrast to Message.content
and Message.text
, accessing this property never raises.
See also: Message.content
, Message.text
The uncompressed HTTP message body as bytes.
Accessing this attribute may raise a ValueError
when the HTTP content-encoding is invalid.
See also: Message.raw_content
, Message.text
The uncompressed and decoded HTTP message body as text.
Accessing this attribute may raise a ValueError
when either content-encoding or charset is invalid.
See also: Message.raw_content
, Message.content
def set_content(self, value: Optional[bytes]) -> None:
    """
    Store `value` as the message body, compressing it according to the
    current content-encoding header and updating content-length.

    *Raises:*
    - `TypeError`, when `value` is neither `None` nor `bytes`.
    """
    if value is None:
        # Only the body is cleared; headers are left untouched.
        self.raw_content = None
        return
    if not isinstance(value, bytes):
        raise TypeError(
            f"Message content must be bytes, not {type(value).__name__}. "
            "Please use .text if you want to assign a str."
        )

    content_encoding = self.headers.get("content-encoding")
    try:
        encoded = encoding.encode(value, content_encoding or "identity")
    except ValueError:
        # The content-encoding header is unusable: drop it and store the
        # body as-is.
        del self.headers["content-encoding"]
        encoded = value
    self.raw_content = encoded

    # https://httpwg.org/specs/rfc7230.html#header.content-length
    # don't set content-length if a transfer-encoding is provided
    if "transfer-encoding" not in self.headers:
        self.headers["content-length"] = str(len(self.raw_content))
def get_content(self, strict: bool = True) -> Optional[bytes]:
    """
    Like `Message.content`, except that with `strict` set to `False`
    a decoding failure does not raise; the stored body is returned as-is.
    """
    raw = self.raw_content
    if raw is None:
        return None
    content_encoding = self.headers.get("content-encoding")
    if not content_encoding:
        # No content-encoding header: the stored body is already plain.
        return raw
    try:
        decoded = encoding.decode(raw, content_encoding)
        # A client may illegally specify a byte -> str encoding here (e.g. utf8)
        if isinstance(decoded, str):
            raise ValueError(f"Invalid Content-Encoding: {content_encoding}")
        return decoded
    except ValueError:
        if strict:
            raise
        return raw
Similar to Message.content
, but does not raise if strict
is False
.
Instead, the compressed message body is returned as-is.
def set_text(self, text: Optional[str]) -> None:
    """
    Encode `text` with the guessed charset and store it as the body.

    If that charset cannot encode the text, the body is stored as UTF-8
    and the content-type header is updated to say so.
    """
    if text is None:
        self.content = None
        return
    enc = self._guess_encoding()

    try:
        self.content = cast(bytes, encoding.encode(text, enc))
    except ValueError:
        # Fall back to UTF-8 and update the content-type header.
        fallback = ("text", "plain", {})
        ct = parse_content_type(self.headers.get("content-type", "")) or fallback
        ct[2]["charset"] = "utf-8"
        self.headers["content-type"] = assemble_content_type(*ct)
        self.content = text.encode("utf8", "surrogateescape")
def get_text(self, strict: bool = True) -> Optional[str]:
    """
    Like `Message.text`, except that with `strict` set to `False`
    a decoding failure does not raise; the body is returned as
    surrogate-escaped UTF-8 instead.
    """
    content = self.get_content(strict)
    if content is None:
        return None
    enc = self._guess_encoding(content)
    try:
        return cast(str, encoding.decode(content, enc))
    except ValueError:
        if not strict:
            return content.decode("utf8", "surrogateescape")
        raise
Similar to Message.text
, but does not raise if strict
is False
.
Instead, the message body is returned as surrogate-escaped UTF-8.
def decode(self, strict: bool = True) -> None:
    """
    Replace the body with its decoded form and remove the
    Content-Encoding header.

    *Raises:*
    - `ValueError`, when the content-encoding is invalid and strict is True.
    """
    # Fetch the decoded body before touching the header, since decoding
    # depends on it.
    plain = self.get_content(strict)
    self.headers.pop("content-encoding", None)
    self.content = plain
Decodes body based on the current Content-Encoding header, then removes the header. If there is no Content-Encoding header, no action is taken.
Raises:
ValueError
, when the content-encoding is invalid and strict is True.
def encode(self, encoding: str) -> None:
    """
    Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
    Any existing content-encodings are overwritten, the content is not decoded beforehand.

    *Raises:*
    - `ValueError`, when the specified content-encoding is invalid.
    """
    self.headers["content-encoding"] = encoding
    # Re-assigning the body runs the content setter with the new header.
    self.content = self.raw_content
    # If the header is gone after the assignment, report the encoding as
    # invalid.
    if "content-encoding" not in self.headers:
        raise ValueError(f"Invalid content encoding {encoding!r}")
Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten, the content is not decoded beforehand.
Raises:
ValueError
, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
    """
    Parse the message body as JSON and return the result.
    `**kwargs` are passed through to `json.loads()`.

    *Raises:*
    - `json.decoder.JSONDecodeError` if content is not valid JSON.
    - `TypeError` if the content is not available, for example because the response
      has been streamed.
    """
    # strict=False: an undecodable body is handed to the parser as-is
    # instead of raising here.
    body = self.get_content(strict=False)
    if body is None:
        raise TypeError("Message content is not available.")
    return json.loads(body, **kwargs)
Returns the JSON encoded content of the response, if any.
**kwargs
are optional arguments that will be
passed to json.loads()
.
Will raise if the content can not be decoded and then parsed as JSON.
Raises:
- `json.decoder.JSONDecodeError` if content is not valid JSON.
- `TypeError` if the content is not available, for example because the response has been streamed.
Inherited Members
- mitmproxy.coretypes.serializable.Serializable
- copy
541class Request(Message): 542 """ 543 An HTTP request. 544 """ 545 546 data: RequestData 547 548 def __init__( 549 self, 550 host: str, 551 port: int, 552 method: bytes, 553 scheme: bytes, 554 authority: bytes, 555 path: bytes, 556 http_version: bytes, 557 headers: Union[Headers, tuple[tuple[bytes, bytes], ...]], 558 content: Optional[bytes], 559 trailers: Union[Headers, tuple[tuple[bytes, bytes], ...], None], 560 timestamp_start: float, 561 timestamp_end: Optional[float], 562 ): 563 # auto-convert invalid types to retain compatibility with older code. 564 if isinstance(host, bytes): 565 host = host.decode("idna", "strict") 566 if isinstance(method, str): 567 method = method.encode("ascii", "strict") 568 if isinstance(scheme, str): 569 scheme = scheme.encode("ascii", "strict") 570 if isinstance(authority, str): 571 authority = authority.encode("ascii", "strict") 572 if isinstance(path, str): 573 path = path.encode("ascii", "strict") 574 if isinstance(http_version, str): 575 http_version = http_version.encode("ascii", "strict") 576 577 if isinstance(content, str): 578 raise ValueError(f"Content must be bytes, not {type(content).__name__}") 579 if not isinstance(headers, Headers): 580 headers = Headers(headers) 581 if trailers is not None and not isinstance(trailers, Headers): 582 trailers = Headers(trailers) 583 584 self.data = RequestData( 585 host=host, 586 port=port, 587 method=method, 588 scheme=scheme, 589 authority=authority, 590 path=path, 591 http_version=http_version, 592 headers=headers, 593 content=content, 594 trailers=trailers, 595 timestamp_start=timestamp_start, 596 timestamp_end=timestamp_end, 597 ) 598 599 def __repr__(self) -> str: 600 if self.host and self.port: 601 hostport = f"{self.host}:{self.port}" 602 else: 603 hostport = "" 604 path = self.path or "" 605 return f"Request({self.method} {hostport}{path})" 606 607 @classmethod 608 def make( 609 cls, 610 method: str, 611 url: str, 612 content: Union[bytes, str] = "", 613 headers: Union[ 614 
Headers, 615 dict[Union[str, bytes], Union[str, bytes]], 616 Iterable[tuple[bytes, bytes]], 617 ] = (), 618 ) -> "Request": 619 """ 620 Simplified API for creating request objects. 621 """ 622 # Headers can be list or dict, we differentiate here. 623 if isinstance(headers, Headers): 624 pass 625 elif isinstance(headers, dict): 626 headers = Headers( 627 ( 628 always_bytes(k, "utf-8", "surrogateescape"), 629 always_bytes(v, "utf-8", "surrogateescape"), 630 ) 631 for k, v in headers.items() 632 ) 633 elif isinstance(headers, Iterable): 634 headers = Headers(headers) # type: ignore 635 else: 636 raise TypeError( 637 "Expected headers to be an iterable or dict, but is {}.".format( 638 type(headers).__name__ 639 ) 640 ) 641 642 req = cls( 643 "", 644 0, 645 method.encode("utf-8", "surrogateescape"), 646 b"", 647 b"", 648 b"", 649 b"HTTP/1.1", 650 headers, 651 b"", 652 None, 653 time.time(), 654 time.time(), 655 ) 656 657 req.url = url 658 # Assign this manually to update the content-length header. 659 if isinstance(content, bytes): 660 req.content = content 661 elif isinstance(content, str): 662 req.text = content 663 else: 664 raise TypeError( 665 f"Expected content to be str or bytes, but is {type(content).__name__}." 666 ) 667 668 return req 669 670 @property 671 def first_line_format(self) -> str: 672 """ 673 *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3). 674 675 origin-form and asterisk-form are subsumed as "relative". 676 """ 677 if self.method == "CONNECT": 678 return "authority" 679 elif self.authority: 680 return "absolute" 681 else: 682 return "relative" 683 684 @property 685 def method(self) -> str: 686 """ 687 HTTP request method, e.g. "GET". 
688 """ 689 return self.data.method.decode("utf-8", "surrogateescape").upper() 690 691 @method.setter 692 def method(self, val: Union[str, bytes]) -> None: 693 self.data.method = always_bytes(val, "utf-8", "surrogateescape") 694 695 @property 696 def scheme(self) -> str: 697 """ 698 HTTP request scheme, which should be "http" or "https". 699 """ 700 return self.data.scheme.decode("utf-8", "surrogateescape") 701 702 @scheme.setter 703 def scheme(self, val: Union[str, bytes]) -> None: 704 self.data.scheme = always_bytes(val, "utf-8", "surrogateescape") 705 706 @property 707 def authority(self) -> str: 708 """ 709 HTTP request authority. 710 711 For HTTP/1, this is the authority portion of the request target 712 (in either absolute-form or authority-form). 713 For origin-form and asterisk-form requests, this property is set to an empty string. 714 715 For HTTP/2, this is the :authority pseudo header. 716 717 *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host` 718 """ 719 try: 720 return self.data.authority.decode("idna") 721 except UnicodeError: 722 return self.data.authority.decode("utf8", "surrogateescape") 723 724 @authority.setter 725 def authority(self, val: Union[str, bytes]) -> None: 726 if isinstance(val, str): 727 try: 728 val = val.encode("idna", "strict") 729 except UnicodeError: 730 val = val.encode("utf8", "surrogateescape") # type: ignore 731 self.data.authority = val 732 733 @property 734 def host(self) -> str: 735 """ 736 Target server for this request. This may be parsed from the raw request 737 (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) 738 or inferred from the proxy mode (e.g. an IP in transparent mode). 739 740 Setting the host attribute also updates the host header and authority information, if present. 
741 742 *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host` 743 """ 744 return self.data.host 745 746 @host.setter 747 def host(self, val: Union[str, bytes]) -> None: 748 self.data.host = always_str(val, "idna", "strict") 749 750 # Update host header 751 if "Host" in self.data.headers: 752 self.data.headers["Host"] = val 753 # Update authority 754 if self.data.authority: 755 self.authority = url.hostport(self.scheme, self.host, self.port) 756 757 @property 758 def host_header(self) -> Optional[str]: 759 """ 760 The request's host/authority header. 761 762 This property maps to either ``request.headers["Host"]`` or 763 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 764 765 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 766 """ 767 if self.is_http2: 768 return self.authority or self.data.headers.get("Host", None) 769 else: 770 return self.data.headers.get("Host", None) 771 772 @host_header.setter 773 def host_header(self, val: Union[None, str, bytes]) -> None: 774 if val is None: 775 if self.is_http2: 776 self.data.authority = b"" 777 self.headers.pop("Host", None) 778 else: 779 if self.is_http2: 780 self.authority = val # type: ignore 781 if not self.is_http2 or "Host" in self.headers: 782 # For h2, we only overwrite, but not create, as :authority is the h2 host header. 783 self.headers["Host"] = val 784 785 @property 786 def port(self) -> int: 787 """ 788 Target port. 789 """ 790 return self.data.port 791 792 @port.setter 793 def port(self, port: int) -> None: 794 self.data.port = port 795 796 @property 797 def path(self) -> str: 798 """ 799 HTTP request path, e.g. "/index.html". 800 Usually starts with a slash, except for OPTIONS requests, which may just be "*". 
801 """ 802 return self.data.path.decode("utf-8", "surrogateescape") 803 804 @path.setter 805 def path(self, val: Union[str, bytes]) -> None: 806 self.data.path = always_bytes(val, "utf-8", "surrogateescape") 807 808 @property 809 def url(self) -> str: 810 """ 811 The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`. 812 813 Settings this property updates these attributes as well. 814 """ 815 if self.first_line_format == "authority": 816 return f"{self.host}:{self.port}" 817 return url.unparse(self.scheme, self.host, self.port, self.path) 818 819 @url.setter 820 def url(self, val: Union[str, bytes]) -> None: 821 val = always_str(val, "utf-8", "surrogateescape") 822 self.scheme, self.host, self.port, self.path = url.parse(val) 823 824 @property 825 def pretty_host(self) -> str: 826 """ 827 *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source. 828 This is useful in transparent mode where `Request.host` is only an IP address. 829 830 *Warning:* When working in adversarial environments, this may not reflect the actual destination 831 as the Host header could be spoofed. 832 """ 833 authority = self.host_header 834 if authority: 835 return url.parse_authority(authority, check=False)[0] 836 else: 837 return self.host 838 839 @property 840 def pretty_url(self) -> str: 841 """ 842 *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`. 
843 """ 844 if self.first_line_format == "authority": 845 return self.authority 846 847 host_header = self.host_header 848 if not host_header: 849 return self.url 850 851 pretty_host, pretty_port = url.parse_authority(host_header, check=False) 852 pretty_port = pretty_port or url.default_port(self.scheme) or 443 853 854 return url.unparse(self.scheme, pretty_host, pretty_port, self.path) 855 856 def _get_query(self): 857 query = urllib.parse.urlparse(self.url).query 858 return tuple(url.decode(query)) 859 860 def _set_query(self, query_data): 861 query = url.encode(query_data) 862 _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) 863 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 864 865 @property 866 def query(self) -> multidict.MultiDictView[str, str]: 867 """ 868 The request query as a mutable mapping view on the request's path. 869 For the most part, this behaves like a dictionary. 870 Modifications to the MultiDictView update `Request.path`, and vice versa. 871 """ 872 return multidict.MultiDictView(self._get_query, self._set_query) 873 874 @query.setter 875 def query(self, value): 876 self._set_query(value) 877 878 def _get_cookies(self): 879 h = self.headers.get_all("Cookie") 880 return tuple(cookies.parse_cookie_headers(h)) 881 882 def _set_cookies(self, value): 883 self.headers["cookie"] = cookies.format_cookie_header(value) 884 885 @property 886 def cookies(self) -> multidict.MultiDictView[str, str]: 887 """ 888 The request cookies. 889 For the most part, this behaves like a dictionary. 890 Modifications to the MultiDictView update `Request.headers`, and vice versa. 891 """ 892 return multidict.MultiDictView(self._get_cookies, self._set_cookies) 893 894 @cookies.setter 895 def cookies(self, value): 896 self._set_cookies(value) 897 898 @property 899 def path_components(self) -> tuple[str, ...]: 900 """ 901 The URL's path components as a tuple of strings. 902 Components are unquoted. 
903 """ 904 path = urllib.parse.urlparse(self.url).path 905 # This needs to be a tuple so that it's immutable. 906 # Otherwise, this would fail silently: 907 # request.path_components.append("foo") 908 return tuple(url.unquote(i) for i in path.split("/") if i) 909 910 @path_components.setter 911 def path_components(self, components: Iterable[str]): 912 components = map(lambda x: url.quote(x, safe=""), components) 913 path = "/" + "/".join(components) 914 _, _, _, params, query, fragment = urllib.parse.urlparse(self.url) 915 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 916 917 def anticache(self) -> None: 918 """ 919 Modifies this request to remove headers that might produce a cached response. 920 """ 921 delheaders = ( 922 "if-modified-since", 923 "if-none-match", 924 ) 925 for i in delheaders: 926 self.headers.pop(i, None) 927 928 def anticomp(self) -> None: 929 """ 930 Modify the Accept-Encoding header to only accept uncompressed responses. 931 """ 932 self.headers["accept-encoding"] = "identity" 933 934 def constrain_encoding(self) -> None: 935 """ 936 Limits the permissible Accept-Encoding values, based on what we can decode appropriately. 937 """ 938 accept_encoding = self.headers.get("accept-encoding") 939 if accept_encoding: 940 self.headers["accept-encoding"] = ", ".join( 941 e 942 for e in {"gzip", "identity", "deflate", "br", "zstd"} 943 if e in accept_encoding 944 ) 945 946 def _get_urlencoded_form(self): 947 is_valid_content_type = ( 948 "application/x-www-form-urlencoded" 949 in self.headers.get("content-type", "").lower() 950 ) 951 if is_valid_content_type: 952 return tuple(url.decode(self.get_text(strict=False))) 953 return () 954 955 def _set_urlencoded_form(self, form_data): 956 """ 957 Sets the body to the URL-encoded form data, and adds the appropriate content-type header. 958 This will overwrite the existing content if there is one. 
959 """ 960 self.headers["content-type"] = "application/x-www-form-urlencoded" 961 self.content = url.encode(form_data, self.get_text(strict=False)).encode() 962 963 @property 964 def urlencoded_form(self) -> multidict.MultiDictView[str, str]: 965 """ 966 The URL-encoded form data. 967 968 If the content-type indicates non-form data or the form could not be parsed, this is set to 969 an empty `MultiDictView`. 970 971 Modifications to the MultiDictView update `Request.content`, and vice versa. 972 """ 973 return multidict.MultiDictView( 974 self._get_urlencoded_form, self._set_urlencoded_form 975 ) 976 977 @urlencoded_form.setter 978 def urlencoded_form(self, value): 979 self._set_urlencoded_form(value) 980 981 def _get_multipart_form(self): 982 is_valid_content_type = ( 983 "multipart/form-data" in self.headers.get("content-type", "").lower() 984 ) 985 if is_valid_content_type: 986 try: 987 return multipart.decode(self.headers.get("content-type"), self.content) 988 except ValueError: 989 pass 990 return () 991 992 def _set_multipart_form(self, value): 993 is_valid_content_type = ( 994 self.headers.get("content-type", "") 995 .lower() 996 .startswith("multipart/form-data") 997 ) 998 if not is_valid_content_type: 999 """ 1000 Generate a random boundary here. 1001 1002 See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications 1003 on generating the boundary. 1004 """ 1005 boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode() 1006 self.headers["content-type"] = f"multipart/form-data; boundary={boundary}" 1007 self.content = multipart.encode(self.headers, value) 1008 1009 @property 1010 def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]: 1011 """ 1012 The multipart form data. 1013 1014 If the content-type indicates non-form data or the form could not be parsed, this is set to 1015 an empty `MultiDictView`. 1016 1017 Modifications to the MultiDictView update `Request.content`, and vice versa. 
1018 """ 1019 return multidict.MultiDictView( 1020 self._get_multipart_form, self._set_multipart_form 1021 ) 1022 1023 @multipart_form.setter 1024 def multipart_form(self, value): 1025 self._set_multipart_form(value)
An HTTP request.
def __init__(
    self,
    host: str,
    port: int,
    method: bytes,
    scheme: bytes,
    authority: bytes,
    path: bytes,
    http_version: bytes,
    headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
    content: Optional[bytes],
    trailers: Union[Headers, tuple[tuple[bytes, bytes], ...], None],
    timestamp_start: float,
    timestamp_end: Optional[float],
):
    """
    Build a request from its raw components and store them in a `RequestData` object.

    For compatibility with older code, a bytes `host` is IDNA-decoded and str values
    for `method`, `scheme`, `authority`, `path` and `http_version` are ASCII-encoded.
    `headers` and `trailers` are wrapped in `Headers` if they are not already.

    *Raises:*
     - `ValueError` if `content` is a str.
    """

    def _ascii_bytes(value):
        # Older callers may still hand us str where bytes are expected.
        return value.encode("ascii", "strict") if isinstance(value, str) else value

    # The conversions run in the same order as the parameters are declared.
    if isinstance(host, bytes):
        host = host.decode("idna", "strict")
    method = _ascii_bytes(method)
    scheme = _ascii_bytes(scheme)
    authority = _ascii_bytes(authority)
    path = _ascii_bytes(path)
    http_version = _ascii_bytes(http_version)

    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    headers = headers if isinstance(headers, Headers) else Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = RequestData(
        host=host,
        port=port,
        method=method,
        scheme=scheme,
        authority=authority,
        path=path,
        http_version=http_version,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    method: str,
    url: str,
    content: Union[bytes, str] = "",
    headers: Union[
        Headers,
        Mapping[Union[str, bytes], Union[str, bytes]],
        Iterable[tuple[bytes, bytes]],
    ] = (),
) -> "Request":
    """
    Simplified API for creating request objects.

    *Args:*
     - *method:* HTTP method; encoded as UTF-8 with ``surrogateescape``.
     - *url:* assigned to the new request's `url` property.
     - *content:* bytes are assigned to `content`, str is assigned to `text`.
     - *headers:* a `Headers` instance, a mapping of header names to values
       (str or bytes), or an iterable of ``(name, value)`` byte tuples.

    *Raises:*
     - `TypeError` if `headers` is neither a mapping nor an iterable,
       or if `content` is neither str nor bytes.
    """
    # Headers can be a Headers object, a mapping, or a list of tuples.
    # `Headers` must be tested first so that an existing instance is passed through as-is.
    if isinstance(headers, Headers):
        pass
    elif isinstance(headers, Mapping):
        # Any mapping is accepted, not just dict: iterating a non-dict mapping
        # in the Iterable branch below would only yield its keys.
        headers = Headers(
            (
                always_bytes(k, "utf-8", "surrogateescape"),
                always_bytes(v, "utf-8", "surrogateescape"),
            )
            for k, v in headers.items()
        )
    elif isinstance(headers, Iterable):
        headers = Headers(headers)  # type: ignore
    else:
        raise TypeError(
            f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
        )

    req = cls(
        "",
        0,
        method.encode("utf-8", "surrogateescape"),
        b"",
        b"",
        b"",
        b"HTTP/1.1",
        headers,
        b"",
        None,
        time.time(),
        time.time(),
    )

    req.url = url
    # Assign this manually to update the content-length header.
    if isinstance(content, bytes):
        req.content = content
    elif isinstance(content, str):
        req.text = content
    else:
        raise TypeError(
            f"Expected content to be str or bytes, but is {type(content).__name__}."
        )

    return req
Simplified API for creating request objects.
Read-only: HTTP request form as defined in RFC 7230.
origin-form and asterisk-form are subsumed as "relative".
Target server for this request. This may be parsed from the raw request
(e.g. from a GET http://example.com/ HTTP/1.1
request line)
or inferred from the proxy mode (e.g. an IP in transparent mode).
Setting the host attribute also updates the host header and authority information, if present.
See also: Request.authority
, Request.host_header
, Request.pretty_host
The request's host/authority header.
This property maps to either request.headers["Host"]
or
request.authority
, depending on whether it's HTTP/1.x or HTTP/2.0.
See also: Request.authority
, Request.host
, Request.pretty_host
HTTP request path, e.g. "/index.html". Usually starts with a slash, except for OPTIONS requests, which may just be "*".
The full URL string, constructed from Request.scheme
, Request.host
, Request.port
and Request.path
.
Setting this property updates these attributes as well.
Read-only: Like Request.host
, but using Request.host_header
header as an additional (preferred) data source.
This is useful in transparent mode where Request.host
is only an IP address.
Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.
The request query as a mutable mapping view on the request's path.
For the most part, this behaves like a dictionary.
Modifications to the MultiDictView update Request.path
, and vice versa.
The URL's path components as a tuple of strings. Components are unquoted.
def anticache(self) -> None:
    """
    Drop the request headers that might produce a cached response.

    ``If-Modified-Since`` and ``If-None-Match`` are removed; a header that is
    not present is silently ignored.
    """
    for conditional_header in ("if-modified-since", "if-none-match"):
        self.headers.pop(conditional_header, None)
Modifies this request to remove headers that might produce a cached response.
def anticomp(self) -> None:
    """
    Ask for an uncompressed response by overwriting the Accept-Encoding
    header with ``identity``.
    """
    uncompressed_only = "identity"
    self.headers["accept-encoding"] = uncompressed_only
Modify the Accept-Encoding header to only accept uncompressed responses.
def constrain_encoding(self) -> None:
    """
    Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

    If the request has a non-empty Accept-Encoding header, it is replaced by the
    comma-separated subset of ``gzip``, ``identity``, ``deflate``, ``br`` and ``zstd``
    that occurs in the original value. Requests without the header are left untouched.
    """
    # An ordered tuple rather than a set: iterating a set of strings yields a
    # different order from run to run, which made the rewritten header unstable.
    supported_encodings = ("gzip", "identity", "deflate", "br", "zstd")

    accept_encoding = self.headers.get("accept-encoding")
    if accept_encoding:
        # Note that `in` is a plain substring test against the raw header value.
        self.headers["accept-encoding"] = ", ".join(
            e for e in supported_encodings if e in accept_encoding
        )
Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
The URL-encoded form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to
an empty MultiDictView
.
Modifications to the MultiDictView update Request.content
, and vice versa.
The multipart form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to
an empty MultiDictView
.
Modifications to the MultiDictView update Request.content
, and vice versa.
Inherited Members
- Message
- stream
- http_version
- headers
- trailers
- raw_content
- content
- text
- set_content
- get_content
- set_text
- get_text
- timestamp_start
- timestamp_end
- decode
- encode
- json
- mitmproxy.coretypes.serializable.Serializable
- copy
class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
        content: Optional[bytes],
        trailers: Union[None, Headers, tuple[tuple[bytes, bytes], ...]],
        timestamp_start: float,
        timestamp_end: Optional[float],
    ):
        """
        Build a response from its raw components and store them in a `ResponseData` object.

        For compatibility with older code, str values for `http_version` and `reason`
        are ASCII-encoded. `headers` and `trailers` are wrapped in `Headers` if needed.

        *Raises:*
         - `ValueError` if `content` is a str.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: Union[bytes, str] = b"",
        headers: Union[
            Headers, Mapping[str, Union[str, bytes]], Iterable[tuple[bytes, bytes]]
        ] = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.

        *Args:*
         - *status_code:* HTTP status code; the reason phrase is looked up in
           `status_codes.RESPONSES` and is empty for unknown codes.
         - *content:* bytes are assigned to `content`, str is assigned to `text`.
         - *headers:* a `Headers` instance, a mapping of header names to values,
           or an iterable of ``(name, value)`` byte tuples.

        *Raises:*
         - `TypeError` if `headers` is neither a mapping nor an iterable,
           or if `content` is neither str nor bytes.
        """
        # `Headers` must be tested first so that an existing instance is passed through as-is.
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            # Honour the annotation: any mapping is accepted, not just dict. Iterating a
            # non-dict mapping in the Iterable branch below would only yield its keys.
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: Union[str, bytes]) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        # One Set-Cookie header is emitted per (name, (value, attrs)) entry.
        cookie_headers = [
            cookies.format_set_cookie_header([(k, v[0], v[1])]) for k, v in value
        ]
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[
        str, tuple[str, multidict.MultiDict[str, Optional[str]]]
    ]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now=None):
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

         - It adjusts date, expires, and last-modified headers.
         - It adjusts cookie expiration.

        *Args:*
         - *now:* reference time as a Unix timestamp. Defaults to the current time
           when `None`. All adjustments shift by ``now - timestamp_start``.
        """
        # Compare against None explicitly so that an explicit timestamp of 0 is honoured.
        if now is None:
            now = time.time()
        delta = now - self.timestamp_start
        refresh_headers = [
            "date",
            "expires",
            "last-modified",
        ]
        for i in refresh_headers:
            if i in self.headers:
                d = parsedate_tz(self.headers[i])
                # Unparseable dates are left as they are.
                if d:
                    new = mktime_tz(d) + delta
                    try:
                        self.headers[i] = formatdate(new, usegmt=True)
                    except OSError:  # pragma: no cover
                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
        c = []
        for set_cookie_header in self.headers.get_all("set-cookie"):
            try:
                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
            except ValueError:
                # Keep cookies we cannot refresh unchanged rather than dropping them.
                refreshed = set_cookie_header
            c.append(refreshed)
        if c:
            self.headers.set_all("set-cookie", c)
An HTTP response.
def __init__(
    self,
    http_version: bytes,
    status_code: int,
    reason: bytes,
    headers: Union[Headers, tuple[tuple[bytes, bytes], ...]],
    content: Optional[bytes],
    trailers: Union[None, Headers, tuple[tuple[bytes, bytes], ...]],
    timestamp_start: float,
    timestamp_end: Optional[float],
):
    """
    Build a response from its raw components and store them in a `ResponseData` object.

    For compatibility with older code, str values for `http_version` and `reason`
    are ASCII-encoded. `headers` and `trailers` are wrapped in `Headers` if needed.

    *Raises:*
     - `ValueError` if `content` is a str.
    """

    def _ascii_bytes(value):
        # Older callers may still hand us str where bytes are expected.
        return value.encode("ascii", "strict") if isinstance(value, str) else value

    http_version = _ascii_bytes(http_version)
    reason = _ascii_bytes(reason)

    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    headers = headers if isinstance(headers, Headers) else Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = ResponseData(
        http_version=http_version,
        status_code=status_code,
        reason=reason,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    status_code: int = 200,
    content: Union[bytes, str] = b"",
    headers: Union[
        Headers, Mapping[str, Union[str, bytes]], Iterable[tuple[bytes, bytes]]
    ] = (),
) -> "Response":
    """
    Simplified API for creating response objects.

    *Args:*
     - *status_code:* HTTP status code; the reason phrase is looked up in
       `status_codes.RESPONSES` and is empty for unknown codes.
     - *content:* bytes are assigned to `content`, str is assigned to `text`.
     - *headers:* a `Headers` instance, a mapping of header names to values,
       or an iterable of ``(name, value)`` byte tuples.

    *Raises:*
     - `TypeError` if `headers` is neither a mapping nor an iterable,
       or if `content` is neither str nor bytes.
    """
    # `Headers` must be tested first so that an existing instance is passed through as-is.
    if isinstance(headers, Headers):
        pass
    elif isinstance(headers, Mapping):
        # Honour the annotation: any mapping is accepted, not just dict. Iterating a
        # non-dict mapping in the Iterable branch below would only yield its keys.
        headers = Headers(
            (
                always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                always_bytes(v, "utf-8", "surrogateescape"),
            )
            for k, v in headers.items()
        )
    elif isinstance(headers, Iterable):
        headers = Headers(headers)  # type: ignore
    else:
        raise TypeError(
            f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
        )

    resp = cls(
        b"HTTP/1.1",
        status_code,
        status_codes.RESPONSES.get(status_code, "").encode(),
        headers,
        None,
        None,
        time.time(),
        time.time(),
    )

    # Assign this manually to update the content-length header.
    if isinstance(content, bytes):
        resp.content = content
    elif isinstance(content, str):
        resp.text = content
    else:
        raise TypeError(
            f"Expected content to be str or bytes, but is {type(content).__name__}."
        )

    return resp
Simplified API for creating response objects.
HTTP reason phrase, for example "Not Found".
HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
def refresh(self, now=None):
    """
    This fairly complex and heuristic function refreshes a server
    response for replay.

     - It adjusts date, expires, and last-modified headers.
     - It adjusts cookie expiration.

    *Args:*
     - *now:* reference time as a Unix timestamp. Defaults to the current time
       when `None`. All adjustments shift by ``now - timestamp_start``.
    """
    # Compare against None explicitly so that an explicit timestamp of 0 is honoured.
    if now is None:
        now = time.time()
    delta = now - self.timestamp_start
    refresh_headers = [
        "date",
        "expires",
        "last-modified",
    ]
    for i in refresh_headers:
        if i in self.headers:
            d = parsedate_tz(self.headers[i])
            # Unparseable dates are left as they are.
            if d:
                new = mktime_tz(d) + delta
                try:
                    self.headers[i] = formatdate(new, usegmt=True)
                except OSError:  # pragma: no cover
                    pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
    c = []
    for set_cookie_header in self.headers.get_all("set-cookie"):
        try:
            refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
        except ValueError:
            # Keep cookies we cannot refresh unchanged rather than dropping them.
            refreshed = set_cookie_header
        c.append(refreshed)
    if c:
        self.headers.set_all("set-cookie", c)
This fairly complex and heuristic function refreshes a server response for replay.
- It adjusts date, expires, and last-modified headers.
- It adjusts cookie expiration.
Inherited Members
- Message
- stream
- http_version
- headers
- trailers
- raw_content
- content
- text
- set_content
- get_content
- set_text
- get_text
- timestamp_start
- timestamp_end
- decode
- encode
- json
- mitmproxy.coretypes.serializable.Serializable
- copy
class Headers(multidict.MultiDict):  # type: ignore
    """
    A multi-dict of HTTP headers that offers convenient access to individual headers
    while still exposing the underlying raw data. Provides a full dictionary interface.

    Create headers with keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    Headers mostly behave like a normal dict:
    >>> h["Host"]
    "example.com"

    Headers are case insensitive:
    >>> h["host"]
    "example.com"

    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Multiple headers are folded into a single header as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Setting a header removes all existing headers with the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` returns an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    For full control, the raw header fields can be accessed:
    >>> h.fields

    Caveats:
     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
        """
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
           For convenience, underscores in header names will be transformed to dashes -
           this behaviour does not extend to other methods.

        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
        the behavior is undefined.
        """
        super().__init__(fields)

        # Reject anything that is not a pair of bytes objects.
        if any(
            not (isinstance(key, bytes) and isinstance(value, bytes))
            for key, value in self.fields
        ):
            raise TypeError("Header fields must be bytes.")

        # Keyword arguments cannot contain dashes, so content_type means content-type.
        extra_headers = {
            _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
            for name, value in headers.items()
        }
        self.update(extra_headers)

    fields: tuple[tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        # Several values for one header are folded into a comma-separated list.
        separator = ", "
        return separator.join(values)

    @staticmethod
    def _kconv(key) -> str:
        # Lower-casing the key makes lookups case-insensitive.
        return key.lower()

    def __bytes__(self) -> bytes:
        if not self.fields:
            return b""
        lines = (b": ".join(field) for field in self.fields)
        return b"\r\n".join(lines) + b"\r\n"

    def __delitem__(self, key: Union[str, bytes]) -> None:
        super().__delitem__(_always_bytes(key))

    def __iter__(self) -> Iterator[str]:
        yield from map(_native, super().__iter__())

    def get_all(self, name: Union[str, bytes]) -> list[str]:
        """
        Like `Headers.get`, but does not fold multiple headers into a single one.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        raw_values = super().get_all(_always_bytes(name))
        return list(map(_native, raw_values))

    def set_all(self, name: Union[str, bytes], values: list[Union[str, bytes]]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        return super().set_all(
            _always_bytes(name),
            [_always_bytes(x) for x in values],
        )

    def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
        super().insert(index, _always_bytes(key), _always_bytes(value))

    def items(self, multi=False):
        if not multi:
            return super().items()
        return ((_native(k), _native(v)) for k, v in self.fields)
Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.
Create headers with keyword arguments:
>>> h = Headers(host="example.com", content_type="application/xml")
Headers mostly behave like a normal dict:
>>> h["Host"]
"example.com"
Headers are case insensitive:
>>> h["host"]
"example.com"
Headers can also be created from a list of raw (header_name, header_value) byte tuples:
>>> h = Headers([
(b"Host",b"example.com"),
(b"Accept",b"text/html"),
(b"accept",b"application/xml")
])
Multiple headers are folded into a single header as per RFC 7230:
>>> h["Accept"]
"text/html, application/xml"
Setting a header removes all existing headers with the same name:
>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"
bytes(h)
returns an HTTP/1 header block:
>>> print(bytes(h))
Host: example.com
Accept: application/text
For full control, the raw header fields can be accessed:
>>> h.fields
Caveats:
- For use with the "Set-Cookie" and "Cookie" headers, either use
Response.cookies
or see Headers.get_all
.
def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
    """
    *Args:*
     - *fields:* (optional) list of ``(name, value)`` header byte tuples,
       e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
     - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
       For convenience, underscores in header names will be transformed to dashes -
       this behaviour does not extend to other methods.

    If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
    the behavior is undefined.
    """
    super().__init__(fields)

    # Reject anything that is not a pair of bytes objects.
    if any(
        not (isinstance(key, bytes) and isinstance(value, bytes))
        for key, value in self.fields
    ):
        raise TypeError("Header fields must be bytes.")

    # Keyword arguments cannot contain dashes, so content_type means content-type.
    extra_headers = {
        _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
        for name, value in headers.items()
    }
    self.update(extra_headers)
Args:
- fields: (optional) list of
(name, value)
header byte tuples, e.g. [(b"Host", b"example.com")]
. All names and values must be bytes. - **headers: Additional headers to set. Will overwrite existing values from
fields
. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.
If **headers
contains multiple keys that have equal .lower()
representations,
the behavior is undefined.
def get_all(self, name: Union[str, bytes]) -> list[str]:
    """
    Return every value stored for `name` as a separate string, without folding
    them into one header the way `Headers.get` does.
    This is useful for Set-Cookie and Cookie headers, which do not support folding.

    *See also:*
     - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
     - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
     - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
    """
    raw_values = super().get_all(_always_bytes(name))
    return list(map(_native, raw_values))
Like Headers.get
, but does not fold multiple headers into a single one.
This is useful for Set-Cookie and Cookie headers, which do not support folding.
See also:
def set_all(self, name: Union[str, bytes], values: list[Union[str, bytes]]):
    """
    Explicitly set multiple headers for the given key.
    The name and every value are converted to bytes before being stored.
    See `Headers.get_all`.
    """
    return super().set_all(
        _always_bytes(name),
        [_always_bytes(x) for x in values],
    )
Explicitly set multiple headers for the given key.
See Headers.get_all
.
def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
    # Key and value are converted to bytes before being handed to the base class.
    super().insert(index, _always_bytes(key), _always_bytes(value))
Insert an additional value for the given key at the specified position.
def items(self, multi=False):
    # Without `multi`, defer to the base class implementation.
    if not multi:
        return super().items()
    # With `multi`, lazily yield every raw field decoded to a (str, str) pair.
    return ((_native(k), _native(v)) for k, v in self.fields)
Get all (key, value) tuples.
If multi
is True, all (key, value)
pairs will be returned.
If False, only one tuple per key is returned.
Inherited Members
- mitmproxy.coretypes.serializable.Serializable
- copy
- collections.abc.MutableMapping
- pop
- popitem
- clear
- update
- setdefault
- collections.abc.Mapping
- get