Package dpkt :: Module diameter
[hide private]
[frames | no frames]

Source Code for Module dpkt.diameter

  1  # $Id: diameter.py 23 2006-11-08 15:45:33Z dugsong $ 
  2   
  3  """Diameter.""" 
  4   
  5  import struct 
  6  import dpkt 
  7   
  8  # Diameter Base Protocol - RFC 3588 
  9  # http://tools.ietf.org/html/rfc3588 
 10   
 11  # Request/Answer Command Codes 
 12  ABORT_SESSION           = 274 
 13  ACCOUTING               = 271 
 14  CAPABILITIES_EXCHANGE   = 257 
 15  DEVICE_WATCHDOG         = 280 
 16  DISCONNECT_PEER         = 282 
 17  RE_AUTH                 = 258 
 18  SESSION_TERMINATION     = 275 
 19   
class Diameter(dpkt.Packet):
    """Diameter message: a fixed header followed by a sequence of AVPs.

    ``len`` and ``cmd`` are declared as 3-byte string fields (``3s``);
    ``unpack`` converts them to integers (big-endian, 24 bits) and
    ``pack_hdr`` encodes them back.  After unpacking, ``data`` and ``avps``
    refer to the same list of ``AVP`` objects.
    """

    __hdr__ = (
        ('v', 'B', 1),
        ('len', '3s', 0),
        ('flags', 'B', 0),
        ('cmd', '3s', 0),
        ('app_id', 'I', 0),
        ('hop_id', 'I', 0),
        ('end_id', 'I', 0)
        )

    # The properties below expose single bits of the ``flags`` byte as 0/1.
    # Setters only look at the lowest bit of the value they are given.

    # bit 7 (0x80)
    def _get_r(self):
        return (self.flags >> 7) & 0x1
    def _set_r(self, r):
        self.flags = (self.flags & ~0x80) | ((r & 0x1) << 7)
    request_flag = property(_get_r, _set_r)

    # bit 6 (0x40)
    def _get_p(self):
        return (self.flags >> 6) & 0x1
    def _set_p(self, p):
        self.flags = (self.flags & ~0x40) | ((p & 0x1) << 6)
    proxiable_flag = property(_get_p, _set_p)

    # bit 5 (0x20)
    def _get_e(self):
        return (self.flags >> 5) & 0x1
    def _set_e(self, e):
        self.flags = (self.flags & ~0x20) | ((e & 0x1) << 5)
    error_flag = property(_get_e, _set_e)

    # bit 4 (0x10)
    def _get_t(self):
        return (self.flags >> 4) & 0x1
    def _set_t(self, t):
        self.flags = (self.flags & ~0x10) | ((t & 0x1) << 4)
    retransmit_flag = property(_get_t, _set_t)

    def unpack(self, buf):
        """Parse ``buf`` into the header fields and a list of AVPs.

        Bytes beyond the message length stated in the header are dropped.
        The parsed AVPs are stored in both ``self.data`` and ``self.avps``.
        """
        dpkt.Packet.unpack(self, buf)
        # The 3s fields arrive as 3-character strings; turn them into ints.
        self.cmd = (ord(self.cmd[0]) << 16) | \
                   (ord(self.cmd[1]) << 8) | \
                    ord(self.cmd[2])
        self.len = (ord(self.len[0]) << 16) | \
                   (ord(self.len[1]) << 8) | \
                    ord(self.len[2])
        # ``len`` counts the header too, so subtract it to get the body size.
        self.data = self.data[:self.len - self.__hdr_len__]

        avps = []
        while self.data:
            avp = AVP(self.data)
            avps.append(avp)
            # Advance past everything the AVP consumed.
            self.data = self.data[len(avp):]
        self.data = self.avps = avps

    def pack_hdr(self):
        """Return the packed header with ``len`` and ``cmd`` as 24-bit fields.

        The base class packs the ``3s`` fields straight from the instance
        attributes, so ``len`` and ``cmd`` are swapped to their 3-character
        form only for the duration of the call.  They are restored to
        integers afterwards so the object can be read or packed again.
        """
        length, cmd = self.len, self.cmd
        self.len = chr((length >> 16) & 0xff) + \
                   chr((length >> 8) & 0xff) + \
                   chr(length & 0xff)
        self.cmd = chr((cmd >> 16) & 0xff) + \
                   chr((cmd >> 8) & 0xff) + \
                   chr(cmd & 0xff)
        try:
            return dpkt.Packet.pack_hdr(self)
        finally:
            self.len, self.cmd = length, cmd

    def __len__(self):
        """Return the header size plus the summed length of every item in ``data``."""
        return self.__hdr_len__ + \
               sum(map(len, self.data))

    def __str__(self):
        """Return the packed header followed by each item of ``data`` packed in order."""
        return self.pack_hdr() + \
               ''.join(map(str, self.data))
