Package dpkt :: Module dns
[hide private]
[frames | no frames]

Source Code for Module dpkt.dns

  1  # $Id: dns.py 27 2006-11-21 01:22:52Z dahelder $ 
  2   
  3  """Domain Name System.""" 
  4   
  5  import struct 
  6  import dpkt 
  7   
  8  DNS_Q = 0 
  9  DNS_R = 1 
 10   
 11  # Opcodes 
 12  DNS_QUERY = 0 
 13  DNS_IQUERY = 1 
 14  DNS_STATUS = 2 
 15  DNS_NOTIFY = 4 
 16  DNS_UPDATE = 5 
 17   
 18  # Flags 
 19  DNS_CD = 0x0010 # checking disabled 
 20  DNS_AD = 0x0020 # authenticated data 
 21  DNS_Z =  0x0040 # unused 
 22  DNS_RA = 0x0080 # recursion available 
 23  DNS_RD = 0x0100 # recursion desired 
 24  DNS_TC = 0x0200 # truncated 
 25  DNS_AA = 0x0400 # authoritative answer 
 26  DNS_QR = 0x8000 # response ( query / response ) 
 27   
 28  # Response codes 
 29  DNS_RCODE_NOERR = 0 
 30  DNS_RCODE_FORMERR = 1 
 31  DNS_RCODE_SERVFAIL = 2 
 32  DNS_RCODE_NXDOMAIN = 3 
 33  DNS_RCODE_NOTIMP = 4 
 34  DNS_RCODE_REFUSED = 5 
 35  DNS_RCODE_YXDOMAIN = 6 
 36  DNS_RCODE_YXRRSET = 7 
 37  DNS_RCODE_NXRRSET = 8 
 38  DNS_RCODE_NOTAUTH = 9 
 39  DNS_RCODE_NOTZONE = 10 
 40   
 41  # RR types 
 42  DNS_A = 1 
 43  DNS_NS = 2 
 44  DNS_CNAME = 5 
 45  DNS_SOA = 6 
 46  DNS_PTR = 12 
 47  DNS_HINFO = 13 
 48  DNS_MX = 15 
 49  DNS_TXT = 16 
 50  DNS_AAAA = 28 
 51  DNS_SRV = 33 
 52   
 53  # RR classes 
 54  DNS_IN = 1 
 55  DNS_CHAOS = 3 
 56  DNS_HESIOD = 4 
 57  DNS_ANY = 255 
 58   
def pack_name(name, off, label_ptrs):
    """Encode a dotted domain name into DNS wire format, with compression.

    Each label is written as a length byte followed by the label text, and
    the name is terminated by the zero-length root label.  If a suffix of
    the name is already recorded in label_ptrs, a two-byte compression
    pointer to that earlier occurrence is written instead and encoding
    stops there.

    name       -- dotted name to encode; '' encodes just the root label
    off        -- offset within the message at which the returned bytes
                  will be placed (used to compute pointer targets)
    label_ptrs -- dict mapping upper-cased name suffixes to message
                  offsets; updated in place with the suffixes written here

    Returns the encoded name as a string.
    """
    if name:
        labels = name.split('.')
    else:
        labels = []
    labels.append('')   # terminating root label
    buf = ''
    for i, label in enumerate(labels):
        # Suffix starting at this label, case-folded so that lookups are
        # case-insensitive.
        key = '.'.join(labels[i:]).upper()
        ptr = label_ptrs.get(key)
        if ptr:
            buf += struct.pack('>H', 0xc000 | ptr)
            break
        if len(key) > 1:
            ptr = off + len(buf)
            # A compression pointer only has 14 offset bits; anything at
            # 0x4000 or beyond would be corrupted when OR-ed with 0xc000,
            # so such positions must never be recorded as targets.
            if ptr < 0x4000:
                label_ptrs[key] = ptr
        buf += chr(len(label)) + label
    return buf
