Package dpkt :: Module ssl
[hide private]
[frames] | no frames]

Source Code for Module dpkt.ssl

  1  # $Id: ssl.py 90 2014-04-02 22:06:23Z andrewflnr@gmail.com $ 
  2  # Portion Copyright 2012 Google Inc. All rights reserved. 
  3   
  4  """Secure Sockets Layer / Transport Layer Security.""" 
  5   
  6  import dpkt 
  7  import ssl_ciphersuites 
  8  import struct 
  9  import binascii 
 10  import traceback 
 11  import datetime 
#
# Note from April 2011: cde...@gmail.com added code that parses SSL3/TLS messages more in depth.
#
# Jul 2012: afleenor@google.com modified and extended SSL support further.
#


class SSL2(dpkt.Packet):
    """
    SSLv2 record.

    After unpacking, `len` holds the masked record length, `msg` the
    message bytes, `pad` any padding bytes, and `data` whatever is left
    in the buffer after this record.
    """
    __hdr__ = (
        ('len', 'H', 0),
        ('msg', 's', ''),
        ('pad', 's', ''),
    )

    def unpack(self, buf):
        """
        Split buf into message, padding and trailing data.

        The top bit of the length word selects the layout: when it is set
        the low 15 bits are the message length and no padding is read;
        otherwise the low 14 bits are the message length and the first
        byte of the remaining data is taken as the padding length.
        """
        dpkt.Packet.unpack(self, buf)
        if not self.len & 0x8000:
            # layout with an explicit padding-length byte
            body_len = self.len = self.len & 0x3FFF
            pad_len = ord(self.data[0])
            msg_end = 1 + body_len
            pad_end = msg_end + pad_len
            self.msg = self.data[1:msg_end]
            self.pad = self.data[msg_end:pad_end]
            self.data = self.data[pad_end:]
            return
        # layout without padding
        body_len = self.len = self.len & 0x7FFF
        self.msg, self.data = self.data[:body_len], self.data[body_len:]
# SSLv3/TLS versions
# (the two-byte protocol version as it appears in the 'version' header field)
SSL3_V = 0x0300
TLS1_V = 0x0301
TLS11_V = 0x0302
TLS12_V = 0x0303

# Human-readable name for each version constant above.
ssl3_versions_str = {
    SSL3_V: 'SSL3',
    TLS1_V: 'TLS 1.0',
    TLS11_V: 'TLS 1.1',
    TLS12_V: 'TLS 1.2'
}

# The same four versions as raw two-byte strings, for matching against
# buf[1:3] of an unparsed record.
SSL3_VERSION_BYTES = set(('\x03\x00', '\x03\x01', '\x03\x02', '\x03\x03'))


# Alert levels
SSL3_AD_WARNING = 1
SSL3_AD_FATAL = 2
alert_level_str = {
    SSL3_AD_WARNING: 'SSL3_AD_WARNING',
    SSL3_AD_FATAL: 'SSL3_AD_FATAL'
}

# SSL3 alert descriptions
SSL3_AD_CLOSE_NOTIFY = 0
SSL3_AD_UNEXPECTED_MESSAGE = 10  # fatal
SSL3_AD_BAD_RECORD_MAC = 20  # fatal
SSL3_AD_DECOMPRESSION_FAILURE = 30  # fatal
SSL3_AD_HANDSHAKE_FAILURE = 40  # fatal
SSL3_AD_NO_CERTIFICATE = 41
SSL3_AD_BAD_CERTIFICATE = 42
SSL3_AD_UNSUPPORTED_CERTIFICATE = 43
SSL3_AD_CERTIFICATE_REVOKED = 44
SSL3_AD_CERTIFICATE_EXPIRED = 45
SSL3_AD_CERTIFICATE_UNKNOWN = 46
SSL3_AD_ILLEGAL_PARAMETER = 47  # fatal

