mitmproxy.certs
"""
Certificate helpers: a thin `Cert` wrapper around `cryptography` certificates,
functions to create a CA and CA-signed leaf certificates, and an in-memory
`CertStore` that caches generated certificates.
"""
import contextlib
import datetime
import ipaddress
import os
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import NewType, Optional, Union

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.x509 import NameOID, ExtendedKeyUsageOID

import OpenSSL
from mitmproxy.coretypes import serializable

# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815
CA_EXPIRY = datetime.timedelta(days=10 * 365)
CERT_EXPIRY = datetime.timedelta(days=365)

# Generated with "openssl dhparam". It's too slow to generate this on startup.
DEFAULT_DHPARAM = b"""
-----BEGIN DH PARAMETERS-----
MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3
O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv
j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ
Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB
chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC
ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq
o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX
IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv
A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8
6mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I
rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
-----END DH PARAMETERS-----
"""


class Cert(serializable.Serializable):
    """Representation of a (TLS) certificate."""

    _cert: x509.Certificate

    def __init__(self, cert: x509.Certificate):
        assert isinstance(cert, x509.Certificate)
        self._cert = cert

    def __eq__(self, other):
        # Returning NotImplemented (instead of failing with AttributeError)
        # lets Python fall back to its default handling for foreign types.
        if not isinstance(other, Cert):
            return NotImplemented
        return self.fingerprint() == other.fingerprint()

    def __repr__(self):
        return f"<Cert(cn={self.cn!r}, altnames={self.altnames!r})>"

    def __hash__(self):
        return self._cert.__hash__()

    @classmethod
    def from_state(cls, state):
        """Build a Cert from serialized state (PEM bytes, see `get_state`)."""
        return cls.from_pem(state)

    def get_state(self):
        """Return the serialized state of this certificate: its PEM encoding."""
        return self.to_pem()

    def set_state(self, state):
        """Replace the wrapped certificate with the one parsed from PEM `state`."""
        self._cert = x509.load_pem_x509_certificate(state)

    @classmethod
    def from_pem(cls, data: bytes) -> "Cert":
        """Parse a PEM-encoded certificate."""
        cert = x509.load_pem_x509_certificate(data)  # type: ignore
        return cls(cert)

    def to_pem(self) -> bytes:
        """Return the PEM encoding of the certificate."""
        return self._cert.public_bytes(serialization.Encoding.PEM)

    @classmethod
    def from_pyopenssl(cls, x509: OpenSSL.crypto.X509) -> "Cert":
        """Convert a pyOpenSSL certificate into a Cert."""
        # NB: the parameter name shadows the `x509` module inside this method.
        return cls(x509.to_cryptography())

    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
        """Convert this certificate into a pyOpenSSL certificate."""
        return OpenSSL.crypto.X509.from_cryptography(self._cert)

    def fingerprint(self) -> bytes:
        """Return the SHA-256 fingerprint of the certificate."""
        return self._cert.fingerprint(hashes.SHA256())

    @property
    def issuer(self) -> list[tuple[str, str]]:
        return _name_to_keyval(self._cert.issuer)

    @property
    def notbefore(self) -> datetime.datetime:
        # x509.Certificate.not_valid_before is a naive datetime in UTC
        return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)

    @property
    def notafter(self) -> datetime.datetime:
        # x509.Certificate.not_valid_after is a naive datetime in UTC
        return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)

    def has_expired(self) -> bool:
        """Return True if the current time is past the certificate's end of validity."""
        # Compare timezone-aware values; avoids the deprecated naive utcnow().
        return datetime.datetime.now(datetime.timezone.utc) > self.notafter

    @property
    def subject(self) -> list[tuple[str, str]]:
        return _name_to_keyval(self._cert.subject)

    @property
    def serial(self) -> int:
        return self._cert.serial_number

    @property
    def keyinfo(self) -> tuple[str, int]:
        """Return a (key type description, key size) tuple for the public key."""
        public_key = self._cert.public_key()
        if isinstance(public_key, rsa.RSAPublicKey):
            return "RSA", public_key.key_size
        if isinstance(public_key, dsa.DSAPublicKey):
            return "DSA", public_key.key_size
        if isinstance(public_key, ec.EllipticCurvePublicKey):
            return f"EC ({public_key.curve.name})", public_key.key_size
        # Unknown key type: derive a label from the class name, size -1 if unavailable.
        return (
            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
            getattr(public_key, "key_size", -1),
        )  # pragma: no cover

    @property
    def cn(self) -> Optional[str]:
        """The first Common Name in the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if attrs:
            return attrs[0].value
        return None

    @property
    def organization(self) -> Optional[str]:
        """The first Organization Name in the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(
            x509.NameOID.ORGANIZATION_NAME
        )
        if attrs:
            return attrs[0].value
        return None

    @property
    def altnames(self) -> list[str]:
        """
        Get all SubjectAlternativeName entries of type DNS name, followed by
        all entries of type IP address (converted to strings).
        Returns an empty list if the certificate has no such extension.
        """
        try:
            ext = self._cert.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            ).value
        except x509.ExtensionNotFound:
            return []
        else:
            return ext.get_values_for_type(x509.DNSName) + [
                str(x) for x in ext.get_values_for_type(x509.IPAddress)
            ]