80
def unpack_name(buf, off):
    """Decode a (possibly compressed) domain name from a DNS message.

    buf -- the complete DNS message; compression pointers are resolved as
           offsets into it
    off -- offset in buf at which the encoded name starts

    Returns a (name, off) tuple: the dotted name with no trailing dot, and
    the offset just past the encoded name at its original location (i.e.
    just past the first compression pointer, if one was followed).

    Raises dpkt.UnpackError if the name runs past the end of buf, is
    longer than 255 bytes, or contains a compression-pointer loop.
    """
    labels = []
    name_len = 0        # length of the dotted name, counting one '.' per label
    end_off = None      # where the caller resumes parsing
    seen_ptrs = set()   # pointer targets already followed (loop detection)
    buf_len = len(buf)
    while True:
        if off >= buf_len:
            raise dpkt.UnpackError('truncated name')
        n = ord(buf[off])
        if n == 0:
            # Root label: end of name.
            off += 1
            break
        if (n & 0xc0) == 0xc0:
            # Compression pointer: low 14 bits are an offset into buf.
            if off + 2 > buf_len:
                raise dpkt.UnpackError('truncated name')
            ptr = ((n & 0x3f) << 8) | ord(buf[off + 1])
            if end_off is None:
                # Only the first pointer determines where the caller
                # continues; later hops happen elsewhere in the message.
                end_off = off + 2
            if ptr in seen_ptrs:
                raise dpkt.UnpackError('compression pointer loop')
            seen_ptrs.add(ptr)
            off = ptr
            continue
        # Ordinary label of n bytes.
        off += 1
        if off + n > buf_len:
            raise dpkt.UnpackError('truncated name')
        labels.append(buf[off:off + n])
        name_len += n + 1
        if name_len > 255:
            raise dpkt.UnpackError('name longer than 255 bytes')
        off += n
    if end_off is None:
        end_off = off
    return '.'.join(labels).strip('.'), end_off