# TLS1 alert descriptions
TLS1_AD_DECRYPTION_FAILED = 21
TLS1_AD_RECORD_OVERFLOW = 22
TLS1_AD_UNKNOWN_CA = 48  # fatal
TLS1_AD_ACCESS_DENIED = 49  # fatal
TLS1_AD_DECODE_ERROR = 50  # fatal
TLS1_AD_DECRYPT_ERROR = 51
TLS1_AD_EXPORT_RESTRICTION = 60  # fatal
TLS1_AD_PROTOCOL_VERSION = 70  # fatal
TLS1_AD_INSUFFICIENT_SECURITY = 71  # fatal
TLS1_AD_INTERNAL_ERROR = 80  # fatal
TLS1_AD_USER_CANCELLED = 90
TLS1_AD_NO_RENEGOTIATION = 100
#/* codes 110-114 are from RFC3546 */
TLS1_AD_UNSUPPORTED_EXTENSION = 110
TLS1_AD_CERTIFICATE_UNOBTAINABLE = 111
TLS1_AD_UNRECOGNIZED_NAME = 112
TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE = 113
TLS1_AD_BAD_CERTIFICATE_HASH_VALUE = 114
TLS1_AD_UNKNOWN_PSK_IDENTITY = 115  # fatal


# Mapping alert types to strings
# (each description code maps to the name of its constant)
alert_description_str = {
    SSL3_AD_CLOSE_NOTIFY: 'SSL3_AD_CLOSE_NOTIFY',
    SSL3_AD_UNEXPECTED_MESSAGE: 'SSL3_AD_UNEXPECTED_MESSAGE',
    SSL3_AD_BAD_RECORD_MAC: 'SSL3_AD_BAD_RECORD_MAC',
    SSL3_AD_DECOMPRESSION_FAILURE: 'SSL3_AD_DECOMPRESSION_FAILURE',
    SSL3_AD_HANDSHAKE_FAILURE: 'SSL3_AD_HANDSHAKE_FAILURE',
    SSL3_AD_NO_CERTIFICATE: 'SSL3_AD_NO_CERTIFICATE',
    SSL3_AD_BAD_CERTIFICATE: 'SSL3_AD_BAD_CERTIFICATE',
    SSL3_AD_UNSUPPORTED_CERTIFICATE: 'SSL3_AD_UNSUPPORTED_CERTIFICATE',
    SSL3_AD_CERTIFICATE_REVOKED: 'SSL3_AD_CERTIFICATE_REVOKED',
    SSL3_AD_CERTIFICATE_EXPIRED: 'SSL3_AD_CERTIFICATE_EXPIRED',
    SSL3_AD_CERTIFICATE_UNKNOWN: 'SSL3_AD_CERTIFICATE_UNKNOWN',
    SSL3_AD_ILLEGAL_PARAMETER: 'SSL3_AD_ILLEGAL_PARAMETER',
    TLS1_AD_DECRYPTION_FAILED: 'TLS1_AD_DECRYPTION_FAILED',
    TLS1_AD_RECORD_OVERFLOW: 'TLS1_AD_RECORD_OVERFLOW',
    TLS1_AD_UNKNOWN_CA: 'TLS1_AD_UNKNOWN_CA',
    TLS1_AD_ACCESS_DENIED: 'TLS1_AD_ACCESS_DENIED',
    TLS1_AD_DECODE_ERROR: 'TLS1_AD_DECODE_ERROR',
    TLS1_AD_DECRYPT_ERROR: 'TLS1_AD_DECRYPT_ERROR',
    TLS1_AD_EXPORT_RESTRICTION: 'TLS1_AD_EXPORT_RESTRICTION',
    TLS1_AD_PROTOCOL_VERSION: 'TLS1_AD_PROTOCOL_VERSION',
    TLS1_AD_INSUFFICIENT_SECURITY: 'TLS1_AD_INSUFFICIENT_SECURITY',
    TLS1_AD_INTERNAL_ERROR: 'TLS1_AD_INTERNAL_ERROR',
    TLS1_AD_USER_CANCELLED: 'TLS1_AD_USER_CANCELLED',
    TLS1_AD_NO_RENEGOTIATION: 'TLS1_AD_NO_RENEGOTIATION',
    TLS1_AD_UNSUPPORTED_EXTENSION: 'TLS1_AD_UNSUPPORTED_EXTENSION',
    TLS1_AD_CERTIFICATE_UNOBTAINABLE: 'TLS1_AD_CERTIFICATE_UNOBTAINABLE',
    TLS1_AD_UNRECOGNIZED_NAME: 'TLS1_AD_UNRECOGNIZED_NAME',
    TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE: 'TLS1_AD_BAD_CERTIFICATE_STATUS_RESPONSE',
    TLS1_AD_BAD_CERTIFICATE_HASH_VALUE: 'TLS1_AD_BAD_CERTIFICATE_HASH_VALUE',
    TLS1_AD_UNKNOWN_PSK_IDENTITY: 'TLS1_AD_UNKNOWN_PSK_IDENTITY'
}