def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
    """Convert an x509 name into a list of (attribute name, value) tuples."""
    parts = []
    for attr in name:
        # pyca cryptography <35.0.0 backwards compatibility.
        # The check has to be made on the attribute, not on the enclosing name.
        if hasattr(attr, "rfc4514_attribute_name"):  # pragma: no cover
            k = attr.rfc4514_attribute_name  # type: ignore
        else:  # pragma: no cover
            k = attr.rfc4514_string().partition("=")[0]
        v = attr.value
        parts.append((k, v))
    return parts


def create_ca(
    organization: str,
    cn: str,
    key_size: int,
) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
    """
    Create a new self-signed CA certificate with a fresh RSA key.

    organization: Organization name for the CA subject/issuer.
    cn: Common name for the CA subject/issuer.
    key_size: RSA key size in bits.

    Returns a (private key, certificate) tuple. The certificate is valid
    from two days ago until CA_EXPIRY from now.
    """
    # The builder interprets naive datetimes as UTC, so use an explicit UTC
    # timestamp rather than naive local time.
    now = datetime.datetime.now(datetime.timezone.utc)

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
    )  # type: ignore
    name = x509.Name(
        [
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
        ]
    )
    builder = x509.CertificateBuilder()
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.subject_name(name)
    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
    builder = builder.not_valid_after(now + CA_EXPIRY)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    )
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    builder = builder.add_extension(
        x509.KeyUsage(
            digital_signature=False,
            content_commitment=False,
            key_encipherment=False,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=True,
            crl_sign=True,
            encipher_only=False,
            decipher_only=False,
        ),
        critical=True,
    )
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
        critical=False,
    )
    cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256())  # type: ignore
    return private_key, cert