104
class DNS(dpkt.Packet):
    """DNS message.

    The fixed header is described by __hdr__.  After unpack(), the qd
    attribute holds a list of DNS.Q questions and the an, ns and ar
    attributes hold lists of DNS.RR resource records.  str() serialises
    the header and all four sections, compressing names via pack_name().

    The 16-bit 'op' header field carries the flag bits, the opcode and the
    response code; the qr, opcode, aa, rd, ra, zero and rcode properties
    read and write the corresponding bits.
    """
    __hdr__ = (
        ('id', 'H', 0),
        ('op', 'H', DNS_RD),    # recursive query
        # XXX - lists of query, RR objects
        # NOTE(review): the list defaults are shared objects unless
        # dpkt.Packet copies defaults per instance -- confirm.
        ('qd', 'H', []),
        ('an', 'H', []),
        ('ns', 'H', []),
        ('ar', 'H', [])
        )

    def get_qr(self):
        """Return 1 if the QR (response) bit is set, else 0."""
        return int((self.op & DNS_QR) == DNS_QR)

    def set_qr(self, v):
        """Set the QR bit if v is true, clear it otherwise."""
        if v:
            self.op |= DNS_QR
        else:
            self.op &= ~DNS_QR
    qr = property(get_qr, set_qr)

    def get_opcode(self):
        """Return the 4-bit opcode stored in bits 11-14 of op."""
        return (self.op >> 11) & 0xf

    def set_opcode(self, v):
        """Store the low 4 bits of v as the opcode."""
        self.op = (self.op & ~0x7800) | ((v & 0xf) << 11)
    opcode = property(get_opcode, set_opcode)

    def get_aa(self):
        """Return 1 if the AA bit is set, else 0."""
        return int((self.op & DNS_AA) == DNS_AA)

    def set_aa(self, v):
        """Set the AA bit if v is true, clear it otherwise."""
        if v:
            self.op |= DNS_AA
        else:
            self.op &= ~DNS_AA
    aa = property(get_aa, set_aa)

    def get_rd(self):
        """Return 1 if the RD bit is set, else 0."""
        return int((self.op & DNS_RD) == DNS_RD)

    def set_rd(self, v):
        """Set the RD bit if v is true, clear it otherwise."""
        if v:
            self.op |= DNS_RD
        else:
            self.op &= ~DNS_RD
    rd = property(get_rd, set_rd)

    def get_ra(self):
        """Return 1 if the RA bit is set, else 0."""
        return int((self.op & DNS_RA) == DNS_RA)

    def set_ra(self, v):
        """Set the RA bit if v is true, clear it otherwise."""
        if v:
            self.op |= DNS_RA
        else:
            self.op &= ~DNS_RA
    ra = property(get_ra, set_ra)

    def get_zero(self):
        """Return 1 if the Z bit is set, else 0."""
        return int((self.op & DNS_Z) == DNS_Z)

    def set_zero(self, v):
        """Set the Z bit if v is true, clear it otherwise."""
        if v:
            self.op |= DNS_Z
        else:
            self.op &= ~DNS_Z
    zero = property(get_zero, set_zero)

    def get_rcode(self):
        """Return the 4-bit response code stored in the low bits of op."""
        return self.op & 0xf

    def set_rcode(self, v):
        """Store the low 4 bits of v as the response code."""
        self.op = (self.op & ~0xf) | (v & 0xf)
    rcode = property(get_rcode, set_rcode)

    class Q(dpkt.Packet):
        """DNS question."""
        __hdr__ = (
            ('name', '1025s', ''),
            ('type', 'H', DNS_A),
            ('cls', 'H', DNS_IN)
            )

        # XXX - suk
        # Questions cannot be packed or unpacked on their own because name
        # compression depends on the enclosing message; use DNS.pack_q()
        # and DNS.unpack_q() instead.
        def __len__(self):
            raise NotImplementedError
        __str__ = __len__

        def unpack(self, buf):
            raise NotImplementedError

    class RR(Q):
        """DNS resource record."""
        __hdr__ = (
            ('name', '1025s', ''),
            ('type', 'H', DNS_A),
            ('cls', 'H', DNS_IN),
            ('ttl', 'I', 0),
            ('rlen', 'H', 4),
            ('rdata', 's', '')
            )

        def pack_rdata(self, off, label_ptrs):
            """Return the wire-format RDATA for this record.

            A non-empty self.rdata is returned verbatim.  Otherwise the
            RDATA is built from the type-specific attributes (ip, nsname,
            cname, ptrname, the SOA fields, preference/mxname, text, ip6,
            priority/weight/port/srvname).

            off        -- message offset at which the RDATA will be placed
            label_ptrs -- compression table shared with pack_name()
            """
            # XXX - yeah, this sux
            if self.rdata:
                return self.rdata
            if self.type == DNS_A:
                return self.ip
            elif self.type == DNS_NS:
                return pack_name(self.nsname, off, label_ptrs)
            elif self.type == DNS_CNAME:
                return pack_name(self.cname, off, label_ptrs)
            elif self.type == DNS_PTR:
                return pack_name(self.ptrname, off, label_ptrs)
            elif self.type == DNS_SOA:
                l = []
                l.append(pack_name(self.mname, off, label_ptrs))
                # rname follows mname, so its offset is shifted by the
                # packed length of mname.
                l.append(pack_name(self.rname, off + len(l[0]), label_ptrs))
                l.append(struct.pack('>IIIII', self.serial, self.refresh,
                                     self.retry, self.expire, self.minimum))
                return ''.join(l)
            elif self.type == DNS_MX:
                return struct.pack('>H', self.preference) + \
                       pack_name(self.mxname, off + 2, label_ptrs)
            elif self.type == DNS_TXT or self.type == DNS_HINFO:
                return ''.join([ '%s%s' % (chr(len(x)), x)
                                 for x in self.text ])
            elif self.type == DNS_AAAA:
                return self.ip6
            elif self.type == DNS_SRV:
                return struct.pack('>HHH', self.priority, self.weight,
                                   self.port) + \
                       pack_name(self.srvname, off + 6, label_ptrs)
            # Unrecognised type with no raw rdata: emit empty RDATA rather
            # than returning None, which would make pack_rr() fail on
            # len(rdata).
            return ''

        def unpack_rdata(self, buf, off):
            """Decode self.rdata into type-specific attributes.

            buf -- the complete DNS message (needed to resolve compressed
                   names inside the RDATA)
            off -- offset in buf at which this record's RDATA starts

            Records of unrecognised types are left with only self.rdata.
            """
            if self.type == DNS_A:
                self.ip = self.rdata
            elif self.type == DNS_NS:
                self.nsname, off = unpack_name(buf, off)
            elif self.type == DNS_CNAME:
                self.cname, off = unpack_name(buf, off)
            elif self.type == DNS_PTR:
                self.ptrname, off = unpack_name(buf, off)
            elif self.type == DNS_SOA:
                self.mname, off = unpack_name(buf, off)
                self.rname, off = unpack_name(buf, off)
                self.serial, self.refresh, self.retry, self.expire, \
                    self.minimum = struct.unpack('>IIIII', buf[off:off+20])
            elif self.type == DNS_MX:
                # struct.unpack() returns a tuple; take the single value so
                # preference is an int, as pack_rdata() expects.
                self.preference = struct.unpack('>H', self.rdata[:2])[0]
                self.mxname, off = unpack_name(buf, off+2)
            elif self.type == DNS_TXT or self.type == DNS_HINFO:
                # Sequence of length-prefixed character strings.
                self.text = []
                buf = self.rdata
                while buf:
                    n = ord(buf[0])
                    self.text.append(buf[1:1+n])
                    buf = buf[1+n:]
            elif self.type == DNS_AAAA:
                self.ip6 = self.rdata
            elif self.type == DNS_SRV:
                self.priority, self.weight, self.port = \
                    struct.unpack('>HHH', self.rdata[:6])
                self.srvname, off = unpack_name(buf, off+6)

    def pack_q(self, buf, q):
        """Append packed DNS question and return buf."""
        return buf + pack_name(q.name, len(buf), self.label_ptrs) + \
               struct.pack('>HH', q.type, q.cls)

    def unpack_q(self, buf, off):
        """Return DNS question and new offset."""
        q = self.Q()
        q.name, off = unpack_name(buf, off)
        q.type, q.cls = struct.unpack('>HH', buf[off:off+4])
        off += 4
        return q, off

    def pack_rr(self, buf, rr):
        """Append packed DNS RR and return buf."""
        name = pack_name(rr.name, len(buf), self.label_ptrs)
        # RDATA starts after the name and the 10 bytes of
        # type (2) + class (2) + ttl (4) + rdlength (2).
        rdata = rr.pack_rdata(len(buf) + len(name) + 10, self.label_ptrs)
        return buf + name + struct.pack('>HHIH', rr.type, rr.cls, rr.ttl,
                                        len(rdata)) + rdata

    def unpack_rr(self, buf, off):
        """Return DNS RR and new offset."""
        rr = self.RR()
        rr.name, off = unpack_name(buf, off)
        rr.type, rr.cls, rr.ttl, rdlen = struct.unpack('>HHIH', buf[off:off+10])
        off += 10
        rr.rdata = buf[off:off+rdlen]
        rr.unpack_rdata(buf, off)
        off += rdlen
        return rr, off

    def unpack(self, buf):
        """Parse a DNS message from buf.

        The header is unpacked first; its qd/an/ns/ar fields initially hold
        section counts, which are then replaced by lists of the parsed
        questions and resource records.
        """
        dpkt.Packet.unpack(self, buf)
        off = self.__hdr_len__
        cnt = self.qd
        self.qd = []
        for i in range(cnt):
            q, off = self.unpack_q(buf, off)
            self.qd.append(q)
        for x in ('an', 'ns', 'ar'):
            cnt = getattr(self, x, 0)
            setattr(self, x, [])
            for i in range(cnt):
                rr, off = self.unpack_rr(buf, off)
                getattr(self, x).append(rr)
        self.data = ''

    def __len__(self):
        # XXX - cop out
        return len(str(self))

    def __str__(self):
        """Return the message in wire format."""
        # XXX - compress names on the fly
        # label_ptrs is per-serialisation state shared by pack_q()/pack_rr();
        # the finally clause removes it even when packing fails.
        self.label_ptrs = {}
        try:
            buf = struct.pack(self.__hdr_fmt__, self.id, self.op,
                              len(self.qd), len(self.an), len(self.ns),
                              len(self.ar))
            for q in self.qd:
                buf = self.pack_q(buf, q)
            for x in ('an', 'ns', 'ar'):
                for rr in getattr(self, x):
                    buf = self.pack_rr(buf, rr)
        finally:
            del self.label_ptrs
        return buf
