mitmproxy.connection
1import uuid 2import warnings 3from abc import ABCMeta 4from collections.abc import Sequence 5from enum import Flag 6from typing import Literal, Optional 7 8from mitmproxy import certs 9from mitmproxy.coretypes import serializable 10from mitmproxy.proxy import mode_specs 11from mitmproxy.net import server_spec 12from mitmproxy.utils import human 13 14 15class ConnectionState(Flag): 16 """The current state of the underlying socket.""" 17 18 CLOSED = 0 19 CAN_READ = 1 20 CAN_WRITE = 2 21 OPEN = CAN_READ | CAN_WRITE 22 23 24TransportProtocol = Literal["tcp", "udp"] 25 26 27# practically speaking we may have IPv6 addresses with flowinfo and scope_id, 28# but type checking isn't good enough to properly handle tuple unions. 29# this version at least provides useful type checking messages. 30Address = tuple[str, int] 31 32 33class Connection(serializable.Serializable, metaclass=ABCMeta): 34 """ 35 Base class for client and server connections. 36 37 The connection object only exposes metadata about the connection, but not the underlying socket object. 38 This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively. 39 """ 40 41 # all connections have a unique id. While 42 # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity), 43 # we also want these semantics for recorded flows. 44 id: str 45 """A unique UUID to identify the connection.""" 46 state: ConnectionState 47 """The current connection state.""" 48 transport_protocol: TransportProtocol 49 """The connection protocol in use.""" 50 peername: Optional[Address] 51 """The remote's `(ip, port)` tuple for this connection.""" 52 sockname: Optional[Address] 53 """Our local `(ip, port)` tuple for this connection.""" 54 error: Optional[str] = None 55 """ 56 A string describing a general error with connections to this address. 57 58 The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, 59 for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error 60 property. This property is only reused per client connection. 61 """ 62 63 tls: bool = False 64 """ 65 `True` if TLS should be established, `False` otherwise. 66 Note that this property only describes if a connection should eventually be protected using TLS. 67 To check if TLS has already been established, use `Connection.tls_established`. 68 """ 69 certificate_list: Sequence[certs.Cert] = () 70 """ 71 The TLS certificate list as sent by the peer. 72 The first certificate is the end-entity certificate. 73 74 > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each 75 > certificate to certify the one immediately preceding it; however, 76 > some implementations allowed some flexibility. Servers sometimes 77 > send both a current and deprecated intermediate for transitional 78 > purposes, and others are simply configured incorrectly, but these 79 > cases can nonetheless be validated properly. For maximum 80 > compatibility, all implementations SHOULD be prepared to handle 81 > potentially extraneous certificates and arbitrary orderings from any 82 > TLS version, with the exception of the end-entity certificate which 83 > MUST be first. 84 """ 85 alpn: Optional[bytes] = None 86 """The application-layer protocol as negotiated using 87 [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation).""" 88 alpn_offers: Sequence[bytes] = () 89 """The ALPN offers as sent in the ClientHello.""" 90 # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography 91 cipher: Optional[str] = None 92 """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`.""" 93 cipher_list: Sequence[str] = () 94 """Ciphers accepted by the proxy server on this connection.""" 95 tls_version: Optional[str] = None 96 """The active TLS version.""" 97 sni: Optional[str] = None 98 """ 99 The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello. 100 """ 101 102 timestamp_start: Optional[float] 103 timestamp_end: Optional[float] = None 104 """*Timestamp:* Connection has been closed.""" 105 timestamp_tls_setup: Optional[float] = None 106 """*Timestamp:* TLS handshake has been completed successfully.""" 107 108 @property 109 def connected(self) -> bool: 110 """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise.""" 111 return self.state is ConnectionState.OPEN 112 113 @property 114 def tls_established(self) -> bool: 115 """*Read-only:* `True` if TLS has been established, `False` otherwise.""" 116 return self.timestamp_tls_setup is not None 117 118 def __eq__(self, other): 119 if isinstance(other, Connection): 120 return self.id == other.id 121 return False 122 123 def __hash__(self): 124 return hash(self.id) 125 126 def __repr__(self): 127 attrs = repr( 128 { 129 k: { 130 "cipher_list": lambda: f"<{len(v)} ciphers>", 131 "id": lambda: f"…{v[-6:]}", 132 }.get(k, lambda: v)() 133 for k, v in self.__dict__.items() 134 } 135 ) 136 return f"{type(self).__name__}({attrs})" 137 138 @property 139 def alpn_proto_negotiated(self) -> Optional[bytes]: # pragma: no cover 140 """*Deprecated:* An outdated alias for Connection.alpn.""" 141 warnings.warn( 142 "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.", 143 DeprecationWarning, 144 stacklevel=2, 145 ) 146 return self.alpn 147 148 149class Client(Connection): 150 """A connection between a client and mitmproxy.""" 151 152 peername: Address 153 """The client's address.""" 154 sockname: Address 155 """The local address we received this connection on.""" 156 157 mitmcert: Optional[certs.Cert] = None 158 """ 159 The certificate used by mitmproxy to establish TLS with the client. 160 """ 161 162 proxy_mode: mode_specs.ProxyMode 163 """The proxy server type this client has been connecting to.""" 164 165 timestamp_start: float 166 """*Timestamp:* TCP SYN received""" 167 168 def __init__( 169 self, 170 peername: Address, 171 sockname: Address, 172 timestamp_start: float, 173 *, 174 transport_protocol: TransportProtocol = "tcp", 175 proxy_mode: mode_specs.ProxyMode = mode_specs.ProxyMode.parse("regular"), 176 ): 177 self.id = str(uuid.uuid4()) 178 self.peername = peername 179 self.sockname = sockname 180 self.timestamp_start = timestamp_start 181 self.state = ConnectionState.OPEN 182 self.transport_protocol = transport_protocol 183 self.proxy_mode = proxy_mode 184 185 def __str__(self): 186 if self.alpn: 187 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 188 elif self.tls_established: 189 tls_state = ", tls" 190 else: 191 tls_state = "" 192 return f"Client({human.format_address(self.peername)}, state={self.state.name.lower()}{tls_state})" 193 194 def get_state(self): 195 # Important: Retain full compatibility with old proxy core for now! 196 # This means we need to add all new fields to the old implementation. 197 return { 198 "address": self.peername, 199 "alpn": self.alpn, 200 "cipher_name": self.cipher, 201 "id": self.id, 202 "mitmcert": self.mitmcert.get_state() 203 if self.mitmcert is not None 204 else None, 205 "sni": self.sni, 206 "timestamp_end": self.timestamp_end, 207 "timestamp_start": self.timestamp_start, 208 "timestamp_tls_setup": self.timestamp_tls_setup, 209 "tls_established": self.tls_established, 210 "tls_extensions": [], 211 "tls_version": self.tls_version, 212 # only used in sans-io 213 "state": self.state.value, 214 "sockname": self.sockname, 215 "error": self.error, 216 "tls": self.tls, 217 "certificate_list": [x.get_state() for x in self.certificate_list], 218 "alpn_offers": self.alpn_offers, 219 "cipher_list": self.cipher_list, 220 "proxy_mode": self.proxy_mode.get_state(), 221 } 222 223 @classmethod 224 def from_state(cls, state) -> "Client": 225 client = Client(state["address"], ("mitmproxy", 8080), state["timestamp_start"]) 226 client.set_state(state) 227 return client 228 229 def set_state(self, state): 230 self.peername = tuple(state["address"]) if state["address"] else None 231 self.alpn = state["alpn"] 232 self.cipher = state["cipher_name"] 233 self.id = state["id"] 234 self.sni = state["sni"] 235 self.timestamp_end = state["timestamp_end"] 236 self.timestamp_start = state["timestamp_start"] 237 self.timestamp_tls_setup = state["timestamp_tls_setup"] 238 self.tls_version = state["tls_version"] 239 # only used in sans-io 240 self.state = ConnectionState(state["state"]) 241 self.sockname = tuple(state["sockname"]) if state["sockname"] else None 242 self.error = state["error"] 243 self.tls = state["tls"] 244 self.certificate_list = [ 245 certs.Cert.from_state(x) for x in state["certificate_list"] 246 ] 247 self.mitmcert = ( 248 certs.Cert.from_state(state["mitmcert"]) 249 if state["mitmcert"] is not None 250 else None 251 ) 252 self.alpn_offers = state["alpn_offers"] 253 self.cipher_list = state["cipher_list"] 254 self.proxy_mode = mode_specs.ProxyMode.from_state(state["proxy_mode"]) 255 256 @property 257 def address(self): # pragma: no cover 258 """*Deprecated:* An outdated alias for Client.peername.""" 259 warnings.warn( 260 "Client.address is deprecated, use Client.peername instead.", 261 DeprecationWarning, 262 stacklevel=2, 263 ) 264 return self.peername 265 266 @address.setter 267 def address(self, x): # pragma: no cover 268 warnings.warn( 269 "Client.address is deprecated, use Client.peername instead.", 270 DeprecationWarning, 271 stacklevel=2, 272 ) 273 self.peername = x 274 275 @property 276 def cipher_name(self) -> Optional[str]: # pragma: no cover 277 """*Deprecated:* An outdated alias for Connection.cipher.""" 278 warnings.warn( 279 "Client.cipher_name is deprecated, use Client.cipher instead.", 280 DeprecationWarning, 281 stacklevel=2, 282 ) 283 return self.cipher 284 285 @property 286 def clientcert(self) -> Optional[certs.Cert]: # pragma: no cover 287 """*Deprecated:* An outdated alias for Connection.certificate_list[0].""" 288 warnings.warn( 289 "Client.clientcert is deprecated, use Client.certificate_list instead.", 290 DeprecationWarning, 291 stacklevel=2, 292 ) 293 if self.certificate_list: 294 return self.certificate_list[0] 295 else: 296 return None 297 298 @clientcert.setter 299 def clientcert(self, val): # pragma: no cover 300 warnings.warn( 301 "Client.clientcert is deprecated, use Client.certificate_list instead.", 302 DeprecationWarning, 303 stacklevel=2, 304 ) 305 if val: 306 self.certificate_list = [val] 307 else: 308 self.certificate_list = [] 309 310 311class Server(Connection): 312 """A connection between mitmproxy and an upstream server.""" 313 314 peername: Optional[Address] = None 315 """The server's resolved `(ip, port)` tuple. Will be set during connection establishment.""" 316 sockname: Optional[Address] = None 317 address: Optional[Address] 318 """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address.""" 319 320 timestamp_start: Optional[float] = None 321 """*Timestamp:* TCP SYN sent.""" 322 timestamp_tcp_setup: Optional[float] = None 323 """*Timestamp:* TCP ACK received.""" 324 325 via: Optional[server_spec.ServerSpec] = None 326 """An optional proxy server specification via which the connection should be established.""" 327 328 def __init__( 329 self, 330 address: Optional[Address], 331 *, 332 transport_protocol: TransportProtocol = "tcp", 333 ): 334 self.id = str(uuid.uuid4()) 335 self.address = address 336 self.state = ConnectionState.CLOSED 337 self.transport_protocol = transport_protocol 338 339 def __str__(self): 340 if self.alpn: 341 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 342 elif self.tls_established: 343 tls_state = ", tls" 344 else: 345 tls_state = "" 346 if self.sockname: 347 local_port = f", src_port={self.sockname[1]}" 348 else: 349 local_port = "" 350 return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})" 351 352 def __setattr__(self, name, value): 353 if name in ("address", "via"): 354 connection_open = ( 355 self.__dict__.get("state", ConnectionState.CLOSED) 356 is ConnectionState.OPEN 357 ) 358 # assigning the current value is okay, that may be an artifact of calling .set_state(). 359 attr_changed = self.__dict__.get(name) != value 360 if connection_open and attr_changed: 361 raise RuntimeError(f"Cannot change server.{name} on open connection.") 362 return super().__setattr__(name, value) 363 364 def get_state(self): 365 return { 366 "address": self.address, 367 "alpn": self.alpn, 368 "id": self.id, 369 "ip_address": self.peername, 370 "sni": self.sni, 371 "source_address": self.sockname, 372 "timestamp_end": self.timestamp_end, 373 "timestamp_start": self.timestamp_start, 374 "timestamp_tcp_setup": self.timestamp_tcp_setup, 375 "timestamp_tls_setup": self.timestamp_tls_setup, 376 "tls_established": self.tls_established, 377 "tls_version": self.tls_version, 378 "via": None, 379 # only used in sans-io 380 "state": self.state.value, 381 "error": self.error, 382 "tls": self.tls, 383 "certificate_list": [x.get_state() for x in self.certificate_list], 384 "alpn_offers": self.alpn_offers, 385 "cipher_name": self.cipher, 386 "cipher_list": self.cipher_list, 387 "via2": self.via, 388 } 389 390 @classmethod 391 def from_state(cls, state) -> "Server": 392 server = Server(None) 393 server.set_state(state) 394 return server 395 396 def set_state(self, state): 397 self.address = tuple(state["address"]) if state["address"] else None 398 self.alpn = state["alpn"] 399 self.id = state["id"] 400 self.peername = tuple(state["ip_address"]) if state["ip_address"] else None 401 self.sni = state["sni"] 402 self.sockname = ( 403 tuple(state["source_address"]) if state["source_address"] else None 404 ) 405 self.timestamp_end = state["timestamp_end"] 406 self.timestamp_start = state["timestamp_start"] 407 self.timestamp_tcp_setup = state["timestamp_tcp_setup"] 408 self.timestamp_tls_setup = state["timestamp_tls_setup"] 409 self.tls_version = state["tls_version"] 410 self.state = ConnectionState(state["state"]) 411 self.error = state["error"] 412 self.tls = state["tls"] 413 self.certificate_list = [ 414 certs.Cert.from_state(x) for x in state["certificate_list"] 415 ] 416 self.alpn_offers = state["alpn_offers"] 417 self.cipher = state["cipher_name"] 418 self.cipher_list = state["cipher_list"] 419 self.via = state["via2"] 420 421 @property 422 def ip_address(self) -> Optional[Address]: # pragma: no cover 423 """*Deprecated:* An outdated alias for `Server.peername`.""" 424 warnings.warn( 425 "Server.ip_address is deprecated, use Server.peername instead.", 426 DeprecationWarning, 427 stacklevel=2, 428 ) 429 return self.peername 430 431 @property 432 def cert(self) -> Optional[certs.Cert]: # pragma: no cover 433 """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`.""" 434 warnings.warn( 435 "Server.cert is deprecated, use Server.certificate_list instead.", 436 DeprecationWarning, 437 stacklevel=2, 438 ) 439 if self.certificate_list: 440 return self.certificate_list[0] 441 else: 442 return None 443 444 @cert.setter 445 def cert(self, val): # pragma: no cover 446 warnings.warn( 447 "Server.cert is deprecated, use Server.certificate_list instead.", 448 DeprecationWarning, 449 stacklevel=2, 450 ) 451 if val: 452 self.certificate_list = [val] 453 else: 454 self.certificate_list = [] 455 456 457__all__ = ["Connection", "Client", "Server", "ConnectionState"]
34class Connection(serializable.Serializable, metaclass=ABCMeta): 35 """ 36 Base class for client and server connections. 37 38 The connection object only exposes metadata about the connection, but not the underlying socket object. 39 This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively. 40 """ 41 42 # all connections have a unique id. While 43 # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity), 44 # we also want these semantics for recorded flows. 45 id: str 46 """A unique UUID to identify the connection.""" 47 state: ConnectionState 48 """The current connection state.""" 49 transport_protocol: TransportProtocol 50 """The connection protocol in use.""" 51 peername: Optional[Address] 52 """The remote's `(ip, port)` tuple for this connection.""" 53 sockname: Optional[Address] 54 """Our local `(ip, port)` tuple for this connection.""" 55 error: Optional[str] = None 56 """ 57 A string describing a general error with connections to this address. 58 59 The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, 60 for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error 61 property. This property is only reused per client connection. 62 """ 63 64 tls: bool = False 65 """ 66 `True` if TLS should be established, `False` otherwise. 67 Note that this property only describes if a connection should eventually be protected using TLS. 68 To check if TLS has already been established, use `Connection.tls_established`. 69 """ 70 certificate_list: Sequence[certs.Cert] = () 71 """ 72 The TLS certificate list as sent by the peer. 73 The first certificate is the end-entity certificate. 74 75 > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each 76 > certificate to certify the one immediately preceding it; however, 77 > some implementations allowed some flexibility. Servers sometimes 78 > send both a current and deprecated intermediate for transitional 79 > purposes, and others are simply configured incorrectly, but these 80 > cases can nonetheless be validated properly. For maximum 81 > compatibility, all implementations SHOULD be prepared to handle 82 > potentially extraneous certificates and arbitrary orderings from any 83 > TLS version, with the exception of the end-entity certificate which 84 > MUST be first. 85 """ 86 alpn: Optional[bytes] = None 87 """The application-layer protocol as negotiated using 88 [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation).""" 89 alpn_offers: Sequence[bytes] = () 90 """The ALPN offers as sent in the ClientHello.""" 91 # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography 92 cipher: Optional[str] = None 93 """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`.""" 94 cipher_list: Sequence[str] = () 95 """Ciphers accepted by the proxy server on this connection.""" 96 tls_version: Optional[str] = None 97 """The active TLS version.""" 98 sni: Optional[str] = None 99 """ 100 The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello. 101 """ 102 103 timestamp_start: Optional[float] 104 timestamp_end: Optional[float] = None 105 """*Timestamp:* Connection has been closed.""" 106 timestamp_tls_setup: Optional[float] = None 107 """*Timestamp:* TLS handshake has been completed successfully.""" 108 109 @property 110 def connected(self) -> bool: 111 """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise.""" 112 return self.state is ConnectionState.OPEN 113 114 @property 115 def tls_established(self) -> bool: 116 """*Read-only:* `True` if TLS has been established, `False` otherwise.""" 117 return self.timestamp_tls_setup is not None 118 119 def __eq__(self, other): 120 if isinstance(other, Connection): 121 return self.id == other.id 122 return False 123 124 def __hash__(self): 125 return hash(self.id) 126 127 def __repr__(self): 128 attrs = repr( 129 { 130 k: { 131 "cipher_list": lambda: f"<{len(v)} ciphers>", 132 "id": lambda: f"…{v[-6:]}", 133 }.get(k, lambda: v)() 134 for k, v in self.__dict__.items() 135 } 136 ) 137 return f"{type(self).__name__}({attrs})" 138 139 @property 140 def alpn_proto_negotiated(self) -> Optional[bytes]: # pragma: no cover 141 """*Deprecated:* An outdated alias for Connection.alpn.""" 142 warnings.warn( 143 "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.", 144 DeprecationWarning, 145 stacklevel=2, 146 ) 147 return self.alpn
Base class for client and server connections.
The connection object only exposes metadata about the connection, but not the underlying socket object.
This is intentional, all I/O should be handled by mitmproxy.proxy.server
exclusively.
A string describing a general error with connections to this address.
The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error property. This property is only reused per client connection.
True
if TLS should be established, False
otherwise.
Note that this property only describes if a connection should eventually be protected using TLS.
To check if TLS has already been established, use Connection.tls_established
.
The TLS certificate list as sent by the peer. The first certificate is the end-entity certificate.
[RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each certificate to certify the one immediately preceding it; however, some implementations allowed some flexibility. Servers sometimes send both a current and deprecated intermediate for transitional purposes, and others are simply configured incorrectly, but these cases can nonetheless be validated properly. For maximum compatibility, all implementations SHOULD be prepared to handle potentially extraneous certificates and arbitrary orderings from any TLS version, with the exception of the end-entity certificate which MUST be first.
Ciphers accepted by the proxy server on this connection.
Timestamp: TLS handshake has been completed successfully.
Inherited Members
- mitmproxy.coretypes.serializable.Serializable
- copy
150class Client(Connection): 151 """A connection between a client and mitmproxy.""" 152 153 peername: Address 154 """The client's address.""" 155 sockname: Address 156 """The local address we received this connection on.""" 157 158 mitmcert: Optional[certs.Cert] = None 159 """ 160 The certificate used by mitmproxy to establish TLS with the client. 161 """ 162 163 proxy_mode: mode_specs.ProxyMode 164 """The proxy server type this client has been connecting to.""" 165 166 timestamp_start: float 167 """*Timestamp:* TCP SYN received""" 168 169 def __init__( 170 self, 171 peername: Address, 172 sockname: Address, 173 timestamp_start: float, 174 *, 175 transport_protocol: TransportProtocol = "tcp", 176 proxy_mode: mode_specs.ProxyMode = mode_specs.ProxyMode.parse("regular"), 177 ): 178 self.id = str(uuid.uuid4()) 179 self.peername = peername 180 self.sockname = sockname 181 self.timestamp_start = timestamp_start 182 self.state = ConnectionState.OPEN 183 self.transport_protocol = transport_protocol 184 self.proxy_mode = proxy_mode 185 186 def __str__(self): 187 if self.alpn: 188 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 189 elif self.tls_established: 190 tls_state = ", tls" 191 else: 192 tls_state = "" 193 return f"Client({human.format_address(self.peername)}, state={self.state.name.lower()}{tls_state})" 194 195 def get_state(self): 196 # Important: Retain full compatibility with old proxy core for now! 197 # This means we need to add all new fields to the old implementation. 198 return { 199 "address": self.peername, 200 "alpn": self.alpn, 201 "cipher_name": self.cipher, 202 "id": self.id, 203 "mitmcert": self.mitmcert.get_state() 204 if self.mitmcert is not None 205 else None, 206 "sni": self.sni, 207 "timestamp_end": self.timestamp_end, 208 "timestamp_start": self.timestamp_start, 209 "timestamp_tls_setup": self.timestamp_tls_setup, 210 "tls_established": self.tls_established, 211 "tls_extensions": [], 212 "tls_version": self.tls_version, 213 # only used in sans-io 214 "state": self.state.value, 215 "sockname": self.sockname, 216 "error": self.error, 217 "tls": self.tls, 218 "certificate_list": [x.get_state() for x in self.certificate_list], 219 "alpn_offers": self.alpn_offers, 220 "cipher_list": self.cipher_list, 221 "proxy_mode": self.proxy_mode.get_state(), 222 } 223 224 @classmethod 225 def from_state(cls, state) -> "Client": 226 client = Client(state["address"], ("mitmproxy", 8080), state["timestamp_start"]) 227 client.set_state(state) 228 return client 229 230 def set_state(self, state): 231 self.peername = tuple(state["address"]) if state["address"] else None 232 self.alpn = state["alpn"] 233 self.cipher = state["cipher_name"] 234 self.id = state["id"] 235 self.sni = state["sni"] 236 self.timestamp_end = state["timestamp_end"] 237 self.timestamp_start = state["timestamp_start"] 238 self.timestamp_tls_setup = state["timestamp_tls_setup"] 239 self.tls_version = state["tls_version"] 240 # only used in sans-io 241 self.state = ConnectionState(state["state"]) 242 self.sockname = tuple(state["sockname"]) if state["sockname"] else None 243 self.error = state["error"] 244 self.tls = state["tls"] 245 self.certificate_list = [ 246 certs.Cert.from_state(x) for x in state["certificate_list"] 247 ] 248 self.mitmcert = ( 249 certs.Cert.from_state(state["mitmcert"]) 250 if state["mitmcert"] is not None 251 else None 252 ) 253 self.alpn_offers = state["alpn_offers"] 254 self.cipher_list = state["cipher_list"] 255 self.proxy_mode = mode_specs.ProxyMode.from_state(state["proxy_mode"]) 256 257 @property 258 def address(self): # pragma: no cover 259 """*Deprecated:* An outdated alias for Client.peername.""" 260 warnings.warn( 261 "Client.address is deprecated, use Client.peername instead.", 262 DeprecationWarning, 263 stacklevel=2, 264 ) 265 return self.peername 266 267 @address.setter 268 def address(self, x): # pragma: no cover 269 warnings.warn( 270 "Client.address is deprecated, use Client.peername instead.", 271 DeprecationWarning, 272 stacklevel=2, 273 ) 274 self.peername = x 275 276 @property 277 def cipher_name(self) -> Optional[str]: # pragma: no cover 278 """*Deprecated:* An outdated alias for Connection.cipher.""" 279 warnings.warn( 280 "Client.cipher_name is deprecated, use Client.cipher instead.", 281 DeprecationWarning, 282 stacklevel=2, 283 ) 284 return self.cipher 285 286 @property 287 def clientcert(self) -> Optional[certs.Cert]: # pragma: no cover 288 """*Deprecated:* An outdated alias for Connection.certificate_list[0].""" 289 warnings.warn( 290 "Client.clientcert is deprecated, use Client.certificate_list instead.", 291 DeprecationWarning, 292 stacklevel=2, 293 ) 294 if self.certificate_list: 295 return self.certificate_list[0] 296 else: 297 return None 298 299 @clientcert.setter 300 def clientcert(self, val): # pragma: no cover 301 warnings.warn( 302 "Client.clientcert is deprecated, use Client.certificate_list instead.", 303 DeprecationWarning, 304 stacklevel=2, 305 ) 306 if val: 307 self.certificate_list = [val] 308 else: 309 self.certificate_list = []
A connection between a client and mitmproxy.
169 def __init__( 170 self, 171 peername: Address, 172 sockname: Address, 173 timestamp_start: float, 174 *, 175 transport_protocol: TransportProtocol = "tcp", 176 proxy_mode: mode_specs.ProxyMode = mode_specs.ProxyMode.parse("regular"), 177 ): 178 self.id = str(uuid.uuid4()) 179 self.peername = peername 180 self.sockname = sockname 181 self.timestamp_start = timestamp_start 182 self.state = ConnectionState.OPEN 183 self.transport_protocol = transport_protocol 184 self.proxy_mode = proxy_mode
The proxy server type this client has been connecting to.
Deprecated: An outdated alias for Connection.certificate_list[0].
Inherited Members
- Connection
- id
- state
- transport_protocol
- error
- tls
- certificate_list
- alpn
- alpn_offers
- cipher
- cipher_list
- tls_version
- sni
- timestamp_end
- timestamp_tls_setup
- connected
- tls_established
- alpn_proto_negotiated
- mitmproxy.coretypes.serializable.Serializable
- copy
312class Server(Connection): 313 """A connection between mitmproxy and an upstream server.""" 314 315 peername: Optional[Address] = None 316 """The server's resolved `(ip, port)` tuple. Will be set during connection establishment.""" 317 sockname: Optional[Address] = None 318 address: Optional[Address] 319 """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address.""" 320 321 timestamp_start: Optional[float] = None 322 """*Timestamp:* TCP SYN sent.""" 323 timestamp_tcp_setup: Optional[float] = None 324 """*Timestamp:* TCP ACK received.""" 325 326 via: Optional[server_spec.ServerSpec] = None 327 """An optional proxy server specification via which the connection should be established.""" 328 329 def __init__( 330 self, 331 address: Optional[Address], 332 *, 333 transport_protocol: TransportProtocol = "tcp", 334 ): 335 self.id = str(uuid.uuid4()) 336 self.address = address 337 self.state = ConnectionState.CLOSED 338 self.transport_protocol = transport_protocol 339 340 def __str__(self): 341 if self.alpn: 342 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 343 elif self.tls_established: 344 tls_state = ", tls" 345 else: 346 tls_state = "" 347 if self.sockname: 348 local_port = f", src_port={self.sockname[1]}" 349 else: 350 local_port = "" 351 return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})" 352 353 def __setattr__(self, name, value): 354 if name in ("address", "via"): 355 connection_open = ( 356 self.__dict__.get("state", ConnectionState.CLOSED) 357 is ConnectionState.OPEN 358 ) 359 # assigning the current value is okay, that may be an artifact of calling .set_state(). 360 attr_changed = self.__dict__.get(name) != value 361 if connection_open and attr_changed: 362 raise RuntimeError(f"Cannot change server.{name} on open connection.") 363 return super().__setattr__(name, value) 364 365 def get_state(self): 366 return { 367 "address": self.address, 368 "alpn": self.alpn, 369 "id": self.id, 370 "ip_address": self.peername, 371 "sni": self.sni, 372 "source_address": self.sockname, 373 "timestamp_end": self.timestamp_end, 374 "timestamp_start": self.timestamp_start, 375 "timestamp_tcp_setup": self.timestamp_tcp_setup, 376 "timestamp_tls_setup": self.timestamp_tls_setup, 377 "tls_established": self.tls_established, 378 "tls_version": self.tls_version, 379 "via": None, 380 # only used in sans-io 381 "state": self.state.value, 382 "error": self.error, 383 "tls": self.tls, 384 "certificate_list": [x.get_state() for x in self.certificate_list], 385 "alpn_offers": self.alpn_offers, 386 "cipher_name": self.cipher, 387 "cipher_list": self.cipher_list, 388 "via2": self.via, 389 } 390 391 @classmethod 392 def from_state(cls, state) -> "Server": 393 server = Server(None) 394 server.set_state(state) 395 return server 396 397 def set_state(self, state): 398 self.address = tuple(state["address"]) if state["address"] else None 399 self.alpn = state["alpn"] 400 self.id = state["id"] 401 self.peername = tuple(state["ip_address"]) if state["ip_address"] else None 402 self.sni = state["sni"] 403 self.sockname = ( 404 tuple(state["source_address"]) if state["source_address"] else None 405 ) 406 self.timestamp_end = state["timestamp_end"] 407 self.timestamp_start = state["timestamp_start"] 408 self.timestamp_tcp_setup = state["timestamp_tcp_setup"] 409 self.timestamp_tls_setup = state["timestamp_tls_setup"] 410 self.tls_version = state["tls_version"] 411 self.state = ConnectionState(state["state"]) 412 self.error = state["error"] 413 self.tls = state["tls"] 414 self.certificate_list = [ 415 certs.Cert.from_state(x) for x in state["certificate_list"] 416 ] 417 self.alpn_offers = state["alpn_offers"] 418 self.cipher = state["cipher_name"] 419 self.cipher_list = state["cipher_list"] 420 self.via = state["via2"] 421 422 @property 423 def ip_address(self) -> Optional[Address]: # pragma: no cover 424 """*Deprecated:* An outdated alias for `Server.peername`.""" 425 warnings.warn( 426 "Server.ip_address is deprecated, use Server.peername instead.", 427 DeprecationWarning, 428 stacklevel=2, 429 ) 430 return self.peername 431 432 @property 433 def cert(self) -> Optional[certs.Cert]: # pragma: no cover 434 """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`.""" 435 warnings.warn( 436 "Server.cert is deprecated, use Server.certificate_list instead.", 437 DeprecationWarning, 438 stacklevel=2, 439 ) 440 if self.certificate_list: 441 return self.certificate_list[0] 442 else: 443 return None 444 445 @cert.setter 446 def cert(self, val): # pragma: no cover 447 warnings.warn( 448 "Server.cert is deprecated, use Server.certificate_list instead.", 449 DeprecationWarning, 450 stacklevel=2, 451 ) 452 if val: 453 self.certificate_list = [val] 454 else: 455 self.certificate_list = []
A connection between mitmproxy and an upstream server.
The server's resolved (ip, port)
tuple. Will be set during connection establishment.
The server's (host, port)
address tuple. The host can either be a domain or a plain IP address.
An optional proxy server specification via which the connection should be established.
Deprecated: An outdated alias for Connection.certificate_list[0]
.
Inherited Members
- Connection
- id
- state
- transport_protocol
- error
- tls
- certificate_list
- alpn
- alpn_offers
- cipher
- cipher_list
- tls_version
- sni
- timestamp_end
- timestamp_tls_setup
- connected
- tls_established
- alpn_proto_negotiated
- mitmproxy.coretypes.serializable.Serializable
- copy
16class ConnectionState(Flag): 17 """The current state of the underlying socket.""" 18 19 CLOSED = 0 20 CAN_READ = 1 21 CAN_WRITE = 2 22 OPEN = CAN_READ | CAN_WRITE
The current state of the underlying socket.
Inherited Members
- enum.Enum
- name
- value