Package dpkt :: Module dpkt
[hide private]
[frames] | no frames]

Source Code for Module dpkt.dpkt

  1   
  2  # $Id: dpkt.py 43 2007-08-02 22:42:59Z jon.oberheide $ 
  3   
  4  """Simple packet creation and parsing.""" 
  5   
  6  import copy, itertools, socket, struct 
  7   
8 -class Error(Exception): pass
9 -class UnpackError(Error): pass
10 -class NeedData(UnpackError): pass
11 -class PackError(Error): pass
12
13 -class _MetaPacket(type):
14 - def __new__(cls, clsname, clsbases, clsdict):
15 t = type.__new__(cls, clsname, clsbases, clsdict) 16 st = getattr(t, '__hdr__', None) 17 if st is not None: 18 # XXX - __slots__ only created in __new__() 19 clsdict['__slots__'] = [ x[0] for x in st ] + [ 'data' ] 20 t = type.__new__(cls, clsname, clsbases, clsdict) 21 t.__hdr_fields__ = [ x[0] for x in st ] 22 t.__hdr_fmt__ = getattr(t, '__byte_order__', '>') + \ 23 ''.join([ x[1] for x in st ]) 24 t.__hdr_len__ = struct.calcsize(t.__hdr_fmt__) 25 t.__hdr_defaults__ = dict(zip( 26 t.__hdr_fields__, [ x[2] for x in st ])) 27 return t
28
29 -class Packet(object):
30 """Base packet class, with metaclass magic to generate members from 31 self.__hdr__. 32 33 __hdr__ should be defined as a list of (name, structfmt, default) tuples 34 __byte_order__ can be set to override the default ('>') 35 36 Example:: 37 38 >>> class Foo(Packet): 39 ... __hdr__ = (('foo', 'I', 1), ('bar', 'H', 2), ('baz', '4s', 'quux')) 40 ... 41 >>> foo = Foo(bar=3) 42 >>> foo 43 Foo(bar=3) 44 >>> str(foo) 45 '\x00\x00\x00\x01\x00\x03quux' 46 >>> foo.bar 47 3 48 >>> foo.baz 49 'quux' 50 >>> foo.foo = 7 51 >>> foo.baz = 'whee' 52 >>> foo 53 Foo(baz='whee', foo=7, bar=3) 54 >>> Foo('hello, world!') 55 Foo(baz=' wor', foo=1751477356L, bar=28460, data='ld!') 56 """ 57 __metaclass__ = _MetaPacket 58
59 - def __init__(self, *args, **kwargs):
60 """Packet constructor with ([buf], [field=val,...]) prototype. 61 62 Arguments: 63 64 buf -- optional packet buffer to unpack 65 66 Optional keyword arguments correspond to members to set 67 (matching fields in self.__hdr__, or 'data'). 68 """ 69 self.data = '' 70 if args: 71 try: 72 self.unpack(args[0]) 73 except struct.error: 74 if len(args[0]) < self.__hdr_len__: 75 raise NeedData 76 raise UnpackError('invalid %s: %r' % 77 (self.__class__.__name__, args[0])) 78 else: 79 for k in self.__hdr_fields__: 80 setattr(self, k, copy.copy(self.__hdr_defaults__[k])) 81 for k, v in kwargs.iteritems(): 82 setattr(self, k, v)
83
84 - def __len__(self):
85 return self.__hdr_len__ + len(self.data)
86
87 - def __getitem__(self, k):
88 try: return getattr(self, k) 89 except AttributeError: raise KeyError
90
91 - def __repr__(self):
92 l = [ '%s=%r' % (k, getattr(self, k)) 93 for k in self.__hdr_defaults__ 94 if getattr(self, k) != self.__hdr_defaults__[k] ] 95 if self.data: 96 l.append('data=%r' % self.data) 97 return '%s(%s)' % (self.__class__.__name__, ', '.join(l))
98
99 - def __str__(self):
100 return self.pack_hdr() + str(self.data)
101
102 - def pack_hdr(self):
103 """Return packed header string.""" 104 try: 105 return struct.pack(self.__hdr_fmt__, 106 *[ getattr(self, k) for k in self.__hdr_fields__ ]) 107 except struct.error: 108 vals = [] 109 for k in self.__hdr_fields__: 110 v = getattr(self, k) 111 if isinstance(v, tuple): 112 vals.extend(v) 113 else: 114 vals.append(v) 115 try: 116 return struct.pack(self.__hdr_fmt__, *vals) 117 except struct.error, e: 118 raise PackError(str(e))
119
120 - def pack(self):
121 """Return packed header + self.data string.""" 122 return str(self)
123
124 - def unpack(self, buf):
125 """Unpack packet header fields from buf, and set self.data.""" 126 for k, v in itertools.izip(self.__hdr_fields__, 127 struct.unpack(self.__hdr_fmt__, buf[:self.__hdr_len__])): 128 setattr(self, k, v) 129 self.data = buf[self.__hdr_len__:]
130 131 # XXX - ''.join([(len(`chr(x)`)==3) and chr(x) or '.' for x in range(256)]) 132 __vis_filter = """................................ !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[.]^_`abcdefghijklmnopqrstuvwxyz{|}~.................................................................................................................................""" 133
134 -def hexdump(buf, length=16):
135 """Return a hexdump output string of the given buffer.""" 136 n = 0 137 res = [] 138 while buf: 139 line, buf = buf[:length], buf[length:] 140 hexa = ' '.join(['%02x' % ord(x) for x in line]) 141 line = line.translate(__vis_filter) 142 res.append(' %04d: %-*s %s' % (n, length * 3, hexa, line)) 143 n += length 144 return '\n'.join(res)
145 146 try: 147 import dnet2
148 - def in_cksum_add(s, buf):
149 return dnet.ip_cksum_add(buf, s)
150 - def in_cksum_done(s):
151 return socket.ntohs(dnet.ip_cksum_carry(s))
152 except ImportError: 153 import array
154 - def in_cksum_add(s, buf):
155 n = len(buf) 156 cnt = (n / 2) * 2 157 a = array.array('H', buf[:cnt]) 158 if cnt != n: 159 a.append(struct.unpack('H', buf[-1] + '\x00')[0]) 160 return s + sum(a)
161 - def in_cksum_done(s):
162 s = (s >> 16) + (s & 0xffff) 163 s += (s >> 16) 164 return socket.ntohs(~s & 0xffff)
165
166 -def in_cksum(buf):
167 """Return computed Internet checksum.""" 168 return in_cksum_done(in_cksum_add(0, buf))
169