1
2
3 """Internet Protocol, version 6."""
4
5 import dpkt
6
7 -class IP6(dpkt.Packet):
8 __hdr__ = (
9 ('v_fc_flow', 'I', 0x60000000L),
10 ('plen', 'H', 0),
11 ('nxt', 'B', 0),
12 ('hlim', 'B', 0),
13 ('src', '16s', ''),
14 ('dst', '16s', '')
15 )
16
17
18
19
20 _protosw = None
21
26 v = property(_get_v, _set_v)
27
32 fc = property(_get_fc, _set_fc)
33
38 flow = property(_get_flow, _set_flow)
39
65
67 """
68 Output extension headers in order defined in RFC1883 (except dest opts)
69 """
70
71 header_str = ""
72
73 for hdr in ext_hdrs:
74 if not self.extension_hdrs[hdr] is None:
75 header_str += str(self.extension_hdrs[hdr])
76 return header_str
77
78
92
95 set_proto = classmethod(set_proto)
96
99 get_proto = classmethod(get_proto)
100
101 import ip
102
103
104
105
106
107
108 IP6._protosw = ip.IP._protosw
109
111 """
112 An extension header is very similar to a 'sub-packet'.
113 We just want to re-use all the hdr unpacking etc.
114 """
115 pass
116
118 __hdr__ = (
119 ('nxt', 'B', 0),
120 ('len', 'B', 0)
121 )
122
124 dpkt.Packet.unpack(self, buf)
125 setattr(self, 'length', (self.len + 1) * 8)
126 options = []
127
128 index = 0
129
130 while (index < self.length - 2):
131 opt_type = ord(self.data[index])
132
133
134 if opt_type == 0:
135 index += 1
136 continue;
137
138 opt_length = ord(self.data[index + 1])
139
140 if opt_type == 1:
141
142 index += opt_length + 2
143 continue
144
145 options.append({'type': opt_type, 'opt_length': opt_length, 'data': self.data[index + 2:index + 2 + opt_length]})
146
147
148 index += opt_length + 2
149
150 setattr(self, 'options', options)
151
153
155
157 __hdr__ = (
158 ('nxt', 'B', 0),
159 ('len', 'B', 0),
160 ('type', 'B', 0),
161 ('segs_left', 'B', 0),
162 ('rsvd_sl_bits', 'I', 0),
163 )
164
169 sl_bits = property(_get_sl_bits, _set_sl_bits)
170
172 hdr_size = 8
173 addr_size = 16
174
175 dpkt.Packet.unpack(self, buf)
176
177 addresses = []
178 num_addresses = self.len / 2
179 buf = buf[hdr_size:hdr_size + num_addresses * addr_size]
180
181 for i in range(num_addresses):
182 addresses.append(buf[i * addr_size: i * addr_size + addr_size])
183
184 self.data = buf
185 setattr(self, 'addresses', addresses)
186 setattr(self, 'length', self.len * 8 + 8)
187
189 __hdr__ = (
190 ('nxt', 'B', 0),
191 ('resv', 'B', 0),
192 ('frag_off_resv_m', 'H', 0),
193 ('id', 'I', 0)
194 )
195
199
204 frag_off = property(_get_frag_off, _set_frag_off)
205
210 m_flag = property(_get_m_flag, _set_m_flag)
211
213 __hdr__ = (
214 ('nxt', 'B', 0),
215 ('len', 'B', 0),
216 ('resv', 'H', 0),
217 ('spi', 'I', 0),
218 ('seq', 'I', 0)
219 )
220
222 dpkt.Packet.unpack(self, buf)
223 setattr(self, 'length', (self.len + 2) * 4)
224 setattr(self, 'auth_data', self.data[:(self.len - 1) * 4])
225
226
229 raise NotImplementedError("ESP extension headers are not supported.")
230
231
232 ext_hdrs = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT, ip.IP_PROTO_AH, ip.IP_PROTO_ESP, ip.IP_PROTO_DSTOPTS]
233 ext_hdrs_cls = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader,
234 ip.IP_PROTO_ROUTING: IP6RoutingHeader,
235 ip.IP_PROTO_FRAGMENT: IP6FragmentHeader,
236 ip.IP_PROTO_ESP: IP6ESPHeader,
237 ip.IP_PROTO_AH: IP6AHHeader,
238 ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader}
239
240 if __name__ == '__main__':
241 import unittest
242
244
246 s = '`\x00\x00\x00\x00(\x06@\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11$\xff\xfe\x8c\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80r\xcd\xca\x00\x16\x04\x84F\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\t\x00\x00\x02\x04\x05\xa0\x01\x03\x03\x00\x01\x01\x08\n}\x185?\x00\x00\x00\x00'
247 ip = IP6(s)
248
249 ip.data.sum = 0
250 s2 = str(ip)
251 ip2 = IP6(s)
252
253 assert(s == s2)
254
256 s = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
257 ip = IP6(s)
258 s2 = str(ip)
259
260 assert(len(ip.extension_hdrs[43].addresses) == 2)
261 assert(ip.tcp)
262 assert(s == s2)
263
264
266 s = '\x06\xee\xff\xfb\x00\x00\xff\xff'
267 fh = IP6FragmentHeader(s)
268 s2 = str(fh)
269 assert(fh.nxt == 6)
270 assert(fh.id == 65535)
271 assert(fh.frag_off == 8191)
272 assert(fh.m_flag == 1)
273
275 s = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
276 options = IP6OptsHeader(s).options
277 assert(len(options) == 3)
278
280 s = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
281 ah = IP6AHHeader(s)
282 assert(ah.length == 24)
283 assert(ah.auth_data == 'xxxxxxxx')
284 assert(ah.spi == 0x2020202)
285 assert(ah.seq == 0x1010101)
286
288 p = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
289 ip = IP6(p)
290
291 o = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
292 options = IP6HopOptsHeader(o)
293
294 ip.extension_hdrs[0] = options
295
296 fh = '\x06\xee\xff\xfb\x00\x00\xff\xff'
297 ip.extension_hdrs[44] = IP6FragmentHeader(fh)
298
299 ah = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
300 ip.extension_hdrs[51] = IP6AHHeader(ah)
301
302 do = ';\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
303 ip.extension_hdrs[60] = IP6DstOptsHeader(do)
304
305 assert(len([k for k in ip.extension_hdrs if (not ip.extension_hdrs[k] is None)]) == 5)
306
307 unittest.main()
308