if __name__ == '__main__':
    import unittest
    from ip import IP

    class DNSTestCase(unittest.TestCase):
        """Self-tests for DNS message parsing, packing and name encoding."""

        def test_basic(self):
            # Captured IP/UDP packet carrying a DNS response.
            s = 'E\x00\x02\x08\xc15\x00\x00\x80\x11\x92aBk0\x01Bk0w\x005\xc07\x01\xf4\xda\xc2d\xd2\x81\x80\x00\x01\x00\x03\x00\x0b\x00\x0b\x03www\x06google\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x03V\x00\x17\x03www\x06google\x06akadns\x03net\x00\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabh\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabc\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0c\x04usw5\x04akam\xc0>\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw6\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw7\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x08\x05asia3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02za\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0f\x02zc\x06akadns\x03org\x00\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zf\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zh\xc0\xd5\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04eur3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use2\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use4\xc0t\xc0\xc1\x00\x01\x00\x01\x00\x00\xfb4\x00\x04\xd0\xb9\x84\xb0\xc0\xd2\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xf1\xc76\xc0\xed\x00\x01\x00\x01\x00\x00\xfb4\x00\x04?\xd7\xc6S\xc0\xfe\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xd00.\xc1\x0f\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xc1-\x01g\xc1"\x00\x01\x00\x01\x00\x00\x101\x00\x04?\xd1\xaa\x88\xc15\x00\x01\x00\x01\x00\x00\r\x1a\x00\x04PCC\xb6\xc0o\x00\x01\x00\x01\x00\x00\x10\x7f\x00\x04?\xf1I\xd6\xc0\x87\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xce\x84dl\xc0\x9a\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04A\xcb\xea\x1b\xc0\xad\x00\x01\x00\x01\x00\x00\x0b)\x00\x04\xc1l\x9a\t'
            ip = IP(s)
            dns = DNS(ip.udp.data)
            # Separate assertEqual calls report which field mismatched and
            # show both values, unlike one combined boolean check.
            self.assertEqual(dns.qd[0].name, 'www.google.com')
            self.assertEqual(dns.an[1].name, 'www.google.akadns.net')
            # A simple query must survive an unpack/pack round trip.
            s = '\x05\xf5\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x03cnn\x03com\x00\x00\x01\x00\x01'
            dns = DNS(s)
            self.assertEqual(s, str(dns))

        def test_PTR(self):
            s = 'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O'
            dns = DNS(s)
            self.assertEqual(dns.qd[0].name, '1.1.211.141.in-addr.arpa')
            self.assertEqual(dns.an[0].ptrname,
                             'default.v-umce-ifs.umnet.umich.edu')
            self.assertEqual(dns.ns[0].nsname, 'shabby.ifs.umich.edu')
            self.assertEqual(dns.ns[1].ttl, 3382)
            self.assertEqual(dns.ns[2].nsname, 'dns2.itd.umich.edu')
            self.assertEqual(s, str(dns))

        def test_pack_name(self):
            # Empty name is \0
            x = pack_name('', 0, {})
            self.assertEqual(x, '\0')

    unittest.main()