def dummy_cert(
    privkey: rsa.RSAPrivateKey,
    cacert: x509.Certificate,
    commonname: Optional[str],
    sans: list[str],
    organization: Optional[str] = None,
) -> Cert:
    """
    Generates a dummy certificate.

    privkey: CA private key
    cacert: CA certificate
    commonname: Common name for the generated certificate.
        Ignored if it is None, empty, or 64 or more characters long.
    sans: A list of Subject Alternate Names. Entries that parse as IP
        addresses become IP address entries, all others become DNS names.
    organization: Organization name for the generated certificate.

    Returns the generated certificate, signed with privkey and valid from
    two days ago until CERT_EXPIRY from now.
    """
    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(cacert.subject)
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    # The generated certificate carries the CA certificate's public key.
    builder = builder.public_key(cacert.public_key())

    # The builder interprets naive datetimes as UTC, so use an explicit UTC
    # timestamp rather than naive local time.
    now = datetime.datetime.now(datetime.timezone.utc)
    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
    builder = builder.not_valid_after(now + CERT_EXPIRY)

    subject = []
    # An empty common name cannot be encoded as a name attribute, so it is
    # treated the same as a missing one.
    is_valid_commonname = commonname is not None and 0 < len(commonname) < 64
    if is_valid_commonname:
        assert commonname is not None
        subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname))
    if organization is not None:
        subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization))
    builder = builder.subject_name(x509.Name(subject))
    builder = builder.serial_number(x509.random_serial_number())

    ss: list[x509.GeneralName] = []
    for x in sans:
        try:
            ip = ipaddress.ip_address(x)
        except ValueError:
            ss.append(x509.DNSName(x))
        else:
            ss.append(x509.IPAddress(ip))
    # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
    builder = builder.add_extension(
        x509.SubjectAlternativeName(ss), critical=not is_valid_commonname
    )
    cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256())  # type: ignore
    return Cert(cert)


@dataclass(frozen=True)
class CertStoreEntry:
    cert: Cert
    privatekey: rsa.RSAPrivateKey
    chain_file: Optional[Path]


TCustomCertId = str  # manually provided certs (e.g. mitmproxy's --certs)
TGeneratedCertId = tuple[Optional[str], tuple[str, ...]]  # (common_name, sans)
TCertId = Union[TCustomCertId, TGeneratedCertId]

DHParams = NewType("DHParams", bytes)