# struct format strings for parsing buffer lengths
# don't forget, you have to pad a 3-byte value with \x00
# (indexed by length-prefix width minus one; widths 3 and 4 both use '!I')
_SIZE_FORMATS = ['!B', '!H', '!I', '!I']
def parse_variable_array(buf, lenbytes):
    """
    Parse an array described using the 'Type name<x..y>' syntax from the spec

    Reads a big-endian length of `lenbytes` bytes from the start of buf and
    returns the bytes that follow it. This does not check that the array is
    the right length for any given datatype, nor that buf actually holds
    as many bytes as the length prefix announces.

    Args:
        buf: byte string that starts with the length prefix.
        lenbytes: width of the length prefix in bytes, from 1 to 4.

    Returns:
        (data, consumed): the array bytes (shorter than announced if buf is
        truncated) and the TOTAL number of bytes the array occupies,
        i.e. the announced size plus lenbytes.

    Raises:
        struct.error: if buf holds fewer than lenbytes bytes.
    """
    # a width of 0 or less would otherwise silently misparse
    assert 1 <= lenbytes <= 4  # pretty sure 4 is impossible, too
    # struct has no 3-byte integer, so left-pad every prefix to four bytes
    # and decode it as one unsigned 32-bit value; bytes literals keep this
    # working for both str (Python 2) and bytes buffers
    prefix = b'\x00' * (4 - lenbytes) + buf[:lenbytes]
    size = struct.unpack('!I', prefix)[0]
    # read the actual data
    data = buf[lenbytes:lenbytes + size]
    # if len(data) != size: insufficient data
    return data, size + lenbytes
156
class SSL3Exception(Exception):
    """Error raised by the SSLv3/TLS parsers in this module for bad input."""
160
class TLSRecord(dpkt.Packet):
    """
    SSLv3 or TLSv1+ packet.

    Besides the header fields, every record carries two flags,
    `compressed` and `encrypted`, which say whether (in the language of
    the spec) it is a TLSPlaintext, TLSCompressed, or TLSCiphertext.
    Records built from keyword arguments start out as plaintext; records
    parsed from raw bytes start out flagged as compressed and encrypted.
    It is up to the application to change the flags when appropriate.
    """

    __hdr__ = (
        ('type', 'B', 0),
        ('version', 'H', 0),
        ('length', 'H', 0),
    )

    def __init__(self, *args, **kwargs):
        # Take our own flags out of kwargs before dpkt.Packet sees them;
        # plaintext is assumed unless the caller says otherwise.
        self.compressed = kwargs.pop('compressed', False)
        self.encrypted = kwargs.pop('encrypted', False)
        dpkt.Packet.__init__(self, *args, **kwargs)
        # keep the length field in step with the payload
        self.length = len(self.data)

    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        start = self.__hdr_len__
        # keep only the payload announced by the header, not trailing bytes
        self.data = buf[start:start + self.length]
        if self.length != len(self.data):
            # buffer ended before the announced payload did
            raise dpkt.NeedData('TLSRecord data was too short.')
        # a record parsed from raw data is assumed to be compressed and
        # encrypted
        self.compressed = self.encrypted = True
199
class TLSChangeCipherSpec(dpkt.Packet):
    """
    ChangeCipherSpec message: a single one-byte `type` field, default 1.
    """
    __hdr__ = (
        ('type', 'B', 1),
    )
206
class TLSAppData(str):
    """
    Application data payload.

    TLSRecord treats application data as an opaque blob, so this is a
    plain str subclass that adds nothing of its own.
    """
