mitmproxy.dns
1from __future__ import annotations 2from dataclasses import dataclass 3import itertools 4import random 5import struct 6from ipaddress import IPv4Address, IPv6Address 7import time 8from typing import ClassVar 9 10from mitmproxy import flow, stateobject 11from mitmproxy.net.dns import classes, domain_names, op_codes, response_codes, types 12 13# DNS parameters taken from https://www.iana.org/assignments/dns-parameters/dns-parameters.xml 14 15 16@dataclass 17class Question(stateobject.StateObject): 18 HEADER: ClassVar[struct.Struct] = struct.Struct("!HH") 19 20 name: str 21 type: int 22 class_: int 23 24 _stateobject_attributes = dict(name=str, type=int, class_=int) 25 26 @classmethod 27 def from_state(cls, state): 28 return cls(**state) 29 30 def __str__(self) -> str: 31 return self.name 32 33 def to_json(self) -> dict: 34 """ 35 Converts the question into json for mitmweb. 36 Sync with web/src/flow.ts. 37 """ 38 return { 39 "name": self.name, 40 "type": types.to_str(self.type), 41 "class": classes.to_str(self.class_), 42 } 43 44 45@dataclass 46class ResourceRecord(stateobject.StateObject): 47 DEFAULT_TTL: ClassVar[int] = 60 48 HEADER: ClassVar[struct.Struct] = struct.Struct("!HHIH") 49 50 name: str 51 type: int 52 class_: int 53 ttl: int 54 data: bytes 55 56 _stateobject_attributes = dict(name=str, type=int, class_=int, ttl=int, data=bytes) 57 58 @classmethod 59 def from_state(cls, state): 60 return cls(**state) 61 62 def __str__(self) -> str: 63 try: 64 if self.type == types.A: 65 return str(self.ipv4_address) 66 if self.type == types.AAAA: 67 return str(self.ipv6_address) 68 if self.type in (types.NS, types.CNAME, types.PTR): 69 return self.domain_name 70 if self.type == types.TXT: 71 return self.text 72 except: 73 return f"0x{self.data.hex()} (invalid {types.to_str(self.type)} data)" 74 return f"0x{self.data.hex()}" 75 76 @property 77 def text(self) -> str: 78 return self.data.decode("utf-8") 79 80 @text.setter 81 def text(self, value: str) -> None: 82 self.data = value.encode("utf-8") 83 84 @property 85 def ipv4_address(self) -> IPv4Address: 86 return IPv4Address(self.data) 87 88 @ipv4_address.setter 89 def ipv4_address(self, ip: IPv4Address) -> None: 90 self.data = ip.packed 91 92 @property 93 def ipv6_address(self) -> IPv6Address: 94 return IPv6Address(self.data) 95 96 @ipv6_address.setter 97 def ipv6_address(self, ip: IPv6Address) -> None: 98 self.data = ip.packed 99 100 @property 101 def domain_name(self) -> str: 102 return domain_names.unpack(self.data) 103 104 @domain_name.setter 105 def domain_name(self, name: str) -> None: 106 self.data = domain_names.pack(name) 107 108 def to_json(self) -> dict: 109 """ 110 Converts the resource record into json for mitmweb. 111 Sync with web/src/flow.ts. 112 """ 113 return { 114 "name": self.name, 115 "type": types.to_str(self.type), 116 "class": classes.to_str(self.class_), 117 "ttl": self.ttl, 118 "data": str(self), 119 } 120 121 @classmethod 122 def A(cls, name: str, ip: IPv4Address, *, ttl: int = DEFAULT_TTL) -> ResourceRecord: 123 """Create an IPv4 resource record.""" 124 return cls(name, types.A, classes.IN, ttl, ip.packed) 125 126 @classmethod 127 def AAAA( 128 cls, name: str, ip: IPv6Address, *, ttl: int = DEFAULT_TTL 129 ) -> ResourceRecord: 130 """Create an IPv6 resource record.""" 131 return cls(name, types.AAAA, classes.IN, ttl, ip.packed) 132 133 @classmethod 134 def CNAME( 135 cls, alias: str, canonical: str, *, ttl: int = DEFAULT_TTL 136 ) -> ResourceRecord: 137 """Create a canonical internet name resource record.""" 138 return cls(alias, types.CNAME, classes.IN, ttl, domain_names.pack(canonical)) 139 140 @classmethod 141 def PTR(cls, inaddr: str, ptr: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord: 142 """Create a canonical internet name resource record.""" 143 return cls(inaddr, types.PTR, classes.IN, ttl, domain_names.pack(ptr)) 144 145 @classmethod 146 def TXT(cls, name: str, text: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord: 147 """Create a textual resource record.""" 148 return cls(name, types.TXT, classes.IN, ttl, text.encode("utf-8")) 149 150 151# comments are taken from rfc1035 152@dataclass 153class Message(stateobject.StateObject): 154 HEADER: ClassVar[struct.Struct] = struct.Struct("!HHHHHH") 155 156 timestamp: float 157 """The time at which the message was sent or received.""" 158 id: int 159 """An identifier assigned by the program that generates any kind of query.""" 160 query: bool 161 """A field that specifies whether this message is a query.""" 162 op_code: int 163 """ 164 A field that specifies kind of query in this message. 165 This value is set by the originator of a request and copied into the response. 166 """ 167 authoritative_answer: bool 168 """ 169 This field is valid in responses, and specifies that the responding name server 170 is an authority for the domain name in question section. 171 """ 172 truncation: bool 173 """Specifies that this message was truncated due to length greater than that permitted on the transmission channel.""" 174 recursion_desired: bool 175 """ 176 This field may be set in a query and is copied into the response. 177 If set, it directs the name server to pursue the query recursively. 178 """ 179 recursion_available: bool 180 """This field is set or cleared in a response, and denotes whether recursive query support is available in the name server.""" 181 reserved: int 182 """Reserved for future use. Must be zero in all queries and responses.""" 183 response_code: int 184 """This field is set as part of responses.""" 185 questions: list[Question] 186 """ 187 The question section is used to carry the "question" in most queries, i.e. 188 the parameters that define what is being asked. 189 """ 190 answers: list[ResourceRecord] 191 """First resource record section.""" 192 authorities: list[ResourceRecord] 193 """Second resource record section.""" 194 additionals: list[ResourceRecord] 195 """Third resource record section.""" 196 197 _stateobject_attributes = dict( 198 timestamp=float, 199 id=int, 200 query=bool, 201 op_code=int, 202 authoritative_answer=bool, 203 truncation=bool, 204 recursion_desired=bool, 205 recursion_available=bool, 206 reserved=int, 207 response_code=int, 208 questions=list[Question], 209 answers=list[ResourceRecord], 210 authorities=list[ResourceRecord], 211 additionals=list[ResourceRecord], 212 ) 213 214 @classmethod 215 def from_state(cls, state): 216 obj = cls.__new__(cls) # `cls(**state)` won't work recursively 217 obj.set_state(state) 218 return obj 219 220 def __str__(self) -> str: 221 return "\r\n".join( 222 map( 223 str, 224 itertools.chain( 225 self.questions, self.answers, self.authorities, self.additionals 226 ), 227 ) 228 ) 229 230 @property 231 def content(self) -> bytes: 232 """Returns the user-friendly content of all parts as encoded bytes.""" 233 return str(self).encode() 234 235 @property 236 def size(self) -> int: 237 """Returns the cumulative data size of all resource record sections.""" 238 return sum( 239 len(x.data) 240 for x in itertools.chain.from_iterable( 241 [self.answers, self.authorities, self.additionals] 242 ) 243 ) 244 245 def fail(self, response_code: int) -> Message: 246 if response_code == response_codes.NOERROR: 247 raise ValueError("response_code must be an error code.") 248 return Message( 249 timestamp=time.time(), 250 id=self.id, 251 query=False, 252 op_code=self.op_code, 253 authoritative_answer=False, 254 truncation=False, 255 recursion_desired=self.recursion_desired, 256 recursion_available=False, 257 reserved=0, 258 response_code=response_code, 259 questions=self.questions, 260 answers=[], 261 authorities=[], 262 additionals=[], 263 ) 264 265 def succeed(self, answers: list[ResourceRecord]) -> Message: 266 return Message( 267 timestamp=time.time(), 268 id=self.id, 269 query=False, 270 op_code=self.op_code, 271 authoritative_answer=False, 272 truncation=False, 273 recursion_desired=self.recursion_desired, 274 recursion_available=True, 275 reserved=0, 276 response_code=response_codes.NOERROR, 277 questions=self.questions, 278 answers=answers, 279 authorities=[], 280 additionals=[], 281 ) 282 283 @classmethod 284 def unpack(cls, buffer: bytes) -> Message: 285 """Converts the entire given buffer into a DNS message.""" 286 length, msg = cls.unpack_from(buffer, 0) 287 if length != len(buffer): 288 raise struct.error(f"unpack requires a buffer of {length} bytes") 289 return msg 290 291 @classmethod 292 def unpack_from(cls, buffer: bytes | bytearray, offset: int) -> tuple[int, Message]: 293 """Converts the buffer from a given offset into a DNS message and also returns its length.""" 294 ( 295 id, 296 flags, 297 len_questions, 298 len_answers, 299 len_authorities, 300 len_additionals, 301 ) = Message.HEADER.unpack_from(buffer, offset) 302 msg = Message( 303 timestamp=time.time(), 304 id=id, 305 query=(flags & (1 << 15)) == 0, 306 op_code=(flags >> 11) & 0b1111, 307 authoritative_answer=(flags & (1 << 10)) != 0, 308 truncation=(flags & (1 << 9)) != 0, 309 recursion_desired=(flags & (1 << 8)) != 0, 310 recursion_available=(flags & (1 << 7)) != 0, 311 reserved=(flags >> 4) & 0b111, 312 response_code=flags & 0b1111, 313 questions=[], 314 answers=[], 315 authorities=[], 316 additionals=[], 317 ) 318 offset += Message.HEADER.size 319 cached_names = domain_names.cache() 320 321 def unpack_domain_name() -> str: 322 nonlocal buffer, offset, cached_names 323 name, length = domain_names.unpack_from_with_compression( 324 buffer, offset, cached_names 325 ) 326 offset += length 327 return name 328 329 for i in range(0, len_questions): 330 try: 331 name = unpack_domain_name() 332 type, class_ = Question.HEADER.unpack_from(buffer, offset) 333 offset += Question.HEADER.size 334 msg.questions.append(Question(name=name, type=type, class_=class_)) 335 except struct.error as e: 336 raise struct.error(f"question #{i}: {str(e)}") 337 338 def unpack_rrs( 339 section: list[ResourceRecord], section_name: str, count: int 340 ) -> None: 341 nonlocal buffer, offset 342 for i in range(0, count): 343 try: 344 name = unpack_domain_name() 345 type, class_, ttl, len_data = ResourceRecord.HEADER.unpack_from( 346 buffer, offset 347 ) 348 offset += ResourceRecord.HEADER.size 349 end_data = offset + len_data 350 if len(buffer) < end_data: 351 raise struct.error( 352 f"unpack requires a data buffer of {len_data} bytes" 353 ) 354 data = buffer[offset:end_data] 355 if 0b11000000 in data: 356 # the resource record might contains a compressed domain name, if so, uncompressed in advance 357 try: 358 ( 359 rr_name, 360 rr_name_len, 361 ) = domain_names.unpack_from_with_compression( 362 buffer, offset, cached_names 363 ) 364 if rr_name_len == len_data: 365 data = domain_names.pack(rr_name) 366 except struct.error: 367 pass 368 section.append(ResourceRecord(name, type, class_, ttl, data)) 369 offset += len_data 370 except struct.error as e: 371 raise struct.error(f"{section_name} #{i}: {str(e)}") 372 373 unpack_rrs(msg.answers, "answer", len_answers) 374 unpack_rrs(msg.authorities, "authority", len_authorities) 375 unpack_rrs(msg.additionals, "additional", len_additionals) 376 return (offset, msg) 377 378 @property 379 def packed(self) -> bytes: 380 """Converts the message into network bytes.""" 381 if self.id < 0 or self.id > 65535: 382 raise ValueError(f"DNS message's id {self.id} is out of bounds.") 383 flags = 0 384 if not self.query: 385 flags |= 1 << 15 386 if self.op_code < 0 or self.op_code > 0b1111: 387 raise ValueError(f"DNS message's op_code {self.op_code} is out of bounds.") 388 flags |= self.op_code << 11 389 if self.authoritative_answer: 390 flags |= 1 << 10 391 if self.truncation: 392 flags |= 1 << 9 393 if self.recursion_desired: 394 flags |= 1 << 8 395 if self.recursion_available: 396 flags |= 1 << 7 397 if self.reserved < 0 or self.reserved > 0b111: 398 raise ValueError( 399 f"DNS message's reserved value of {self.reserved} is out of bounds." 400 ) 401 flags |= self.reserved << 4 402 if self.response_code < 0 or self.response_code > 0b1111: 403 raise ValueError( 404 f"DNS message's response_code {self.response_code} is out of bounds." 405 ) 406 flags |= self.response_code 407 data = bytearray() 408 data.extend( 409 Message.HEADER.pack( 410 self.id, 411 flags, 412 len(self.questions), 413 len(self.answers), 414 len(self.authorities), 415 len(self.additionals), 416 ) 417 ) 418 # TODO implement compression 419 for question in self.questions: 420 data.extend(domain_names.pack(question.name)) 421 data.extend(Question.HEADER.pack(question.type, question.class_)) 422 for rr in (*self.answers, *self.authorities, *self.additionals): 423 data.extend(domain_names.pack(rr.name)) 424 data.extend( 425 ResourceRecord.HEADER.pack(rr.type, rr.class_, rr.ttl, len(rr.data)) 426 ) 427 data.extend(rr.data) 428 return bytes(data) 429 430 def to_json(self) -> dict: 431 """ 432 Converts the message into json for mitmweb. 433 Sync with web/src/flow.ts. 434 """ 435 return { 436 "id": self.id, 437 "query": self.query, 438 "op_code": op_codes.to_str(self.op_code), 439 "authoritative_answer": self.authoritative_answer, 440 "truncation": self.truncation, 441 "recursion_desired": self.recursion_desired, 442 "recursion_available": self.recursion_available, 443 "response_code": response_codes.to_str(self.response_code), 444 "status_code": response_codes.http_equiv_status_code(self.response_code), 445 "questions": [question.to_json() for question in self.questions], 446 "answers": [rr.to_json() for rr in self.answers], 447 "authorities": [rr.to_json() for rr in self.authorities], 448 "additionals": [rr.to_json() for rr in self.additionals], 449 "size": self.size, 450 "timestamp": self.timestamp, 451 } 452 453 def copy(self) -> Message: 454 # we keep the copy semantics but change the ID generation 455 state = self.get_state() 456 state["id"] = random.randint(0, 65535) 457 return Message.from_state(state) 458 459 460class DNSFlow(flow.Flow): 461 """A DNSFlow is a collection of DNS messages representing a single DNS query.""" 462 463 request: Message 464 """The DNS request.""" 465 response: Message | None = None 466 """The DNS response.""" 467 468 _stateobject_attributes = flow.Flow._stateobject_attributes.copy() 469 _stateobject_attributes["request"] = Message 470 _stateobject_attributes["response"] = Message 471 472 def __repr__(self) -> str: 473 return f"<DNSFlow\r\n request={repr(self.request)}\r\n response={repr(self.response)}\r\n>"
461class DNSFlow(flow.Flow): 462 """A DNSFlow is a collection of DNS messages representing a single DNS query.""" 463 464 request: Message 465 """The DNS request.""" 466 response: Message | None = None 467 """The DNS response.""" 468 469 _stateobject_attributes = flow.Flow._stateobject_attributes.copy() 470 _stateobject_attributes["request"] = Message 471 _stateobject_attributes["response"] = Message 472 473 def __repr__(self) -> str: 474 return f"<DNSFlow\r\n request={repr(self.request)}\r\n response={repr(self.response)}\r\n>"
A DNSFlow is a collection of DNS messages representing a single DNS query.