88
class AVP(dpkt.Packet):
    """Diameter attribute-value pair.

    The header carries ``code``, a ``flags`` byte and a 24-bit ``len``
    (declared as ``3s``, exposed as an integer after ``unpack``).  When the
    vendor flag is set, a 4-byte big-endian vendor id sits between the
    header and the payload and is stored in ``self.vendor``.  ``len``
    counts the header, the optional vendor id and the payload.
    """

    __hdr__ = (
        ('code', 'I', 0),
        ('flags', 'B', 0),
        ('len', '3s', 0),
        )

    # The properties below expose single bits of the ``flags`` byte as 0/1.
    # Setters only look at the lowest bit of the value they are given.

    # bit 7 (0x80)
    def _get_v(self):
        return (self.flags >> 7) & 0x1
    def _set_v(self, v):
        self.flags = (self.flags & ~0x80) | ((v & 0x1) << 7)
    vendor_flag = property(_get_v, _set_v)

    # bit 6 (0x40)
    def _get_m(self):
        return (self.flags >> 6) & 0x1
    def _set_m(self, m):
        self.flags = (self.flags & ~0x40) | ((m & 0x1) << 6)
    mandatory_flag = property(_get_m, _set_m)

    # bit 5 (0x20)
    def _get_p(self):
        return (self.flags >> 5) & 0x1
    def _set_p(self, p):
        self.flags = (self.flags & ~0x20) | ((p & 0x1) << 5)
    protected_flag = property(_get_p, _set_p)

    def unpack(self, buf):
        """Parse one AVP from the start of ``buf``.

        ``self.data`` ends up holding only this AVP's payload; anything in
        ``buf`` past the stated length is ignored.
        """
        dpkt.Packet.unpack(self, buf)
        # The 3s field arrives as a 3-character string; turn it into an int.
        self.len = (ord(self.len[0]) << 16) | \
                   (ord(self.len[1]) << 8) | \
                    ord(self.len[2])

        # ``len`` includes the header, so this is the end of the AVP
        # relative to the start of ``self.data``.
        end = self.len - self.__hdr_len__
        if self.vendor_flag:
            self.vendor = struct.unpack('>I', self.data[:4])[0]
            self.data = self.data[4:end]
        else:
            self.data = self.data[:end]

    def pack_hdr(self):
        """Return the packed header, plus the vendor id when the vendor flag is set.

        ``len`` is swapped to its 3-character form only while the base class
        packs the header, then restored to an integer so the object can be
        read or packed again.
        """
        length = self.len
        self.len = chr((length >> 16) & 0xff) + \
                   chr((length >> 8) & 0xff) + \
                   chr(length & 0xff)
        try:
            data = dpkt.Packet.pack_hdr(self)
        finally:
            self.len = length
        if self.vendor_flag:
            data += struct.pack('>I', self.vendor)
        return data

    def __len__(self):
        """Return the header size plus payload size, plus 4 when a vendor id is present."""
        length = self.__hdr_len__ + len(self.data)
        if self.vendor_flag:
            length += 4
        return length
if __name__ == '__main__':
    import unittest

    class DiameterTestCase(unittest.TestCase):
        """Round-trip and field-level checks against two captured messages."""

        # 40-byte request (command 280) carrying two AVPs: code 264 with a
        # 4-byte payload and code 296 with an empty payload.
        s = '\x01\x00\x00\x28\x80\x00\x01\x18\x00\x00\x00\x00\x00\x00\x41\xc8\x00\x00\x00\x0c\x00\x00\x01\x08\x40\x00\x00\x0c\x68\x30\x30\x32\x00\x00\x01\x28\x40\x00\x00\x08'
        # Same message, but the first AVP has the vendor flag set and
        # carries vendor id 0xdeadbeef (44 bytes in total).
        t = '\x01\x00\x00\x2c\x80\x00\x01\x18\x00\x00\x00\x00\x00\x00\x41\xc8\x00\x00\x00\x0c\x00\x00\x01\x08\xc0\x00\x00\x10\xde\xad\xbe\xef\x68\x30\x30\x32\x00\x00\x01\x28\x40\x00\x00\x08'

        def testPack(self):
            # Unpacking then packing must reproduce the original bytes.
            d = Diameter(self.s)
            self.assertEqual(self.s, str(d))
            d = Diameter(self.t)
            self.assertEqual(self.t, str(d))

        def testUnpack(self):
            d = Diameter(self.s)
            self.assertEqual(d.len, 40)
            self.assertEqual(d.cmd, DEVICE_WATCHDOG)
            self.assertEqual(d.request_flag, 1)
            self.assertEqual(d.error_flag, 0)
            self.assertEqual(len(d.avps), 2)

            avp = d.avps[0]
            # 264 is the code found in the fixture; the module defines no
            # named constant for it.
            self.assertEqual(avp.code, 264)
            self.assertEqual(avp.mandatory_flag, 1)
            self.assertEqual(avp.vendor_flag, 0)
            self.assertEqual(avp.len, 12)
            self.assertEqual(len(avp), 12)
            self.assertEqual(avp.data, '\x68\x30\x30\x32')

            # also test the optional vendor id support
            d = Diameter(self.t)
            self.assertEqual(d.len, 44)
            avp = d.avps[0]
            self.assertEqual(avp.vendor_flag, 1)
            self.assertEqual(avp.len, 16)
            self.assertEqual(len(avp), 16)
            self.assertEqual(avp.vendor, 3735928559)
            self.assertEqual(avp.data, '\x68\x30\x30\x32')

    unittest.main()