213
class TLSAlert(dpkt.Packet):
    """
    Alert message: a one-byte `level` (default 1) followed by a one-byte
    `description` (default 0).
    """
    __hdr__ = (
        ('level', 'B', 1),
        ('description', 'B', 0),
    )
221
class TLSHelloRequest(dpkt.Packet):
    """HelloRequest handshake body; it declares no header fields."""
    __hdr__ = ()
225
class TLSClientHello(dpkt.Packet):
    """
    ClientHello handshake body.

    Only `version` and `random` are fixed-size header fields. unpack()
    additionally sets `session_id`, `num_ciphersuites`,
    `num_compression_methods` and `compression_methods`. Extensions are
    not parsed, and `data` keeps everything after the fixed header.
    """
    __hdr__ = (
        ('version', 'H', 0x0301),
        ('random', '32s', '\x00'*32),
    )   # the rest is variable-length and has to be done manually

    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        # now session, cipher suites, extensions are in self.data
        self.session_id, pointer = parse_variable_array(self.data, 1)
        # handle ciphersuites
        ciphersuites, parsed = parse_variable_array(self.data[pointer:], 2)
        pointer += parsed
        # two bytes per suite; floor division keeps the count an integer
        # even when true division is in effect
        self.num_ciphersuites = len(ciphersuites) // 2
        # check len(ciphersuites) % 2 == 0 ?
        # compression methods
        compression_methods, parsed = parse_variable_array(
            self.data[pointer:], 1)
        pointer += parsed
        # parsed counts the one-byte length prefix as well
        self.num_compression_methods = parsed - 1
        self.compression_methods = map(ord, compression_methods)
        # extensions
class TLSServerHello(dpkt.Packet):
    """
    ServerHello handshake body.

    Only `version` and `random` are fixed-size header fields. unpack()
    additionally sets `session_id`, `cipher_suite` and `compression`.
    Extensions are ignored, and `data` keeps everything after the fixed
    header.
    """
    __hdr__ = (
        # the default must be an integer: 'H' cannot pack the string '0x0301'
        ('version', 'H', 0x0301),
        ('random', '32s', '\x00'*32),
    )   # session is variable, forcing rest to be manual

    def unpack(self, buf):
        """
        Parse a ServerHello from buf.

        Raises:
            dpkt.NeedData: if a struct.error occurs while parsing, which
                most likely means buf was too short.
        """
        try:
            dpkt.Packet.unpack(self, buf)
            self.session_id, pointer = parse_variable_array(self.data, 1)
            # single cipher suite
            self.cipher_suite = struct.unpack('!H', self.data[pointer:pointer+2])[0]
            pointer += 2
            # single compression method
            self.compression = struct.unpack('!B', self.data[pointer:pointer+1])[0]
            pointer += 1
            # ignore extensions for now
        except struct.error:
            # probably data too short
            raise dpkt.NeedData
272
class TLSUnknownHandshake(dpkt.Packet):
    """
    Stand-in body for handshake messages this module does not parse; it
    declares no header fields.
    """
    __hdr__ = ()
# Handshake messages without a dedicated parser all share the
# TLSUnknownHandshake placeholder class.
TLSCertificate = TLSUnknownHandshake
TLSServerKeyExchange = TLSUnknownHandshake
TLSCertificateRequest = TLSUnknownHandshake
TLSServerHelloDone = TLSUnknownHandshake
TLSCertificateVerify = TLSUnknownHandshake
TLSClientKeyExchange = TLSUnknownHandshake
TLSFinished = TLSUnknownHandshake


