Package dpkt :: Module ip6
[hide private]
[frames] | no frames]

Source Code for Module dpkt.ip6

  1  # $Id: ip6.py 87 2013-03-05 19:41:04Z andrewflnr@gmail.com $ 
  2   
  3  """Internet Protocol, version 6.""" 
  4   
  5  import dpkt 
  6   
7 -class IP6(dpkt.Packet):
8 __hdr__ = ( 9 ('v_fc_flow', 'I', 0x60000000L), 10 ('plen', 'H', 0), # payload length (not including header) 11 ('nxt', 'B', 0), # next header protocol 12 ('hlim', 'B', 0), # hop limit 13 ('src', '16s', ''), 14 ('dst', '16s', '') 15 ) 16 17 # XXX - to be shared with IP. We cannot refer to the ip module 18 # right now because ip.__load_protos() expects the IP6 class to be 19 # defined. 20 _protosw = None 21
22 - def _get_v(self):
23 return self.v_fc_flow >> 28
24 - def _set_v(self, v):
25 self.v_fc_flow = (self.v_fc_flow & ~0xf0000000L) | (v << 28)
26 v = property(_get_v, _set_v) 27
28 - def _get_fc(self):
29 return (self.v_fc_flow >> 20) & 0xff
30 - def _set_fc(self, v):
31 self.v_fc_flow = (self.v_fc_flow & ~0xff00000L) | (v << 20)
32 fc = property(_get_fc, _set_fc) 33
34 - def _get_flow(self):
35 return self.v_fc_flow & 0xfffff
36 - def _set_flow(self, v):
37 self.v_fc_flow = (self.v_fc_flow & ~0xfffff) | (v & 0xfffff)
38 flow = property(_get_flow, _set_flow) 39
40 - def unpack(self, buf):
41 dpkt.Packet.unpack(self, buf) 42 self.extension_hdrs = dict(((i, None) for i in ext_hdrs)) 43 44 if self.plen: 45 buf = self.data[:self.plen] 46 else: # due to jumbo payload or TSO 47 buf = self.data 48 49 next = self.nxt 50 51 while (next in ext_hdrs): 52 ext = ext_hdrs_cls[next](buf) 53 self.extension_hdrs[next] = ext 54 buf = buf[ext.length:] 55 next = ext.nxt 56 57 # set the payload protocol id 58 setattr(self, 'p', next) 59 60 try: 61 self.data = self._protosw[next](buf) 62 setattr(self, self.data.__class__.__name__.lower(), self.data) 63 except (KeyError, dpkt.UnpackError): 64 self.data = buf
65
66 - def headers_str(self):
67 """ 68 Output extension headers in order defined in RFC1883 (except dest opts) 69 """ 70 71 header_str = "" 72 73 for hdr in ext_hdrs: 74 if not self.extension_hdrs[hdr] is None: 75 header_str += str(self.extension_hdrs[hdr]) 76 return header_str
77 78
79 - def __str__(self):
80 if (self.nxt == 6 or self.nxt == 17 or self.nxt == 58) and \ 81 not self.data.sum: 82 # XXX - set TCP, UDP, and ICMPv6 checksums 83 p = str(self.data) 84 s = dpkt.struct.pack('>16s16sxBH', self.src, self.dst, self.nxt, len(p)) 85 s = dpkt.in_cksum_add(0, s) 86 s = dpkt.in_cksum_add(s, p) 87 try: 88 self.data.sum = dpkt.in_cksum_done(s) 89 except AttributeError: 90 pass 91 return self.pack_hdr() + self.headers_str() + str(self.data)
92
93 - def set_proto(cls, p, pktclass):
94 cls._protosw[p] = pktclass
95 set_proto = classmethod(set_proto) 96
97 - def get_proto(cls, p):
98 return cls._protosw[p]
99 get_proto = classmethod(get_proto)
100 101 import ip 102 # We are most likely still in the middle of ip.__load_protos() which 103 # implicitly loads this module through __import__(), so the content of 104 # ip.IP._protosw is still incomplete at the moment. By sharing the 105 # same dictionary by reference as opposed to making a copy, when 106 # ip.__load_protos() finishes, we will also automatically get the most 107 # up-to-date dictionary. 108 IP6._protosw = ip.IP._protosw 109
110 -class IP6ExtensionHeader(dpkt.Packet):
111 """ 112 An extension header is very similar to a 'sub-packet'. 113 We just want to re-use all the hdr unpacking etc. 114 """ 115 pass 116
117 -class IP6OptsHeader(IP6ExtensionHeader):
118 __hdr__ = ( 119 ('nxt', 'B', 0), # next extension header protocol 120 ('len', 'B', 0) # option data length in 8 octect units (ignoring first 8 octets) so, len 0 == 64bit header 121 ) 122
123 - def unpack(self, buf):
124 dpkt.Packet.unpack(self, buf) 125 setattr(self, 'length', (self.len + 1) * 8) 126 options = [] 127 128 index = 0 129 130 while (index < self.length - 2): 131 opt_type = ord(self.data[index]) 132 133 # PAD1 option 134 if opt_type == 0: 135 index += 1 136 continue; 137 138 opt_length = ord(self.data[index + 1]) 139 140 if opt_type == 1: # PADN option 141 # PADN uses opt_length bytes in total 142 index += opt_length + 2 143 continue 144 145 options.append({'type': opt_type, 'opt_length': opt_length, 'data': self.data[index + 2:index + 2 + opt_length]}) 146 147 # add the two chars and the option_length, to move to the next option 148 index += opt_length + 2 149 150 setattr(self, 'options', options)
151
152 -class IP6HopOptsHeader(IP6OptsHeader): pass
153
154 -class IP6DstOptsHeader(IP6OptsHeader): pass
155
156 -class IP6RoutingHeader(IP6ExtensionHeader):
157 __hdr__ = ( 158 ('nxt', 'B', 0), # next extension header protocol 159 ('len', 'B', 0), # extension data length in 8 octect units (ignoring first 8 octets) (<= 46 for type 0) 160 ('type', 'B', 0), # routing type (currently, only 0 is used) 161 ('segs_left', 'B', 0), # remaining segments in route, until destination (<= 23) 162 ('rsvd_sl_bits', 'I', 0), # reserved (1 byte), strict/loose bitmap for addresses 163 ) 164
165 - def _get_sl_bits(self):
166 return self.rsvd_sl_bits & 0xffffff
167 - def _set_sl_bits(self, v):
168 self.rsvd_sl_bits = (self.rsvd_sl_bits & ~0xfffff) | (v & 0xfffff)
169 sl_bits = property(_get_sl_bits, _set_sl_bits) 170
171 - def unpack(self, buf):
172 hdr_size = 8 173 addr_size = 16 174 175 dpkt.Packet.unpack(self, buf) 176 177 addresses = [] 178 num_addresses = self.len / 2 179 buf = buf[hdr_size:hdr_size + num_addresses * addr_size] 180 181 for i in range(num_addresses): 182 addresses.append(buf[i * addr_size: i * addr_size + addr_size]) 183 184 self.data = buf 185 setattr(self, 'addresses', addresses) 186 setattr(self, 'length', self.len * 8 + 8)
187
188 -class IP6FragmentHeader(IP6ExtensionHeader):
189 __hdr__ = ( 190 ('nxt', 'B', 0), # next extension header protocol 191 ('resv', 'B', 0), # reserved, set to 0 192 ('frag_off_resv_m', 'H', 0), # frag offset (13 bits), reserved zero (2 bits), More frags flag 193 ('id', 'I', 0) # fragments id 194 ) 195
196 - def unpack(self, buf):
197 dpkt.Packet.unpack(self, buf) 198 setattr(self, 'length', self.__hdr_len__)
199
200 - def _get_frag_off(self):
201 return self.frag_off_resv_m >> 3
202 - def _set_frag_off(self, v):
203 self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfff8) | (v << 3)
204 frag_off = property(_get_frag_off, _set_frag_off) 205
206 - def _get_m_flag(self):
207 return self.frag_off_resv_m & 1
208 - def _set_m_flag(self, v):
209 self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfffe) | v
210 m_flag = property(_get_m_flag, _set_m_flag)
211
212 -class IP6AHHeader(IP6ExtensionHeader):
213 __hdr__ = ( 214 ('nxt', 'B', 0), # next extension header protocol 215 ('len', 'B', 0), # length of header in 4 octet units (ignoring first 2 units) 216 ('resv', 'H', 0), # reserved, 2 bytes of 0 217 ('spi', 'I', 0), # SPI security parameter index 218 ('seq', 'I', 0) # sequence no. 219 ) 220
221 - def unpack(self, buf):
222 dpkt.Packet.unpack(self, buf) 223 setattr(self, 'length', (self.len + 2) * 4) 224 setattr(self, 'auth_data', self.data[:(self.len - 1) * 4])
225 226
227 -class IP6ESPHeader(IP6ExtensionHeader):
228 - def unpack(self, buf):
229 raise NotImplementedError("ESP extension headers are not supported.")
230 231 232 ext_hdrs = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT, ip.IP_PROTO_AH, ip.IP_PROTO_ESP, ip.IP_PROTO_DSTOPTS] 233 ext_hdrs_cls = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader, 234 ip.IP_PROTO_ROUTING: IP6RoutingHeader, 235 ip.IP_PROTO_FRAGMENT: IP6FragmentHeader, 236 ip.IP_PROTO_ESP: IP6ESPHeader, 237 ip.IP_PROTO_AH: IP6AHHeader, 238 ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader} 239 240 if __name__ == '__main__': 241 import unittest 242
243 - class IP6TestCase(unittest.TestCase):
244
245 - def test_IP6(self):
246 s = '`\x00\x00\x00\x00(\x06@\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11$\xff\xfe\x8c\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80r\xcd\xca\x00\x16\x04\x84F\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\t\x00\x00\x02\x04\x05\xa0\x01\x03\x03\x00\x01\x01\x08\n}\x185?\x00\x00\x00\x00' 247 ip = IP6(s) 248 #print `ip` 249 ip.data.sum = 0 250 s2 = str(ip) 251 ip2 = IP6(s) 252 #print `ip2` 253 assert(s == s2)
254
255 - def test_IP6RoutingHeader(self):
256 s = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00' 257 ip = IP6(s) 258 s2 = str(ip) 259 # 43 is Routing header id 260 assert(len(ip.extension_hdrs[43].addresses) == 2) 261 assert(ip.tcp) 262 assert(s == s2)
263 264
265 - def test_IP6FragmentHeader(self):
266 s = '\x06\xee\xff\xfb\x00\x00\xff\xff' 267 fh = IP6FragmentHeader(s) 268 s2 = str(fh) 269 assert(fh.nxt == 6) 270 assert(fh.id == 65535) 271 assert(fh.frag_off == 8191) 272 assert(fh.m_flag == 1)
273
274 - def test_IP6OptionsHeader(self):
275 s = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00' 276 options = IP6OptsHeader(s).options 277 assert(len(options) == 3)
278
279 - def test_IP6AHHeader(self):
280 s = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78' 281 ah = IP6AHHeader(s) 282 assert(ah.length == 24) 283 assert(ah.auth_data == 'xxxxxxxx') 284 assert(ah.spi == 0x2020202) 285 assert(ah.seq == 0x1010101)
286
287 - def test_IP6ExtensionHeaders(self):
288 p = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00' 289 ip = IP6(p) 290 291 o = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00' 292 options = IP6HopOptsHeader(o) 293 294 ip.extension_hdrs[0] = options 295 296 fh = '\x06\xee\xff\xfb\x00\x00\xff\xff' 297 ip.extension_hdrs[44] = IP6FragmentHeader(fh) 298 299 ah = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78' 300 ip.extension_hdrs[51] = IP6AHHeader(ah) 301 302 do = ';\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 303 ip.extension_hdrs[60] = IP6DstOptsHeader(do) 304 305 assert(len([k for k in ip.extension_hdrs if (not ip.extension_hdrs[k] is None)]) == 5)
306 307 unittest.main() 308