Edit on GitHub

mitmproxy.dns

  1from __future__ import annotations
  2from dataclasses import dataclass
  3import itertools
  4import random
  5import struct
  6from ipaddress import IPv4Address, IPv6Address
  7import time
  8from typing import ClassVar
  9
 10from mitmproxy import flow, stateobject
 11from mitmproxy.net.dns import classes, domain_names, op_codes, response_codes, types
 12
 13# DNS parameters taken from https://www.iana.org/assignments/dns-parameters/dns-parameters.xml
 14
 15
 16@dataclass
 17class Question(stateobject.StateObject):
 18    HEADER: ClassVar[struct.Struct] = struct.Struct("!HH")
 19
 20    name: str
 21    type: int
 22    class_: int
 23
 24    _stateobject_attributes = dict(name=str, type=int, class_=int)
 25
 26    @classmethod
 27    def from_state(cls, state):
 28        return cls(**state)
 29
 30    def __str__(self) -> str:
 31        return self.name
 32
 33    def to_json(self) -> dict:
 34        """
 35        Converts the question into json for mitmweb.
 36        Sync with web/src/flow.ts.
 37        """
 38        return {
 39            "name": self.name,
 40            "type": types.to_str(self.type),
 41            "class": classes.to_str(self.class_),
 42        }
 43
 44
 45@dataclass
 46class ResourceRecord(stateobject.StateObject):
 47    DEFAULT_TTL: ClassVar[int] = 60
 48    HEADER: ClassVar[struct.Struct] = struct.Struct("!HHIH")
 49
 50    name: str
 51    type: int
 52    class_: int
 53    ttl: int
 54    data: bytes
 55
 56    _stateobject_attributes = dict(name=str, type=int, class_=int, ttl=int, data=bytes)
 57
 58    @classmethod
 59    def from_state(cls, state):
 60        return cls(**state)
 61
 62    def __str__(self) -> str:
 63        try:
 64            if self.type == types.A:
 65                return str(self.ipv4_address)
 66            if self.type == types.AAAA:
 67                return str(self.ipv6_address)
 68            if self.type in (types.NS, types.CNAME, types.PTR):
 69                return self.domain_name
 70            if self.type == types.TXT:
 71                return self.text
 72        except:
 73            return f"0x{self.data.hex()} (invalid {types.to_str(self.type)} data)"
 74        return f"0x{self.data.hex()}"
 75
 76    @property
 77    def text(self) -> str:
 78        return self.data.decode("utf-8")
 79
 80    @text.setter
 81    def text(self, value: str) -> None:
 82        self.data = value.encode("utf-8")
 83
 84    @property
 85    def ipv4_address(self) -> IPv4Address:
 86        return IPv4Address(self.data)
 87
 88    @ipv4_address.setter
 89    def ipv4_address(self, ip: IPv4Address) -> None:
 90        self.data = ip.packed
 91
 92    @property
 93    def ipv6_address(self) -> IPv6Address:
 94        return IPv6Address(self.data)
 95
 96    @ipv6_address.setter
 97    def ipv6_address(self, ip: IPv6Address) -> None:
 98        self.data = ip.packed
 99