# mapping of handshake type ids to their names
# and the classes that implement them
HANDSHAKE_TYPES = {
    0: ('HelloRequest', TLSHelloRequest),
    1: ('ClientHello', TLSClientHello),
    2: ('ServerHello', TLSServerHello),
    11: ('Certificate', TLSCertificate),
    12: ('ServerKeyExchange', TLSServerKeyExchange),
    13: ('CertificateRequest', TLSCertificateRequest),
    14: ('ServerHelloDone', TLSServerHelloDone),
    15: ('CertificateVerify', TLSCertificateVerify),
    16: ('ClientKeyExchange', TLSClientKeyExchange),
    20: ('Finished', TLSFinished),
}
class TLSHandshake(dpkt.Packet):
    '''
    A TLS Handshake message

    This goes for all messages encapsulated in the Record layer, but especially
    important for handshakes and app data: A message may be spread across a
    number of TLSRecords, in addition to the possibility of there being more
    than one in a given Record. You have to put together the contents of
    TLSRecord's yourself.

    After unpacking, `data` is an instance of the class registered for
    `type` in HANDSHAKE_TYPES.
    '''

    # struct.unpack can't handle the 3-byte int, so we parse it as bytes
    # (and store it as bytes so dpkt doesn't get confused), and turn it into
    # an int in a user-facing property
    __hdr__ = (
        ('type', 'B', 0),
        # the default must be a 3-byte string: '3s' cannot pack the int 0,
        # and the length property concatenates this value with a string
        ('length_bytes', '3s', '\x00\x00\x00'),
    )

    def unpack(self, buf):
        """
        Parse the handshake header and wrap the body in its message class.

        Raises:
            SSL3Exception: if the type byte is not a key of HANDSHAKE_TYPES.
            dpkt.NeedData: if buf holds fewer body bytes than the header
                announces.
        """
        dpkt.Packet.unpack(self, buf)
        # Wait, might there be more than one message of self.type?
        embedded_type = HANDSHAKE_TYPES.get(self.type, None)
        if embedded_type is None:
            raise SSL3Exception('Unknown or invalid handshake type %d' %
                                self.type)
        # only take the right number of bytes
        self.data = self.data[:self.length]
        if len(self.data) != self.length:
            raise dpkt.NeedData
        # get class out of embedded_type tuple
        self.data = embedded_type[1](self.data)

    @property
    def length(self):
        """Body length as an int, decoded from the 3-byte length_bytes."""
        # pad to four bytes so the value can be read as a 32-bit integer
        return struct.unpack('!I', '\x00' + self.length_bytes)[0]
# Maps the value of a TLSRecord's 'type' field to the class that
# represents that record's content.
RECORD_TYPES = {
    20: TLSChangeCipherSpec,
    21: TLSAlert,
    22: TLSHandshake,
    23: TLSAppData,
}
class SSLFactory(object):
    """
    Build the right packet object for a raw SSL/TLS buffer.

    Calling SSLFactory(buf) returns a TLSRecord when bytes 1-2 of buf hold
    a known SSLv3/TLS version, and an SSL2 packet otherwise.
    """
    def __new__(cls, buf):
        # Use the shared version table so TLS 1.2 is recognised too, and
        # return TLSRecord: this module defines no class named SSL3.
        if buf[1:3] in SSL3_VERSION_BYTES:
            return TLSRecord(buf)
        # SSL2 has no characteristic header or magic bytes, so we just assume
        # that the msg is an SSL2 msg if it is not detected as SSL3+
        return SSL2(buf)
356
def TLSMultiFactory(buf):
    '''
    Parse as many complete TLSRecord's as possible from the front of buf

    Args:
      buf: string containing SSL/TLS messages. May have an incomplete record
        on the end

    Returns:
      [TLSRecord]
      int, total bytes consumed, != len(buf) if an incomplete record was left at
        the end.

    Raises SSL3Exception.
    '''
    records = []
    offset = 0
    total = len(buf)
    while offset < total:
        # the version sits in bytes 1-2 of each record header
        if buf[offset + 1:offset + 3] not in SSL3_VERSION_BYTES:
            raise SSL3Exception('Bad TLS version in buf: %r' % buf[offset:offset + 5])
        try:
            record = TLSRecord(buf[offset:])
        except dpkt.NeedData:
            # incomplete record at the end; leave it for the caller
            break
        records.append(record)
        offset += len(record)
    return records, offset
import unittest


