Package dpkt :: Module pcap
[hide private]
[frames] | no frames]

Source Code for Module dpkt.pcap

  1  # $Id: pcap.py 77 2011-01-06 15:59:38Z dugsong $ 
  2   
  3  """Libpcap file format.""" 
  4   
  5  import sys, time 
  6  import dpkt 
  7   
  8  TCPDUMP_MAGIC = 0xa1b2c3d4L 
  9  PMUDPCT_MAGIC = 0xd4c3b2a1L 
 10   
 11  PCAP_VERSION_MAJOR = 2 
 12  PCAP_VERSION_MINOR = 4 
 13   
 14  DLT_NULL               = 0 
 15  DLT_EN10MB             = 1 
 16  DLT_EN3MB              = 2 
 17  DLT_AX25               = 3 
 18  DLT_PRONET             = 4 
 19  DLT_CHAOS              = 5 
 20  DLT_IEEE802            = 6 
 21  DLT_ARCNET             = 7 
 22  DLT_SLIP               = 8 
 23  DLT_PPP                = 9 
 24  DLT_FDDI               = 10 
 25  DLT_PFSYNC             = 18 
 26  DLT_IEEE802_11         = 105 
 27  DLT_LINUX_SLL          = 113 
 28  DLT_PFLOG              = 117 
 29  DLT_IEEE802_11_RADIO   = 127 
 30   
 31  if sys.platform.find('openbsd') != -1: 
 32      DLT_LOOP           = 12 
 33      DLT_RAW            = 14 
 34  else: 
 35      DLT_LOOP           = 108 
 36      DLT_RAW            = 12 
 37   
 38  dltoff = { DLT_NULL:4, DLT_EN10MB:14, DLT_IEEE802:22, DLT_ARCNET:6, 
 39             DLT_SLIP:16, DLT_PPP:4, DLT_FDDI:21, DLT_PFLOG:48, DLT_PFSYNC:4, 
 40             DLT_LOOP:4, DLT_LINUX_SLL:16 } 
 41   
class PktHdr(dpkt.Packet):
    """pcap packet header.

    Precedes each packet's data in the file.  Reader combines ``tv_sec``
    and ``tv_usec`` into one timestamp and reads ``caplen`` bytes of packet
    data after this header.
    """

    # One (name, format code, default) triple per header field.
    __hdr__ = (
        ('tv_sec', 'I', 0),     # timestamp, whole seconds
        ('tv_usec', 'I', 0),    # timestamp, microseconds part
        ('caplen', 'I', 0),     # number of data bytes stored after the header
        ('len', 'I', 0),        # NOTE(review): presumably on-the-wire length
    )
50
class LEPktHdr(PktHdr):
    """PktHdr variant whose ``__byte_order__`` is ``'<'`` (little-endian)."""

    __byte_order__ = '<'
53
class FileHdr(dpkt.Packet):
    """pcap file header.

    Read once from the start of the file by Reader and written once by
    Writer.  The defaults describe a version 2.4 file with a snapshot
    length of 1500 and link type 1.
    """

    # One (name, format code, default) triple per header field.
    __hdr__ = (
        ('magic', 'I', TCPDUMP_MAGIC),
        ('v_major', 'H', PCAP_VERSION_MAJOR),
        ('v_minor', 'H', PCAP_VERSION_MINOR),
        ('thiszone', 'I', 0),
        ('sigfigs', 'I', 0),
        ('snaplen', 'I', 1500),
        ('linktype', 'I', 1),
    )
65
class LEFileHdr(FileHdr):
    """FileHdr variant whose ``__byte_order__`` is ``'<'`` (little-endian)."""

    __byte_order__ = '<'
68
class Writer(object):
    """Simple pcap dumpfile writer.

    Headers are emitted in the byte order of the running machine
    (``sys.byteorder``).
    """

    def __init__(self, fileobj, snaplen=1500, linktype=DLT_EN10MB):
        """Wrap *fileobj* and immediately write the pcap file header to it.

        fileobj  -- object with ``write`` and ``close`` methods
        snaplen  -- value stored in the file header's ``snaplen`` field
        linktype -- value stored in the file header's ``linktype`` field
        """
        self.__f = fileobj
        # Pick the header layout that matches the host byte order.
        if sys.byteorder == 'little':
            header_cls = LEFileHdr
        else:
            header_cls = FileHdr
        file_header = header_cls(snaplen=snaplen, linktype=linktype)
        self.__f.write(str(file_header))

    def writepkt(self, pkt, ts=None):
        """Append one packet record (per-packet header, then data).

        pkt -- packet to store; it is serialised with ``str(pkt)``
        ts  -- timestamp in seconds (fractions allowed); the current time
               is used when it is None
        """
        if ts is None:
            ts = time.time()
        payload = str(pkt)
        size = len(payload)
        # Pick the header layout that matches the host byte order.
        if sys.byteorder == 'little':
            header_cls = LEPktHdr
        else:
            header_cls = PktHdr
        # Split the timestamp into whole seconds and microseconds.
        seconds = int(ts)
        micros = int((float(ts) - seconds) * 1000000.0)
        # The whole packet is stored, so caplen and len are both its size.
        record_header = header_cls(tv_sec=seconds, tv_usec=micros,
                                   caplen=size, len=size)
        self.__f.write(str(record_header))
        self.__f.write(payload)

    def close(self):
        """Close the underlying file object."""
        self.__f.close()