100    @property
101    def domain_name(self) -> str:
102        return domain_names.unpack(self.data)
103
104    @domain_name.setter
105    def domain_name(self, name: str) -> None:
106        self.data = domain_names.pack(name)
107
108    def to_json(self) -> dict:
109        """
110        Converts the resource record into json for mitmweb.
111        Sync with web/src/flow.ts.
112        """
113        return {
114            "name": self.name,
115            "type": types.to_str(self.type),
116            "class": classes.to_str(self.class_),
117            "ttl": self.ttl,
118            "data": str(self),
119        }
120
121    @classmethod
122    def A(cls, name: str, ip: IPv4Address, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
123        """Create an IPv4 resource record."""
124        return cls(name, types.A, classes.IN, ttl, ip.packed)
125
126    @classmethod
127    def AAAA(
128        cls, name: str, ip: IPv6Address, *, ttl: int = DEFAULT_TTL
129    ) -> ResourceRecord:
130        """Create an IPv6 resource record."""
131        return cls(name, types.AAAA, classes.IN, ttl, ip.packed)
132
133    @classmethod
134    def CNAME(
135        cls, alias: str, canonical: str, *, ttl: int = DEFAULT_TTL
136    ) -> ResourceRecord:
137        """Create a canonical internet name resource record."""
138        return cls(alias, types.CNAME, classes.IN, ttl, domain_names.pack(canonical))
139
140    @classmethod
141    def PTR(cls, inaddr: str, ptr: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
142        """Create a canonical internet name resource record."""
143        return cls(inaddr, types.PTR, classes.IN, ttl, domain_names.pack(ptr))
144
145    @classmethod
146    def TXT(cls, name: str, text: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
147        """Create a textual resource record."""
148        return cls(name, types.TXT, classes.IN, ttl, text.encode("utf-8"))
149
150
151# comments are taken from rfc1035
152@dataclass
153class Message(stateobject.StateObject):
154    HEADER: ClassVar[struct.Struct] = struct.Struct("!HHHHHH")
155
156    timestamp: float
157    """The time at which the message was sent or received."""
158    id: int
159    """An identifier assigned by the program that generates any kind of query."""
160    query: bool
161    """A field that specifies whether this message is a query."""
162    op_code: int
163    """
164    A field that specifies kind of query in this message.
165    This value is set by the originator of a request and copied into the response.
166    """
167    authoritative_answer: bool
168    """
169    This field is valid in responses, and specifies that the responding name server
170    is an authority for the domain name in question section.
171    """
172    truncation: bool
173    """Specifies that this message was truncated due to length greater than that permitted on the transmission channel."""
174    recursion_desired: bool
175    """
176    This field may be set in a query and is copied into the response.
177    If set, it directs the name server to pursue the query recursively.
178    """
179    recursion_available: bool
180    """This field is set or cleared in a response, and denotes whether recursive query support is available in the name server."""
181    reserved: int
182    """Reserved for future use.  Must be zero in all queries and responses."""
183    response_code: int
184    """This field is set as part of responses."""
185    questions: list[Question]
186    """
187    The question section is used to carry the "question" in most queries, i.e.
188    the parameters that define what is being asked.
189    """
190    answers: list[ResourceRecord]
191    """First resource record section."""
192    authorities: list[ResourceRecord]
193    """Second resource record section."""
194    additionals: list[ResourceRecord]
195    """Third resource record section."""
196
197    _stateobject_attributes = dict(
198        timestamp=float,
199        id=int,
200        query=bool,
201        op_code=int,
202        authoritative_answer=bool,
203        truncation=bool,
204        recursion_desired=bool,
205        recursion_available=bool,
206        reserved=int,
207        response_code=int,
208        questions=list[Question],
209        answers=list[ResourceRecord],
210        authorities=list[ResourceRecord],
211        additionals=list[ResourceRecord],
212    )
213
214    @classmethod
215    def from_state(cls, state):
216        obj = cls.__new__(cls)  # `cls(**state)` won't work recursively
217        obj.set_state(state)
218        return obj
219
220    def __str__(self) -> str:
221        return "\r\n".join(
222            map(
223                str,
224                itertools.chain(
225                    self.questions, self.answers, self.authorities, self.additionals
226                ),
227            )
228        )
229
230    @property
231    def content(self) -> bytes:
232        """Returns the user-friendly content of all parts as encoded bytes."""
233        return str(self).encode()
234
235    @property
236    def size(self) -> int:
237        """Returns the cumulative data size of all resource record sections."""
238        return sum(
239            len(x.data)
240            for x in itertools.chain.from_iterable(
241                [self.answers, self.authorities, self.additionals]
242            )
243        )
244
245    def fail(self, response_code: int) -> Message:
246        if response_code == response_codes.NOERROR:
247            raise ValueError("response_code must be an error code.")
248        return Message(
249            timestamp=time.time(),
250            id=self.id,
251            query=False,
252            op_code=self.op_code,
253            authoritative_answer=False,
254            truncation=False,
255            recursion_desired=self.recursion_desired,
256            recursion_available=False,
257            reserved=0,
258            response_code=response_code,
259            questions=self.questions,
260            answers=[],
261            authorities=[],
262            additionals=[],
263        )
264
265    def succeed(self, answers: list[ResourceRecord]) -> Message:
266        return Message(
267            timestamp=time.time(),
268            id=self.id,
269            query=False,
270            op_code=self.op_code,
271            authoritative_answer=False,
272            truncation=False,
273            recursion_desired=self.recursion_desired,
274            recursion_available=True,
275            reserved=0,
276            response_code=response_codes.NOERROR,
277            questions=self.questions,
278            answers=answers,
279            authorities=[],
280            additionals=[],
281        )
282
283    @classmethod
284    def unpack(cls, buffer: bytes) -> Message:
285        """Converts the entire given buffer into a DNS message."""
286        length, msg = cls.unpack_from(buffer, 0)
287        if length != len(buffer):
288            raise struct.error(f"unpack requires a buffer of {length} bytes")
289        return msg
290
291    @classmethod
292    def unpack_from(cls, buffer: bytes | bytearray, offset: int) -> tuple[int, Message]:
293        """Converts the buffer from a given offset into a DNS message and also returns its length."""
294        (
295            id,
296            flags,
297            len_questions,
298            len_answers,
299            len_authorities,
300            len_additionals,
301        ) = Message.HEADER.unpack_from(buffer, offset)
302        msg = Message(
303            timestamp=time.time(),
304            id=id,
305            query=(flags & (1 << 15)) == 0,
306            op_code=(flags >> 11) & 0b1111,
307            authoritative_answer=(flags & (1 << 10)) != 0,
308            truncation=(flags & (1 << 9)) != 0,
309            recursion_desired=(flags & (1 << 8)) != 0,
310            recursion_available=(flags & (1 << 7)) != 0,
311            reserved=(flags >> 4) & 0b111,
312            response_code=flags & 0b1111,
313            questions=[],
314            answers=[],
315            authorities=[],
316            additionals=[],
317        )
318        offset += Message.HEADER.size
319        cached_names = domain_names.cache()
320
321        def unpack_domain_name() -> str:
322            nonlocal buffer, offset, cached_names
323            name, length = domain_names.unpack_from_with_compression(
324                buffer, offset, cached_names
325            )
326            offset += length
327            return name
328
329        for i in range(0, len_questions):
330            try:
331                name = unpack_domain_name()
332                type, class_ = Question.HEADER.unpack_from(buffer, offset)
333                offset += Question.HEADER.size
334                msg.questions.append(Question(name=name, type=type, class_=class_))
335            except struct.error as e:
336                raise struct.error(f"question #{i}: {str(e)}")
337
338        def unpack_rrs(
339            section: list[ResourceRecord], section_name: str, count: int
340        ) -> None:
341            nonlocal buffer, offset
342            for i in range(0, count):
343                try:
344                    name = unpack_domain_name()
345                    type, class_, ttl, len_data = ResourceRecord.HEADER.unpack_from(
346                        buffer, offset
347                    )
348                    offset += ResourceRecord.HEADER.size
349                    end_data = offset + len_data
350                    if len(buffer) < end_data:
351                        raise struct.error(
352                            f"unpack requires a data buffer of {len_data} bytes"
353                        )
354                    data = buffer[offset:end_data]
355                    if 0b11000000 in data:
356                        # the resource record might contains a compressed domain name, if so, uncompressed in advance
357                        try:
358                            (
359                                rr_name,
360                                rr_name_len,
361                            ) = domain_names.unpack_from_with_compression(
362                                buffer, offset, cached_names
363                            )
364                            if rr_name_len == len_data:
365                                data = domain_names.pack(rr_name)
366                        except struct.error:
367                            pass
368                    section.append(ResourceRecord(name, type, class_, ttl, data))
369                    offset += len_data
370                except struct.error as e:
371                    raise struct.error(f"{section_name} #{i}: {str(e)}")
372
373        unpack_rrs(msg.answers, "answer", len_answers)
374        unpack_rrs(msg.authorities, "authority", len_authorities)
375        unpack_rrs(msg.additionals, "additional", len_additionals)
376        return (offset, msg)
377
378    @property
379    def packed(self) -> bytes:
380        """Converts the message into network bytes."""
381        if self.id < 0 or self.id > 65535:
382            raise ValueError(f"DNS message's id {self.id} is out of bounds.")
383        flags = 0
384        if not self.query:
385            flags |= 1 << 15
386        if self.op_code < 0 or self.op_code > 0b1111:
387            raise ValueError(f"DNS message's op_code {self.op_code} is out of bounds.")
388        flags |= self.op_code << 11
389        if self.authoritative_answer:
390            flags |= 1 << 10
391        if self.truncation:
392            flags |= 1 << 9
393        if self.recursion_desired:
394            flags |= 1 << 8
395        if self.recursion_available:
396            flags |= 1 << 7
397        if self.reserved < 0 or self.reserved > 0b111:
398            raise ValueError(
399                f"DNS message's reserved value of {self.reserved} is out of bounds."
400            )
401        flags |= self.reserved << 4
402        if self.response_code < 0 or self.response_code > 0b1111:
403            raise ValueError(
404                f"DNS message's response_code {self.response_code} is out of bounds."
405            )
406        flags |= self.response_code
407        data = bytearray()
408        data.extend(
409            Message.HEADER.pack(
410                self.id,
411                flags,
412                len(self.questions),
413                len(self.answers),
414                len(self.authorities),
415                len(self.additionals),
416            )
417        )
418        # TODO implement compression
419        for question in self.questions:
420            data.extend(domain_names.pack(question.name))
421            data.extend(Question.HEADER.pack(question.type, question.class_))
422        for rr in (*self.answers, *self.authorities, *self.additionals):
423            data.extend(domain_names.pack(rr.name))
424            data.extend(
425                ResourceRecord.HEADER.pack(rr.type, rr.class_, rr.ttl, len(rr.data))
426            )
427            data.extend(rr.data)
428        return bytes(data)
429
430    def to_json(self) -> dict:
431        """
432        Converts the message into json for mitmweb.
433        Sync with web/src/flow.ts.
434        """
435        return {
436            "id": self.id,
437            "query": self.query,
438            "op_code": op_codes.to_str(self.op_code),
439            "authoritative_answer": self.authoritative_answer,
440            "truncation": self.truncation,
441            "recursion_desired": self.recursion_desired,
442            "recursion_available": self.recursion_available,
443            "response_code": response_codes.to_str(self.response_code),
444            "status_code": response_codes.http_equiv_status_code(self.response_code),
445            "questions": [question.to_json() for question in self.questions],
446            "answers": [rr.to_json() for rr in self.answers],
447            "authorities": [rr.to_json() for rr in self.authorities],
448            "additionals": [rr.to_json() for rr in self.additionals],
449            "size": self.size,
450            "timestamp": self.timestamp,
451        }
452
453    def copy(self) -> Message:
454        # we keep the copy semantics but change the ID generation
455        state = self.get_state()
456        state["id"] = random.randint(0, 65535)
457        return Message.from_state(state)
458
459
460class DNSFlow(flow.Flow):
461    """A DNSFlow is a collection of DNS messages representing a single DNS query."""
462
463    request: Message
464    """The DNS request."""
465    response: Message | None = None
466    """The DNS response."""
467
468    _stateobject_attributes = flow.Flow._stateobject_attributes.copy()
469    _stateobject_attributes["request"] = Message
470    _stateobject_attributes["response"] = Message
471
472    def __repr__(self) -> str:
473        return f"<DNSFlow\r\n  request={repr(self.request)}\r\n  response={repr(self.response)}\r\n>"
class DNSFlow(mitmproxy.flow.Flow):
461class DNSFlow(flow.Flow):
462    """A DNSFlow is a collection of DNS messages representing a single DNS query."""
463
464    request: Message
465    """The DNS request."""
466    response: Message | None = None
467    """The DNS response."""
468
469    _stateobject_attributes = flow.Flow._stateobject_attributes.copy()
470    _stateobject_attributes["request"] = Message
471    _stateobject_attributes["response"] = Message
472
473    def __repr__(self) -> str:
474        return f"<DNSFlow\r\n  request={repr(self.request)}\r\n  response={repr(self.response)}\r\n>"

A DNSFlow is a collection of DNS messages representing a single DNS query.

request: mitmproxy.dns.Message

The DNS request.

response: mitmproxy.dns.Message | None = None

The DNS response.

type: ClassVar[str] = 'dns'

The flow type, for example http, tcp, or dns.