# Short alias used by the tests below to turn hex dumps into raw bytes.
_hexdecode = binascii.a2b_hex
class TLSRecordTest(unittest.TestCase):
    """
    Test basic TLSRecord functionality

    For this test, the contents of the record doesn't matter, since we're not
    parsing the next layer.
    """
    def setUp(self):
        # add some extra data, to make sure length is parsed correctly
        self.p = TLSRecord('\x17\x03\x01\x00\x08abcdefghzzzzzzzzzzz')

    def testContentType(self):
        self.assertEqual(self.p.type, 23)

    def testVersion(self):
        self.assertEqual(self.p.version, 0x0301)

    def testLength(self):
        self.assertEqual(self.p.length, 8)

    def testData(self):
        self.assertEqual(self.p.data, 'abcdefgh')

    def testInitialFlags(self):
        self.assertTrue(self.p.compressed)
        self.assertTrue(self.p.encrypted)

    def testRepack(self):
        p2 = TLSRecord(type=23, version=0x0301, data='abcdefgh')
        self.assertEqual(p2.type, 23)
        self.assertEqual(p2.version, 0x0301)
        self.assertEqual(p2.length, 8)
        self.assertEqual(p2.data, 'abcdefgh')
        self.assertEqual(p2.pack(), self.p.pack())

    def testTotalLength(self):
        # that len(p) includes header
        self.assertEqual(len(self.p), 13)

    # NOTE(review): the 'def' line of this test was missing from the
    # listing; the method name is a reconstruction - confirm upstream.
    def testRaisesNeedDataWhenBufIsShort(self):
        # the header announces 0x10 payload bytes but only three follow
        self.assertRaises(
            dpkt.NeedData,
            TLSRecord,
            '\x16\x03\x01\x00\x10abc')
431
class TLSChangeCipherSpecTest(unittest.TestCase):
    "It's just a byte. This will be quick, I promise"

    def setUp(self):
        raw = '\x01'
        self.p = TLSChangeCipherSpec(raw)

    def testParses(self):
        parsed_type = self.p.type
        self.assertEqual(parsed_type, 1)

    def testTotalLength(self):
        total = len(self.p)
        self.assertEqual(total, 1)
441
class TLSAppDataTest(unittest.TestCase):
    "AppData is basically just a string"

    def testValue(self):
        # must compare equal to the plain string it was built from
        payload = TLSAppData('abcdefgh')
        self.assertEqual(payload, 'abcdefgh')
448
class TLSHandshakeTest(unittest.TestCase):
    "Parse a minimal handshake message of type 0 (HelloRequest)"

    def setUp(self):
        # type byte 0, 3-byte length of 1, then a single body byte
        raw = '\x00\x00\x00\x01\xff'
        self.h = TLSHandshake(raw)

    def testCreatedInsideMessage(self):
        body = self.h.data
        self.assertTrue(isinstance(body, TLSHelloRequest))

    def testLength(self):
        self.assertEqual(self.h.length, 0x01)

    def testRaisesNeedData(self):
        # the length bytes announce 0x000101 body bytes but none follow
        truncated = '\x00\x00\x01\x01'
        self.assertRaises(dpkt.NeedData, TLSHandshake, truncated)
459
class ClientHelloTest(unittest.TestCase):
    'This data is extracted from and verified by Wireshark'

    def setUp(self):
        self.data = _hexdecode(
            "01000199"  # handshake header
            "0301"  # version
            "5008220ce5e0e78b6891afe204498c9363feffbe03235a2d9e05b7d990eb708d"  # rand
            "2009bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed1"  # session id
            # cipher suites
            "005400ffc00ac0140088008700390038c00fc00500840035c007c009c011c0130045004400330032c00cc00ec002c0040096004100050004002fc008c01200160013c00dc003feff000ac006c010c00bc00100020001"
            "0100"  # compresssion methods
            # extensions
            "00fc0000000e000c0000096c6f63616c686f7374000a00080006001700180019000b00020100002300d0a50b2e9f618a9ea9bf493ef49b421835cd2f6b05bbe1179d8edf70d58c33d656e8696d36d7e7e0b9d3ecc0e4de339552fa06c64c0fcb550a334bc43944e2739ca342d15a9ebbe981ac87a0d38160507d47af09bdc16c5f0ee4cdceea551539382333226048a026d3a90a0535f4a64236467db8fee22b041af986ad0f253bc369137cd8d8cd061925461d7f4d7895ca9a4181ab554dad50360ac31860e971483877c9335ac1300c5e78f3e56f3b8e0fc16358fcaceefd5c8d8aaae7b35be116f8832856ca61144fcdd95e071b94d0cf7233740000"
            "FFFFFFFFFFFFFFFF")  # random garbage
        self.p = TLSHandshake(self.data)

    # NOTE(review): the 'def' line of this test was missing from the
    # listing; the method name is a reconstruction - confirm upstream.
    def testClientHelloConstructed(self):
        'Make sure the correct class was constructed'
        #print self.p
        self.assertTrue(isinstance(self.p.data, TLSClientHello))