97
class Reader(object):
    """Simple pypcap-compatible pcap file reader.

    Iterating over a Reader yields ``(timestamp, data)`` pairs, one per
    packet record.  Every iteration rewinds to the first packet.

    Attributes set by the constructor:
      name    -- ``fileobj.name``
      fd      -- ``fileobj.fileno()``
      dloff   -- ``dltoff`` entry for the file's link type, or 0 if unlisted
      snaplen -- ``snaplen`` field of the file header
      filter  -- always the empty string
    """

    def __init__(self, fileobj):
        """Read and validate the pcap file header from *fileobj*.

        fileobj -- object providing ``name``, ``fileno()``, ``read()`` and
                   ``seek()``

        Raises ValueError when the magic number is neither TCPDUMP_MAGIC
        nor its byte-swapped form PMUDPCT_MAGIC.
        """
        self.name = fileobj.name
        self.fd = fileobj.fileno()
        self.__f = fileobj
        buf = self.__f.read(FileHdr.__hdr_len__)
        self.__fh = FileHdr(buf)
        self.__ph = PktHdr
        if self.__fh.magic == PMUDPCT_MAGIC:
            # The magic came out byte-swapped: re-parse the header as
            # little-endian and use little-endian packet headers as well.
            self.__fh = LEFileHdr(buf)
            self.__ph = LEPktHdr
        elif self.__fh.magic != TCPDUMP_MAGIC:
            raise ValueError('invalid tcpdump header')
        self.dloff = dltoff.get(self.__fh.linktype, 0)
        self.snaplen = self.__fh.snaplen
        self.filter = ''

    def fileno(self):
        """Return the file descriptor captured at construction time."""
        return self.fd

    # NOTE(review): the listing this class was taken from jumps from source
    # line 122 to 125 at this point, so two lines are absent from this view;
    # confirm against the original module whether a definition belongs here.

    def setfilter(self, value, optimize=1):
        """Filtering is not supported; always raises NotImplementedError."""
        # Raise the exception: returning the exception class would let a
        # caller believe the filter had been installed.
        raise NotImplementedError

    def readpkts(self):
        """Return every ``(timestamp, data)`` pair in the file as a list."""
        return list(self)

    def dispatch(self, cnt, callback, *args):
        """Call ``callback(ts, pkt, *args)`` for packets in the file.

        cnt      -- when positive, stop after at most this many packets;
                    otherwise process every packet
        callback -- callable receiving the timestamp, the packet data and
                    any extra *args*

        Reading starts at the first packet on every call (see __iter__).
        Running out of packets before *cnt* is reached is not an error.
        """
        if cnt > 0:
            # Go through the iterator protocol: this class has no next()
            # method, so the packets cannot be pulled one call at a time.
            remaining = cnt
            for ts, pkt in self:
                callback(ts, pkt, *args)
                remaining -= 1
                if remaining == 0:
                    break
        else:
            for ts, pkt in self:
                callback(ts, pkt, *args)

    def loop(self, callback, *args):
        """Call ``callback(ts, pkt, *args)`` for every packet in the file."""
        self.dispatch(0, callback, *args)

    def __iter__(self):
        """Yield ``(timestamp, data)`` for each packet record in file order.

        The timestamp is ``tv_sec + tv_usec / 1000000.0``; the data is the
        result of reading ``caplen`` bytes after the packet header.
        """
        # Always start right after the file header.
        self.__f.seek(FileHdr.__hdr_len__)
        while True:
            buf = self.__f.read(PktHdr.__hdr_len__)
            if not buf:
                # An empty read marks the end of the file.
                break
            hdr = self.__ph(buf)
            buf = self.__f.read(hdr.caplen)
            yield (hdr.tv_sec + (hdr.tv_usec / 1000000.0), buf)
if __name__ == '__main__':
    import unittest

    class PcapTestCase(unittest.TestCase):
        """Self-test run when the module is executed as a script."""

        def test_endian(self):
            """The same file header parsed in either byte order must agree."""
            be = '\xa1\xb2\xc3\xd4\x00\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x60\x00\x00\x00\x01'
            le = '\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x60\x00\x00\x00\x01\x00\x00\x00'
            befh = FileHdr(be)
            lefh = LEFileHdr(le)
            # assertEqual replaces the deprecated failUnless alias and
            # reports both values when the check fails.
            self.assertEqual(befh.linktype, lefh.linktype)

    unittest.main()