class CertStore:
    """
    Implements an in-memory certificate store.
    """

    # Maximum number of generated entries kept before the oldest is dropped.
    STORE_CAP = 100
    certs: dict[TCertId, CertStoreEntry]
    expire_queue: list[CertStoreEntry]

    def __init__(
        self,
        default_privatekey: rsa.RSAPrivateKey,
        default_ca: Cert,
        default_chain_file: Optional[Path],
        dhparams: DHParams,
    ):
        self.default_privatekey = default_privatekey
        self.default_ca = default_ca
        self.default_chain_file = default_chain_file
        self.dhparams = dhparams
        self.certs = {}
        self.expire_queue = []

    def expire(self, entry: CertStoreEntry) -> None:
        """
        Queue an entry for expiry. Once more than STORE_CAP entries are
        queued, the oldest one is removed from the queue and every key
        mapping to an equal entry is dropped from the store.
        """
        self.expire_queue.append(entry)
        if len(self.expire_queue) > self.STORE_CAP:
            d = self.expire_queue.pop(0)
            self.certs = {k: v for k, v in self.certs.items() if v != d}

    @staticmethod
    def load_dhparam(path: Path) -> DHParams:
        """
        Load DH parameters from a PEM file, writing DEFAULT_DHPARAM to
        `path` first if the file does not exist.

        Raises RuntimeError if the file cannot be opened or parsed.

        NOTE(review): the return value is the object produced by OpenSSL's
        PEM_read_bio_DHparams, not bytes as the DHParams alias suggests.
        """
        # mitmproxy<=0.10 doesn't generate a dhparam file.
        # Create it now if necessary.
        if not path.exists():
            path.write_bytes(DEFAULT_DHPARAM)

        # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's
        # expected format.
        bio = OpenSSL.SSL._lib.BIO_new_file(str(path).encode(sys.getfilesystemencoding()), b"r")  # type: ignore
        if bio != OpenSSL.SSL._ffi.NULL:  # type: ignore
            bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free)  # type: ignore
            dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams(  # type: ignore
                bio,
                OpenSSL.SSL._ffi.NULL,  # type: ignore
                OpenSSL.SSL._ffi.NULL,  # type: ignore
                OpenSSL.SSL._ffi.NULL,  # type: ignore
            )
            # Only hand out the parameters if parsing produced a result;
            # otherwise fall through to the error below.
            if dh != OpenSSL.SSL._ffi.NULL:  # type: ignore
                dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free)  # type: ignore
                return dh
        raise RuntimeError("Error loading DH Params.")  # pragma: no cover

    @classmethod
    def from_store(
        cls,
        path: Union[Path, str],
        basename: str,
        key_size: int,
        passphrase: Optional[bytes] = None,
    ) -> "CertStore":
        """
        Load the store found at `path`/`basename`-ca.pem, creating a new
        store (see `create_store`) first if that file does not exist.
        """
        path = Path(path)
        ca_file = path / f"{basename}-ca.pem"
        dhparam_file = path / f"{basename}-dhparam.pem"
        if not ca_file.exists():
            cls.create_store(path, basename, key_size)
        return cls.from_files(ca_file, dhparam_file, passphrase)

    @classmethod
    def from_files(
        cls, ca_file: Path, dhparam_file: Path, passphrase: Optional[bytes] = None
    ) -> "CertStore":
        """
        Build a store from a PEM file holding the CA private key and
        certificate(s), and a DH parameter file.
        """
        raw = ca_file.read_bytes()
        key = load_pem_private_key(raw, passphrase)
        dh = cls.load_dhparam(dhparam_file)
        # certs[0] is whatever precedes the first certificate; certs[1] is
        # the first certificate, which is used as the CA.
        certs = re.split(rb"(?=-----BEGIN CERTIFICATE-----)", raw)
        ca = Cert.from_pem(certs[1])
        # More than one certificate: the file itself doubles as the chain file.
        if len(certs) > 2:
            chain_file: Optional[Path] = ca_file
        else:
            chain_file = None
        return cls(key, ca, chain_file, dh)

    @staticmethod
    @contextlib.contextmanager
    def umask_secret():
        """
        Context to temporarily set umask to its original value bitor 0o77.
        Useful when writing private keys to disk so that only the owner
        will be able to read them.
        """
        # os.umask() returns the previous mask, so set a throwaway value to read it.
        original_umask = os.umask(0)
        os.umask(original_umask | 0o77)
        try:
            yield
        finally:
            os.umask(original_umask)

    @staticmethod
    def create_store(
        path: Path, basename: str, key_size: int, organization=None, cn=None
    ) -> None:
        """
        Create a new CA and write it, in several formats, into `path`
        (created if necessary). `organization` and `cn` default to `basename`.
        """
        path.mkdir(parents=True, exist_ok=True)

        organization = organization or basename
        cn = cn or basename

        key: rsa.RSAPrivateKeyWithSerialization
        ca: x509.Certificate
        key, ca = create_ca(organization=organization, cn=cn, key_size=key_size)

        # Dump the CA plus private key.
        with CertStore.umask_secret():
            # PEM format
            (path / f"{basename}-ca.pem").write_bytes(
                key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.TraditionalOpenSSL,
                    encryption_algorithm=serialization.NoEncryption(),
                )
                + ca.public_bytes(serialization.Encoding.PEM)
            )

            # PKCS12 format for Windows devices
            (path / f"{basename}-ca.p12").write_bytes(
                pkcs12.serialize_key_and_certificates(  # type: ignore
                    name=basename.encode(),
                    key=key,
                    cert=ca,
                    cas=None,
                    encryption_algorithm=serialization.NoEncryption(),
                )
            )

        # Dump the certificate in PEM format
        pem_cert = ca.public_bytes(serialization.Encoding.PEM)
        (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert)
        # Create a .cer file with the same contents for Android
        (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert)

        # Dump the certificate in PKCS12 format for Windows devices
        (path / f"{basename}-ca-cert.p12").write_bytes(
            pkcs12.serialize_key_and_certificates(
                name=basename.encode(),
                key=None,  # type: ignore
                cert=ca,
                cas=None,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

        (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM)

    def add_cert_file(
        self, spec: str, path: Path, passphrase: Optional[bytes] = None
    ) -> None:
        """
        Load a PEM certificate from `path` and register it under `spec`.
        If the file holds no loadable private key, the store's default
        private key is used instead.
        """
        raw = path.read_bytes()
        cert = Cert.from_pem(raw)
        try:
            key = load_pem_private_key(raw, password=passphrase)
        except ValueError:
            key = self.default_privatekey

        self.add_cert(CertStoreEntry(cert, key, path), spec)

    def add_cert(self, entry: CertStoreEntry, *names: str) -> None:
        """
        Adds a cert to the certstore. We register the CN in the cert plus
        any SANs, and also the list of names provided as an argument.
        """
        if entry.cert.cn:
            self.certs[entry.cert.cn] = entry
        for i in entry.cert.altnames:
            self.certs[i] = entry
        for i in names:
            self.certs[i] = entry

    @staticmethod
    def asterisk_forms(dn: str) -> list[str]:
        """
        Return all asterisk forms for a domain. For example, for www.example.com this will return
        ["www.example.com", "*.example.com", "*.com"]. The single wildcard "*" is omitted.
        """
        parts = dn.split(".")
        ret = [dn]
        for i in range(1, len(parts)):
            ret.append("*." + ".".join(parts[i:]))
        return ret

    def get_cert(
        self,
        commonname: Optional[str],
        sans: list[str],
        organization: Optional[str] = None,
    ) -> CertStoreEntry:
        """
        Return a matching entry from the store, or generate, cache and
        return a new dummy certificate if there is none.

        commonname: Common name for the generated certificate. Must be a
        valid, plain-ASCII, IDNA-encoded domain name.

        sans: A list of Subject Alternate Names.

        organization: Organization name for the generated certificate.
        """

        # Lookup order: asterisk forms of the common name, then of each SAN,
        # then the catch-all "*", then the exact (commonname, sans) key.
        potential_keys: list[TCertId] = []
        if commonname:
            potential_keys.extend(self.asterisk_forms(commonname))
        for s in sans:
            potential_keys.extend(self.asterisk_forms(s))
        potential_keys.append("*")
        potential_keys.append((commonname, tuple(sans)))

        name = next((key for key in potential_keys if key in self.certs), None)
        if name:
            entry = self.certs[name]
        else:
            entry = CertStoreEntry(
                cert=dummy_cert(
                    self.default_privatekey,
                    self.default_ca._cert,
                    commonname,
                    sans,
                    organization,
                ),
                privatekey=self.default_privatekey,
                chain_file=self.default_chain_file,
            )
            self.certs[(commonname, tuple(sans))] = entry
            self.expire(entry)

        return entry


def load_pem_private_key(data: bytes, password: Optional[bytes]) -> rsa.RSAPrivateKey:
    """
    like cryptography's load_pem_private_key, but silently falls back to not using a password
    if the private key is unencrypted.
    """
    try:
        return serialization.load_pem_private_key(data, password)  # type: ignore
    except TypeError:
        # A TypeError with a password given: retry once without it.
        if password is not None:
            return load_pem_private_key(data, None)
        raise
class
Cert(mitmproxy.coretypes.serializable.Serializable):
class Cert(serializable.Serializable):
    """Representation of a (TLS) certificate."""

    _cert: x509.Certificate

    def __init__(self, cert: x509.Certificate):
        assert isinstance(cert, x509.Certificate)
        self._cert = cert

    def __eq__(self, other):
        # Returning NotImplemented (instead of failing with AttributeError)
        # lets Python fall back to its default handling for foreign types.
        if not isinstance(other, Cert):
            return NotImplemented
        return self.fingerprint() == other.fingerprint()

    def __repr__(self):
        return f"<Cert(cn={self.cn!r}, altnames={self.altnames!r})>"

    def __hash__(self):
        return self._cert.__hash__()

    @classmethod
    def from_state(cls, state):
        """Build a Cert from serialized state (PEM bytes, see `get_state`)."""
        return cls.from_pem(state)

    def get_state(self):
        """Return the serialized state of this certificate: its PEM encoding."""
        return self.to_pem()

    def set_state(self, state):
        """Replace the wrapped certificate with the one parsed from PEM `state`."""
        self._cert = x509.load_pem_x509_certificate(state)

    @classmethod
    def from_pem(cls, data: bytes) -> "Cert":
        """Parse a PEM-encoded certificate."""
        cert = x509.load_pem_x509_certificate(data)  # type: ignore
        return cls(cert)

    def to_pem(self) -> bytes:
        """Return the PEM encoding of the certificate."""
        return self._cert.public_bytes(serialization.Encoding.PEM)

    @classmethod
    def from_pyopenssl(cls, x509: OpenSSL.crypto.X509) -> "Cert":
        """Convert a pyOpenSSL certificate into a Cert."""
        # NB: the parameter name shadows the `x509` module inside this method.
        return cls(x509.to_cryptography())

    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
        """Convert this certificate into a pyOpenSSL certificate."""
        return OpenSSL.crypto.X509.from_cryptography(self._cert)

    def fingerprint(self) -> bytes:
        """Return the SHA-256 fingerprint of the certificate."""
        return self._cert.fingerprint(hashes.SHA256())

    @property
    def issuer(self) -> list[tuple[str, str]]:
        return _name_to_keyval(self._cert.issuer)

    @property
    def notbefore(self) -> datetime.datetime:
        # x509.Certificate.not_valid_before is a naive datetime in UTC
        return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)

    @property
    def notafter(self) -> datetime.datetime:
        # x509.Certificate.not_valid_after is a naive datetime in UTC
        return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)

    def has_expired(self) -> bool:
        """Return True if the current time is past the certificate's end of validity."""
        # Compare timezone-aware values; avoids the deprecated naive utcnow().
        return datetime.datetime.now(datetime.timezone.utc) > self.notafter

    @property
    def subject(self) -> list[tuple[str, str]]:
        return _name_to_keyval(self._cert.subject)

    @property
    def serial(self) -> int:
        return self._cert.serial_number

    @property
    def keyinfo(self) -> tuple[str, int]:
        """Return a (key type description, key size) tuple for the public key."""
        public_key = self._cert.public_key()
        if isinstance(public_key, rsa.RSAPublicKey):
            return "RSA", public_key.key_size
        if isinstance(public_key, dsa.DSAPublicKey):
            return "DSA", public_key.key_size
        if isinstance(public_key, ec.EllipticCurvePublicKey):
            return f"EC ({public_key.curve.name})", public_key.key_size
        # Unknown key type: derive a label from the class name, size -1 if unavailable.
        return (
            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
            getattr(public_key, "key_size", -1),
        )  # pragma: no cover

    @property
    def cn(self) -> Optional[str]:
        """The first Common Name in the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if attrs:
            return attrs[0].value
        return None

    @property
    def organization(self) -> Optional[str]:
        """The first Organization Name in the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(
            x509.NameOID.ORGANIZATION_NAME
        )
        if attrs:
            return attrs[0].value
        return None

    @property
    def altnames(self) -> list[str]:
        """
        Get all SubjectAlternativeName entries of type DNS name, followed by
        all entries of type IP address (converted to strings).
        Returns an empty list if the certificate has no such extension.
        """
        try:
            ext = self._cert.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            ).value
        except x509.ExtensionNotFound:
            return []
        else:
            return ext.get_values_for_type(x509.DNSName) + [
                str(x) for x in ext.get_values_for_type(x509.IPAddress)
            ]
Representation of a (TLS) certificate.
Inherited Members
- mitmproxy.coretypes.serializable.Serializable
- copy