Edit on GitHub

mitmproxy.connection

  1import uuid
  2import warnings
  3from abc import ABCMeta
  4from collections.abc import Sequence
  5from enum import Flag
  6from typing import Literal, Optional
  7
  8from mitmproxy import certs
  9from mitmproxy.coretypes import serializable
 10from mitmproxy.proxy import mode_specs
 11from mitmproxy.net import server_spec
 12from mitmproxy.utils import human
 13
 14
 15class ConnectionState(Flag):
 16    """The current state of the underlying socket."""
 17
 18    CLOSED = 0
 19    CAN_READ = 1
 20    CAN_WRITE = 2
 21    OPEN = CAN_READ | CAN_WRITE
 22
 23
# The transport protocol names a connection may use.
TransportProtocol = Literal["tcp", "udp"]


# Practically speaking we may have IPv6 addresses with flowinfo and scope_id,
# but type checking isn't good enough to properly handle tuple unions.
# This two-element version at least provides useful type checking messages.
Address = tuple[str, int]
 31
 32
 33class Connection(serializable.Serializable, metaclass=ABCMeta):
 34    """
 35    Base class for client and server connections.
 36
 37    The connection object only exposes metadata about the connection, but not the underlying socket object.
 38    This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively.
 39    """
 40
 41    # all connections have a unique id. While
 42    # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity),
 43    # we also want these semantics for recorded flows.
 44    id: str
 45    """A unique UUID to identify the connection."""
 46    state: ConnectionState
 47    """The current connection state."""
 48    transport_protocol: TransportProtocol
 49    """The connection protocol in use."""
 50    peername: Optional[Address]
 51    """The remote's `(ip, port)` tuple for this connection."""
 52    sockname: Optional[Address]
 53    """Our local `(ip, port)` tuple for this connection."""
 54    error: Optional[str] = None
 55    """
 56    A string describing a general error with connections to this address.
 57
 58    The purpose of this property is to signal that new connections to the particular endpoint should not be attempted,
 59    for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error
 60    property. This property is only reused per client connection.
 61    """
 62
 63    tls: bool = False
 64    """
 65    `True` if TLS should be established, `False` otherwise.
 66    Note that this property only describes if a connection should eventually be protected using TLS.
 67    To check if TLS has already been established, use `Connection.tls_established`.
 68    """
 69    certificate_list: Sequence[certs.Cert] = ()
 70    """
 71    The TLS certificate list as sent by the peer.
 72    The first certificate is the end-entity certificate.
 73
 74    > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each
 75    > certificate to certify the one immediately preceding it; however,
 76    > some implementations allowed some flexibility.  Servers sometimes
 77    > send both a current and deprecated intermediate for transitional
 78    > purposes, and others are simply configured incorrectly, but these
 79    > cases can nonetheless be validated properly.  For maximum
 80    > compatibility, all implementations SHOULD be prepared to handle
 81    > potentially extraneous certificates and arbitrary orderings from any
 82    > TLS version, with the exception of the end-entity certificate which
 83    > MUST be first.
 84    """
 85    alpn: Optional[bytes] = None
 86    """The application-layer protocol as negotiated using
 87    [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation)."""
 88    alpn_offers: Sequence[bytes] = ()
 89    """The ALPN offers as sent in the ClientHello."""
 90    # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography
 91    cipher: Optional[str] = None
 92    """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`."""
 93    cipher_list: Sequence[str] = ()
 94    """Ciphers accepted by the proxy server on this connection."""
 95    tls_version: Optional[str] = None
 96    """The active TLS version."""
 97    sni: Optional[str] = None
 98    """
 99    The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello.
100    """
101
102    timestamp_start: Optional[float]
103    timestamp_end: Optional[float] = None
104    """*Timestamp:* Connection has been closed."""
105    timestamp_tls_setup: Optional[float] = None
106    """*Timestamp:* TLS handshake has been completed successfully."""
107
108    @property
109    def connected(self) -> bool:
110        """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise."""
111        return self.state is ConnectionState.OPEN
112
113    @property
114    def tls_established(self) -> bool:
115        """*Read-only:* `True` if TLS has been established, `False` otherwise."""
116        return self.timestamp_tls_setup is not None
117
118    def __eq__(self, other):
119        if isinstance(other, Connection):
120            return self.id == other.id
121        return False
122
123    def __hash__(self):
124        return hash(self.id)
125
126    def __repr__(self):
127        attrs = repr(
128            {
129                k: {
130                    "cipher_list": lambda: f"<{len(v)} ciphers>",
131                    "id": lambda: f"…{v[-6:]}",
132                }.get(k, lambda: v)()
133                for k, v in self.__dict__.items()
134            }
135        )
136        return f"{type(self).__name__}({attrs})"
137
138    @property
139    def alpn_proto_negotiated(self) -> Optional[bytes]:  # pragma: no cover
140        """*Deprecated:* An outdated alias for Connection.alpn."""
141        warnings.warn(
142            "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.",
143            DeprecationWarning,
144            stacklevel=2,
145        )
146        return self.alpn
147
148
class Client(Connection):
    """A connection between a client and mitmproxy."""

    peername: Address
    """The client's address."""
    sockname: Address
    """The local address we received this connection on."""

    mitmcert: Optional[certs.Cert] = None
    """The certificate used by mitmproxy to establish TLS with the client."""

    proxy_mode: mode_specs.ProxyMode
    """The proxy server type this client has been connecting to."""

    timestamp_start: float
    """*Timestamp:* TCP SYN received"""

    def __init__(
        self,
        peername: Address,
        sockname: Address,
        timestamp_start: float,
        *,
        transport_protocol: TransportProtocol = "tcp",
        proxy_mode: mode_specs.ProxyMode = mode_specs.ProxyMode.parse("regular"),
    ):
        # every new client connection gets a fresh id and starts out open.
        self.id = str(uuid.uuid4())
        self.state = ConnectionState.OPEN
        self.transport_protocol = transport_protocol
        self.proxy_mode = proxy_mode
        self.timestamp_start = timestamp_start
        self.peername = peername
        self.sockname = sockname

    def __str__(self):
        # Prefer the negotiated ALPN protocol, fall back to a plain TLS marker.
        tls_state = ""
        if self.alpn:
            tls_state = f", alpn={self.alpn.decode(errors='replace')}"
        elif self.tls_established:
            tls_state = ", tls"
        peer = human.format_address(self.peername)
        state_name = self.state.name.lower()
        return f"Client({peer}, state={state_name}{tls_state})"

    def get_state(self):
        # Important: Retain full compatibility with old proxy core for now!
        # This means we need to add all new fields to the old implementation.
        mitmcert_state = None if self.mitmcert is None else self.mitmcert.get_state()
        shared_fields = {
            "address": self.peername,
            "alpn": self.alpn,
            "cipher_name": self.cipher,
            "id": self.id,
            "mitmcert": mitmcert_state,
            "sni": self.sni,
            "timestamp_end": self.timestamp_end,
            "timestamp_start": self.timestamp_start,
            "timestamp_tls_setup": self.timestamp_tls_setup,
            "tls_established": self.tls_established,
            "tls_extensions": [],
            "tls_version": self.tls_version,
        }
        # only used in sans-io
        sans_io_fields = {
            "state": self.state.value,
            "sockname": self.sockname,
            "error": self.error,
            "tls": self.tls,
            "certificate_list": [cert.get_state() for cert in self.certificate_list],
            "alpn_offers": self.alpn_offers,
            "cipher_list": self.cipher_list,
            "proxy_mode": self.proxy_mode.get_state(),
        }
        return {**shared_fields, **sans_io_fields}

    @classmethod
    def from_state(cls, state) -> "Client":
        # the sockname passed to the constructor is only a placeholder, set_state() overwrites it.
        placeholder_sockname = ("mitmproxy", 8080)
        restored = Client(state["address"], placeholder_sockname, state["timestamp_start"])
        restored.set_state(state)
        return restored

    def set_state(self, state):
        def as_address(raw):
            # addresses are stored as tuples; anything falsy means "no address".
            return tuple(raw) if raw else None

        self.peername = as_address(state["address"])
        self.alpn = state["alpn"]
        self.cipher = state["cipher_name"]
        for field in (
            "id",
            "sni",
            "timestamp_end",
            "timestamp_start",
            "timestamp_tls_setup",
            "tls_version",
        ):
            setattr(self, field, state[field])
        # only used in sans-io
        self.state = ConnectionState(state["state"])
        self.sockname = as_address(state["sockname"])
        for field in ("error", "tls"):
            setattr(self, field, state[field])
        self.certificate_list = [
            certs.Cert.from_state(cert_state)
            for cert_state in state["certificate_list"]
        ]
        if state["mitmcert"] is not None:
            self.mitmcert = certs.Cert.from_state(state["mitmcert"])
        else:
            self.mitmcert = None
        for field in ("alpn_offers", "cipher_list"):
            setattr(self, field, state[field])
        self.proxy_mode = mode_specs.ProxyMode.from_state(state["proxy_mode"])

    @property
    def address(self):  # pragma: no cover
        """*Deprecated:* An outdated alias for Client.peername."""
        warnings.warn(
            "Client.address is deprecated, use Client.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.peername

    @address.setter
    def address(self, x):  # pragma: no cover
        warnings.warn(
            "Client.address is deprecated, use Client.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.peername = x

    @property
    def cipher_name(self) -> Optional[str]:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.cipher."""
        warnings.warn(
            "Client.cipher_name is deprecated, use Client.cipher instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.cipher

    @property
    def clientcert(self) -> Optional[certs.Cert]:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.certificate_list[0]."""
        warnings.warn(
            "Client.clientcert is deprecated, use Client.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.certificate_list[0] if self.certificate_list else None

    @clientcert.setter
    def clientcert(self, val):  # pragma: no cover
        warnings.warn(
            "Client.clientcert is deprecated, use Client.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.certificate_list = [val] if val else []
310
class Server(Connection):
    """A connection between mitmproxy and an upstream server."""

    peername: Optional[Address] = None
    """The server's resolved `(ip, port)` tuple. Will be set during connection establishment."""
    sockname: Optional[Address] = None
    """Our local `(ip, port)` tuple for this connection."""
    address: Optional[Address]
    """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address."""

    timestamp_start: Optional[float] = None
    """*Timestamp:* TCP SYN sent."""
    timestamp_tcp_setup: Optional[float] = None
    """*Timestamp:* TCP ACK received."""

    via: Optional[server_spec.ServerSpec] = None
    """An optional proxy server specification via which the connection should be established."""

    def __init__(
        self,
        address: Optional[Address],
        *,
        transport_protocol: TransportProtocol = "tcp",
    ):
        # a new server connection gets a fresh id and starts out closed.
        self.id = str(uuid.uuid4())
        self.address = address
        self.state = ConnectionState.CLOSED
        self.transport_protocol = transport_protocol

    def __str__(self):
        if self.alpn:
            tls_state = f", alpn={self.alpn.decode(errors='replace')}"
        elif self.tls_established:
            tls_state = ", tls"
        else:
            tls_state = ""
        if self.sockname:
            local_port = f", src_port={self.sockname[1]}"
        else:
            local_port = ""
        return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})"

    def __setattr__(self, name, value):
        # `address` and `via` must not change while the connection is open.
        if name in ("address", "via"):
            # `state` may not be assigned yet (e.g. during __init__), treat that as closed.
            connection_open = (
                self.__dict__.get("state", ConnectionState.CLOSED)
                is ConnectionState.OPEN
            )
            # assigning the current value is okay, that may be an artifact of calling .set_state().
            attr_changed = self.__dict__.get(name) != value
            if connection_open and attr_changed:
                raise RuntimeError(f"Cannot change server.{name} on open connection.")
        return super().__setattr__(name, value)

    def get_state(self):
        return {
            "address": self.address,
            "alpn": self.alpn,
            "id": self.id,
            "ip_address": self.peername,
            "sni": self.sni,
            "source_address": self.sockname,
            "timestamp_end": self.timestamp_end,
            "timestamp_start": self.timestamp_start,
            "timestamp_tcp_setup": self.timestamp_tcp_setup,
            "timestamp_tls_setup": self.timestamp_tls_setup,
            "tls_established": self.tls_established,
            "tls_version": self.tls_version,
            # always None here; the proxy specification is stored under "via2" below.
            "via": None,
            # only used in sans-io
            "state": self.state.value,
            "error": self.error,
            "tls": self.tls,
            "certificate_list": [x.get_state() for x in self.certificate_list],
            "alpn_offers": self.alpn_offers,
            "cipher_name": self.cipher,
            "cipher_list": self.cipher_list,
            "via2": self.via,
        }

    @classmethod
    def from_state(cls, state) -> "Server":
        # start from a closed connection without address, set_state() fills in everything else.
        server = Server(None)
        server.set_state(state)
        return server

    def set_state(self, state):
        self.address = tuple(state["address"]) if state["address"] else None
        self.alpn = state["alpn"]
        self.id = state["id"]
        self.peername = tuple(state["ip_address"]) if state["ip_address"] else None
        self.sni = state["sni"]
        self.sockname = (
            tuple(state["source_address"]) if state["source_address"] else None
        )
        self.timestamp_end = state["timestamp_end"]
        self.timestamp_start = state["timestamp_start"]
        self.timestamp_tcp_setup = state["timestamp_tcp_setup"]
        self.timestamp_tls_setup = state["timestamp_tls_setup"]
        self.tls_version = state["tls_version"]
        restored_state = ConnectionState(state["state"])
        if restored_state is ConnectionState.OPEN:
            # __setattr__ refuses to change `via` on an open connection. When the restored
            # connection is open, `via` therefore has to be assigned before `state`: otherwise
            # from_state() would raise a RuntimeError for every open connection with `via` set.
            self.via = state["via2"]
            self.state = restored_state
        else:
            self.state = restored_state
            self.via = state["via2"]
        self.error = state["error"]
        self.tls = state["tls"]
        self.certificate_list = [
            certs.Cert.from_state(x) for x in state["certificate_list"]
        ]
        self.alpn_offers = state["alpn_offers"]
        self.cipher = state["cipher_name"]
        self.cipher_list = state["cipher_list"]

    @property
    def ip_address(self) -> Optional[Address]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Server.peername`."""
        warnings.warn(
            "Server.ip_address is deprecated, use Server.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.peername

    @property
    def cert(self) -> Optional[certs.Cert]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`."""
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if self.certificate_list:
            return self.certificate_list[0]
        else:
            return None

    @cert.setter
    def cert(self, val):  # pragma: no cover
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if val:
            self.certificate_list = [val]
        else:
            self.certificate_list = []
455
456
457__all__ = ["Connection", "Client", "Server", "ConnectionState"]
class Connection(mitmproxy.coretypes.serializable.Serializable):
 34class Connection(serializable.Serializable, metaclass=ABCMeta):
 35    """
 36    Base class for client and server connections.
 37
 38    The connection object only exposes metadata about the connection, but not the underlying socket object.
 39    This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively.
 40    """
 41
 42    # all connections have a unique id. While
 43    # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity),
 44    # we also want these semantics for recorded flows.
 45    id: str
 46    """A unique UUID to identify the connection."""
 47    state: ConnectionState
 48    """The current connection state."""
 49    transport_protocol: TransportProtocol
 50    """The connection protocol in use."""
 51    peername: Optional[Address]
 52    """The remote's `(ip, port)` tuple for this connection."""
 53    sockname: Optional[Address]
 54    """Our local `(ip, port)` tuple for this connection."""
 55    error: Optional[str] = None
 56    """
 57    A string describing a general error with connections to this address.
 58
 59    The purpose of this property is to signal that new connections to the particular endpoint should not be attempted,
 60    for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error
 61    property. This property is only reused per client connection.
 62    """
 63
 64    tls: bool = False
 65    """
 66    `True` if TLS should be established, `False` otherwise.
 67    Note that this property only describes if a connection should eventually be protected using TLS.
 68    To check if TLS has already been established, use `Connection.tls_established`.
 69    """
 70    certificate_list: Sequence[certs.Cert] = ()
 71    """
 72    The TLS certificate list as sent by the peer.
 73    The first certificate is the end-entity certificate.
 74
 75    > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each
 76    > certificate to certify the one immediately preceding it; however,
 77    > some implementations allowed some flexibility.  Servers sometimes
 78    > send both a current and deprecated intermediate for transitional
 79    > purposes, and others are simply configured incorrectly, but these
 80    > cases can nonetheless be validated properly.  For maximum
 81    > compatibility, all implementations SHOULD be prepared to handle
 82    > potentially extraneous certificates and arbitrary orderings from any
 83    > TLS version, with the exception of the end-entity certificate which
 84    > MUST be first.
 85    """
 86    alpn: Optional[bytes] = None
 87    """The application-layer protocol as negotiated using
 88    [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation)."""
 89    alpn_offers: Sequence[bytes] = ()
 90    """The ALPN offers as sent in the ClientHello."""
 91    # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography
 92    cipher: Optional[str] = None
 93    """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`."""
 94    cipher_list: Sequence[str] = ()
 95    """Ciphers accepted by the proxy server on this connection."""
 96    tls_version: Optional[str] = None
 97    """The active TLS version."""
 98    sni: Optional[str] = None
 99    """
100    The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello.
101    """
102
103    timestamp_start: Optional[float]
104    timestamp_end: Optional[float] = None
105    """*Timestamp:* Connection has been closed."""
106    timestamp_tls_setup: Optional[float] = None
107    """*Timestamp:* TLS handshake has been completed successfully."""
108
109    @property
110    def connected(self) -> bool:
111        """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise."""
112        return self.state is ConnectionState.OPEN
113
114    @property
115    def tls_established(self) -> bool:
116        """*Read-only:* `True` if TLS has been established, `False` otherwise."""
117        return self.timestamp_tls_setup is not None
118
119    def __eq__(self, other):
120        if isinstance(other, Connection):
121            return self.id == other.id
122        return False
123
124    def __hash__(self):
125        return hash(self.id)
126
127    def __repr__(self):
128        attrs = repr(
129            {
130                k: {
131                    "cipher_list": lambda: f"<{len(v)} ciphers>",
132                    "id": lambda: f"…{v[-6:]}",
133                }.get(k, lambda: v)()
134                for k, v in self.__dict__.items()
135            }
136        )
137        return f"{type(self).__name__}({attrs})"
138
139    @property
140    def alpn_proto_negotiated(self) -> Optional[bytes]:  # pragma: no cover
141        """*Deprecated:* An outdated alias for Connection.alpn."""
142        warnings.warn(
143            "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.",
144            DeprecationWarning,
145            stacklevel=2,
146        )
147        return self.alpn

Base class for client and server connections.

The connection object only exposes metadata about the connection, but not the underlying socket object. This is intentional, all I/O should be handled by mitmproxy.proxy.server exclusively.

id: str

A unique UUID to identify the connection.

state: ConnectionState

The current connection state.

transport_protocol: Literal['tcp', 'udp']

The connection protocol in use.

peername: Optional[tuple[str, int]]

The remote's (ip, port) tuple for this connection.

sockname: Optional[tuple[str, int]]

Our local (ip, port) tuple for this connection.

error: Optional[str] = None

A string describing a general error with connections to this address.

The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error property. This property is only reused per client connection.

tls: bool = False

True if TLS should be established, False otherwise. Note that this property only describes if a connection should eventually be protected using TLS. To check if TLS has already been established, use Connection.tls_established.

certificate_list: collections.abc.Sequence[mitmproxy.certs.Cert] = ()

The TLS certificate list as sent by the peer. The first certificate is the end-entity certificate.

[RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each certificate to certify the one immediately preceding it; however, some implementations allowed some flexibility. Servers sometimes send both a current and deprecated intermediate for transitional purposes, and others are simply configured incorrectly, but these cases can nonetheless be validated properly. For maximum compatibility, all implementations SHOULD be prepared to handle potentially extraneous certificates and arbitrary orderings from any TLS version, with the exception of the end-entity certificate which MUST be first.

alpn: Optional[bytes] = None

The application-layer protocol as negotiated using ALPN.

alpn_offers: collections.abc.Sequence[bytes] = ()

The ALPN offers as sent in the ClientHello.

cipher: Optional[str] = None

The active cipher name as returned by OpenSSL's SSL_CIPHER_get_name.

cipher_list: collections.abc.Sequence[str] = ()

Ciphers accepted by the proxy server on this connection.

tls_version: Optional[str] = None

The active TLS version.

sni: Optional[str] = None

The Server Name Indication (SNI) sent in the ClientHello.

timestamp_end: Optional[float] = None

Timestamp: Connection has been closed.

timestamp_tls_setup: Optional[float] = None

Timestamp: TLS handshake has been completed successfully.

connected: bool

Read-only: True if Connection.state is ConnectionState.OPEN, False otherwise.

tls_established: bool

Read-only: True if TLS has been established, False otherwise.

alpn_proto_negotiated: Optional[bytes]

Deprecated: An outdated alias for Connection.alpn.

Inherited Members
mitmproxy.coretypes.serializable.Serializable
copy
class Client(Connection):
class Client(Connection):
    """A connection between a client and mitmproxy."""

    peername: Address
    """The client's address."""
    sockname: Address
    """The local address we received this connection on."""

    mitmcert: Optional[certs.Cert] = None
    """The certificate used by mitmproxy to establish TLS with the client."""

    proxy_mode: mode_specs.ProxyMode
    """The proxy server type this client has been connecting to."""

    timestamp_start: float
    """*Timestamp:* TCP SYN received"""

    def __init__(
        self,
        peername: Address,
        sockname: Address,
        timestamp_start: float,
        *,
        transport_protocol: TransportProtocol = "tcp",
        proxy_mode: mode_specs.ProxyMode = mode_specs.ProxyMode.parse("regular"),
    ):
        # every new client connection gets a fresh id and starts out open.
        self.id = str(uuid.uuid4())
        self.state = ConnectionState.OPEN
        self.transport_protocol = transport_protocol
        self.proxy_mode = proxy_mode
        self.timestamp_start = timestamp_start
        self.peername = peername
        self.sockname = sockname

    def __str__(self):
        # Prefer the negotiated ALPN protocol, fall back to a plain TLS marker.
        tls_state = ""
        if self.alpn:
            tls_state = f", alpn={self.alpn.decode(errors='replace')}"
        elif self.tls_established:
            tls_state = ", tls"
        peer = human.format_address(self.peername)
        state_name = self.state.name.lower()
        return f"Client({peer}, state={state_name}{tls_state})"

    def get_state(self):
        # Important: Retain full compatibility with old proxy core for now!
        # This means we need to add all new fields to the old implementation.
        mitmcert_state = None if self.mitmcert is None else self.mitmcert.get_state()
        shared_fields = {
            "address": self.peername,
            "alpn": self.alpn,
            "cipher_name": self.cipher,
            "id": self.id,
            "mitmcert": mitmcert_state,
            "sni": self.sni,
            "timestamp_end": self.timestamp_end,
            "timestamp_start": self.timestamp_start,
            "timestamp_tls_setup": self.timestamp_tls_setup,
            "tls_established": self.tls_established,
            "tls_extensions": [],
            "tls_version": self.tls_version,
        }
        # only used in sans-io
        sans_io_fields = {
            "state": self.state.value,
            "sockname": self.sockname,
            "error": self.error,
            "tls": self.tls,
            "certificate_list": [cert.get_state() for cert in self.certificate_list],
            "alpn_offers": self.alpn_offers,
            "cipher_list": self.cipher_list,
            "proxy_mode": self.proxy_mode.get_state(),
        }
        return {**shared_fields, **sans_io_fields}

    @classmethod
    def from_state(cls, state) -> "Client":
        # the sockname passed to the constructor is only a placeholder, set_state() overwrites it.
        placeholder_sockname = ("mitmproxy", 8080)
        restored = Client(state["address"], placeholder_sockname, state["timestamp_start"])
        restored.set_state(state)
        return restored

    def set_state(self, state):
        def as_address(raw):
            # addresses are stored as tuples; anything falsy means "no address".
            return tuple(raw) if raw else None

        self.peername = as_address(state["address"])
        self.alpn = state["alpn"]
        self.cipher = state["cipher_name"]
        for field in (
            "id",
            "sni",
            "timestamp_end",
            "timestamp_start",
            "timestamp_tls_setup",
            "tls_version",
        ):
            setattr(self, field, state[field])
        # only used in sans-io
        self.state = ConnectionState(state["state"])
        self.sockname = as_address(state["sockname"])
        for field in ("error", "tls"):
            setattr(self, field, state[field])
        self.certificate_list = [
            certs.Cert.from_state(cert_state)
            for cert_state in state["certificate_list"]
        ]
        if state["mitmcert"] is not None:
            self.mitmcert = certs.Cert.from_state(state["mitmcert"])
        else:
            self.mitmcert = None
        for field in ("alpn_offers", "cipher_list"):
            setattr(self, field, state[field])
        self.proxy_mode = mode_specs.ProxyMode.from_state(state["proxy_mode"])

    @property
    def address(self):  # pragma: no cover
        """*Deprecated:* An outdated alias for Client.peername."""
        warnings.warn(
            "Client.address is deprecated, use Client.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.peername

    @address.setter
    def address(self, x):  # pragma: no cover
        warnings.warn(
            "Client.address is deprecated, use Client.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.peername = x

    @property
    def cipher_name(self) -> Optional[str]:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.cipher."""
        warnings.warn(
            "Client.cipher_name is deprecated, use Client.cipher instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.cipher

    @property
    def clientcert(self) -> Optional[certs.Cert]:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.certificate_list[0]."""
        warnings.warn(
            "Client.clientcert is deprecated, use Client.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.certificate_list[0] if self.certificate_list else None

    @clientcert.setter
    def clientcert(self, val):  # pragma: no cover
        warnings.warn(
            "Client.clientcert is deprecated, use Client.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.certificate_list = [val] if val else []

A connection between a client and mitmproxy.

Client( peername: tuple[str, int], sockname: tuple[str, int], timestamp_start: float, *, transport_protocol: Literal['tcp', 'udp'] = 'tcp', proxy_mode: mitmproxy.proxy.mode_specs.ProxyMode = ProxyMode.parse('regular'))
169    def __init__(
170        self,
171        peername: Address,
172        sockname: Address,
173        timestamp_start: float,
174        *,
175        transport_protocol: TransportProtocol = "tcp",
176        proxy_mode: mode_specs.ProxyMode = mode_specs.ProxyMode.parse("regular"),
177    ):
178        self.id = str(uuid.uuid4())
179        self.peername = peername
180        self.sockname = sockname
181        self.timestamp_start = timestamp_start
182        self.state = ConnectionState.OPEN
183        self.transport_protocol = transport_protocol
184        self.proxy_mode = proxy_mode
peername: tuple[str, int]

The client's address.

sockname: tuple[str, int]

The local address we received this connection on.

mitmcert: Optional[mitmproxy.certs.Cert] = None

The certificate used by mitmproxy to establish TLS with the client.

proxy_mode: mitmproxy.proxy.mode_specs.ProxyMode

The proxy server type this client has been connecting to.

timestamp_start: float

Timestamp: TCP SYN received

address

Deprecated: An outdated alias for Client.peername.

cipher_name: Optional[str]

Deprecated: An outdated alias for Connection.cipher.

clientcert: Optional[mitmproxy.certs.Cert]

Deprecated: An outdated alias for Connection.certificate_list[0].

class Server(Connection):
class Server(Connection):
    """A connection between mitmproxy and an upstream server."""

    peername: Optional[Address] = None
    """The server's resolved `(ip, port)` tuple. Will be set during connection establishment."""
    sockname: Optional[Address] = None
    """Our local `(ip, port)` tuple for this connection."""
    address: Optional[Address]
    """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address."""

    timestamp_start: Optional[float] = None
    """*Timestamp:* TCP SYN sent."""
    timestamp_tcp_setup: Optional[float] = None
    """*Timestamp:* TCP ACK received."""

    via: Optional[server_spec.ServerSpec] = None
    """An optional proxy server specification via which the connection should be established."""

    def __init__(
        self,
        address: Optional[Address],
        *,
        transport_protocol: TransportProtocol = "tcp",
    ):
        """
        Create a server connection record in the `CLOSED` state with a fresh UUID.

        - `address`: the server's `(host, port)` tuple, or `None` if not known yet.
        - `transport_protocol`: `"tcp"` (default) or `"udp"`.
        """
        self.id = str(uuid.uuid4())
        self.address = address
        self.state = ConnectionState.CLOSED
        self.transport_protocol = transport_protocol

    def __str__(self):
        """Return a short description: address, lowercase state name, TLS/ALPN info and local port."""
        # ALPN takes precedence over the plain "tls" marker.
        if self.alpn:
            tls_state = f", alpn={self.alpn.decode(errors='replace')}"
        elif self.tls_established:
            tls_state = ", tls"
        else:
            tls_state = ""
        # The local port is only shown once a local socket address is known.
        if self.sockname:
            local_port = f", src_port={self.sockname[1]}"
        else:
            local_port = ""
        return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})"

    def __setattr__(self, name, value):
        """
        Set an attribute, refusing to change `address` or `via` while the connection is open.

        The guard only triggers when the stored state is exactly `ConnectionState.OPEN`
        and the new value differs from the stored one.

        Raises:
            RuntimeError: if `address` or `via` would change on an open connection.
        """
        if name in ("address", "via"):
            # `state` may not have been assigned yet (e.g. during __init__); treat that as closed.
            connection_open = (
                self.__dict__.get("state", ConnectionState.CLOSED)
                is ConnectionState.OPEN
            )
            # assigning the current value is okay, that may be an artifact of calling .set_state().
            attr_changed = self.__dict__.get(name) != value
            if connection_open and attr_changed:
                raise RuntimeError(f"Cannot change server.{name} on open connection.")
        return super().__setattr__(name, value)

    def get_state(self):
        """
        Return this connection's attributes as a dict, the counterpart of `set_state`.

        Some keys differ from the attribute names: `ip_address` holds `peername`,
        `source_address` holds `sockname`, `cipher_name` holds `cipher` and
        `via2` holds `via`. The `via` key is always emitted as `None`.
        """
        return {
            "address": self.address,
            "alpn": self.alpn,
            "id": self.id,
            "ip_address": self.peername,
            "sni": self.sni,
            "source_address": self.sockname,
            "timestamp_end": self.timestamp_end,
            "timestamp_start": self.timestamp_start,
            "timestamp_tcp_setup": self.timestamp_tcp_setup,
            "timestamp_tls_setup": self.timestamp_tls_setup,
            "tls_established": self.tls_established,
            "tls_version": self.tls_version,
            "via": None,
            # only used in sans-io
            "state": self.state.value,
            "error": self.error,
            "tls": self.tls,
            "certificate_list": [x.get_state() for x in self.certificate_list],
            "alpn_offers": self.alpn_offers,
            "cipher_name": self.cipher,
            "cipher_list": self.cipher_list,
            "via2": self.via,
        }

    @classmethod
    def from_state(cls, state) -> "Server":
        """Build a new `Server` from a dict produced by `get_state`."""
        server = Server(None)
        server.set_state(state)
        return server

    def set_state(self, state):
        """
        Overwrite this connection's attributes from a dict produced by `get_state`.

        The `via` and `tls_established` keys of `state` are not read.

        Raises:
            RuntimeError: if this connection is currently open and `state`
                carries a different `address` or `via` (see `__setattr__`).
        """
        # `address` and `via` are guarded by __setattr__, so both are restored
        # *before* `state`. Otherwise restoring a snapshot of an open connection
        # would flip the state to OPEN first and the guard would then reject the
        # snapshot's own `via` as a change on an open connection.
        self.address = tuple(state["address"]) if state["address"] else None
        self.via = state["via2"]
        self.alpn = state["alpn"]
        self.id = state["id"]
        self.peername = tuple(state["ip_address"]) if state["ip_address"] else None
        self.sni = state["sni"]
        self.sockname = (
            tuple(state["source_address"]) if state["source_address"] else None
        )
        self.timestamp_end = state["timestamp_end"]
        self.timestamp_start = state["timestamp_start"]
        self.timestamp_tcp_setup = state["timestamp_tcp_setup"]
        self.timestamp_tls_setup = state["timestamp_tls_setup"]
        self.tls_version = state["tls_version"]
        self.state = ConnectionState(state["state"])
        self.error = state["error"]
        self.tls = state["tls"]
        self.certificate_list = [
            certs.Cert.from_state(x) for x in state["certificate_list"]
        ]
        self.alpn_offers = state["alpn_offers"]
        self.cipher = state["cipher_name"]
        self.cipher_list = state["cipher_list"]

    @property
    def ip_address(self) -> Optional[Address]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Server.peername`."""
        warnings.warn(
            "Server.ip_address is deprecated, use Server.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.peername

    @property
    def cert(self) -> Optional[certs.Cert]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`."""
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if self.certificate_list:
            return self.certificate_list[0]
        else:
            return None

    @cert.setter
    def cert(self, val):  # pragma: no cover
        # Deprecated write access: a truthy value becomes the only entry of
        # `certificate_list`, anything falsy clears the list.
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if val:
            self.certificate_list = [val]
        else:
            self.certificate_list = []

A connection between mitmproxy and an upstream server.

Server(address: Optional[tuple[str, int]], *, transport_protocol: Literal['tcp', 'udp'] = 'tcp')
329    def __init__(
330        self,
331        address: Optional[Address],
332        *,
333        transport_protocol: TransportProtocol = "tcp",
334    ):
335        self.id = str(uuid.uuid4())
336        self.address = address
337        self.state = ConnectionState.CLOSED
338        self.transport_protocol = transport_protocol
peername: Optional[tuple[str, int]] = None

The server's resolved (ip, port) tuple. Will be set during connection establishment.

sockname: Optional[tuple[str, int]] = None

Our local (ip, port) tuple for this connection.

address: Optional[tuple[str, int]]

The server's (host, port) address tuple. The host can either be a domain or a plain IP address.

timestamp_start: Optional[float] = None

Timestamp: TCP SYN sent.

timestamp_tcp_setup: Optional[float] = None

Timestamp: TCP ACK received.

via: Optional[tuple[Literal['http', 'https', 'tls', 'dtls', 'tcp', 'udp', 'dns'], tuple[str, int]]] = None

An optional proxy server specification via which the connection should be established.

ip_address: Optional[tuple[str, int]]

Deprecated: An outdated alias for Server.peername.

cert: Optional[mitmproxy.certs.Cert]

Deprecated: An outdated alias for Connection.certificate_list[0].

class ConnectionState(enum.Flag):
class ConnectionState(Flag):
    """The current state of the underlying socket."""

    # No direction is available.
    CLOSED = 0
    # One bit per direction.
    CAN_READ = 1 << 0
    CAN_WRITE = 1 << 1
    # Both directions together.
    OPEN = CAN_READ | CAN_WRITE

The current state of the underlying socket.

Inherited Members
enum.Enum
name
value