#   def testClientDateCorrect(self):
#       self.assertEqual(self.p.random_unixtime, 1342710284)

    def testClientRandomCorrect(self):
        self.assertEqual(self.p.data.random,
            _hexdecode('5008220ce5e0e78b6891afe204498c9363feffbe03235a2d9e05b7d990eb708d'))

    def testCipherSuiteLength(self):
        # we won't bother testing the identity of each cipher suite in the list.
        self.assertEqual(self.p.data.num_ciphersuites, 42)
        #self.assertEqual(len(self.p.ciphersuites), 42)

    def testSessionId(self):
        self.assertEqual(self.p.data.session_id,
            _hexdecode('09bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed1'))

    def testCompressionMethods(self):
        self.assertEqual(self.p.data.num_compression_methods, 1)

    def testTotalLength(self):
        # 4-byte handshake header plus the 0x199 body bytes it announces;
        # the trailing garbage must not be counted
        self.assertEqual(len(self.p), 413)
504
class ServerHelloTest(unittest.TestCase):
    'Again, from Wireshark'

    def setUp(self):
        self.data = _hexdecode(
            '0200004d'  # handshake header
            '0301'  # version
            '5008220c8ec43c5462315a7c99f5d5b6bff009ad285b51dc18485f352e9fdecd'  # random
            '2009bc0192e008e6fa8fe47998fca91311ba30ddde14a9587dc674b11c3d3e5ed1'  # session id
            '0002'  # cipher suite
            '00'  # compression method
            '0005ff01000100')  # remaining bytes (not parsed)
        self.p = TLSHandshake(self.data)

    def testConstructed(self):
        body = self.p.data
        self.assertTrue(isinstance(body, TLSServerHello))

#   def testDateCorrect(self):
#       self.assertEqual(self.p.random_unixtime, 1342710284)

    def testRandomCorrect(self):
        expected = _hexdecode(
            '5008220c8ec43c5462315a7c99f5d5b6bff009ad285b51dc18485f352e9fdecd')
        self.assertEqual(self.p.data.random, expected)

    def testCipherSuite(self):
        suite = ssl_ciphersuites.BY_CODE[self.p.data.cipher_suite]
        self.assertEqual(suite.name, 'TLS_RSA_WITH_NULL_SHA')

    def testTotalLength(self):
        # 4-byte handshake header plus the 0x4d body bytes it announces
        self.assertEqual(len(self.p), 81)
530
class TLSMultiFactoryTest(unittest.TestCase):
    "Made up test data"

    def setUp(self):
        # every record uses the same header: type 0x17, version 0x0301,
        # length 0x0010
        header = '1703010010'
        self.data = _hexdecode(
            header + 'AA' * 16 +    # record 1
            header + 'BB' * 16 +    # record 2
            header + 'CCCCCCCC')    # record 3 (incomplete)
        self.msgs, self.bytes_parsed = TLSMultiFactory(self.data)

    def testNumMessages(self):
        # only complete messages should be parsed, incomplete ones left
        # in buffer
        self.assertEqual(len(self.msgs), 2)

    def testBytesParsed(self):
        # two complete records of 5 header bytes + 16 data bytes each
        self.assertEqual(self.bytes_parsed, (5 + 16) * 2)

    def testFirstMsgData(self):
        first = self.msgs[0]
        self.assertEqual(first.data, _hexdecode('AA' * 16))

    def testSecondMsgData(self):
        second = self.msgs[1]
        self.assertEqual(second.data, _hexdecode('BB' * 16))
# Run the unit tests above when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()