Package dpkt :: Module radiotap
[hide private]
[frames] | no frames]

Source Code for Module dpkt.radiotap

  1  '''Radiotap''' 
  2   
  3  import dpkt 
  4  import ieee80211 
  5  import socket 
  6   
  7  # Ref: http://www.radiotap.org 
  8  # Fields Ref: http://www.radiotap.org/defined-fields/all 
  9   
 10  # Present flags 
 11  _TSFT_MASK            = 0x1000000 
 12  _FLAGS_MASK           = 0x2000000 
 13  _RATE_MASK            = 0x4000000 
 14  _CHANNEL_MASK         = 0x8000000 
 15  _FHSS_MASK            = 0x10000000 
 16  _ANT_SIG_MASK         = 0x20000000 
 17  _ANT_NOISE_MASK       = 0x40000000 
 18  _LOCK_QUAL_MASK       = 0x80000000 
 19  _TX_ATTN_MASK         = 0x10000 
 20  _DB_TX_ATTN_MASK      = 0x20000 
 21  _DBM_TX_POWER_MASK    = 0x40000 
 22  _ANTENNA_MASK         = 0x80000 
 23  _DB_ANT_SIG_MASK      = 0x100000 
 24  _DB_ANT_NOISE_MASK    = 0x200000 
 25  _RX_FLAGS_MASK        = 0x400000 
 26  _CHANNELPLUS_MASK     = 0x200 
 27  _EXT_MASK             = 0x1 
 28   
 29  _TSFT_SHIFT           = 24 
 30  _FLAGS_SHIFT          = 25 
 31  _RATE_SHIFT           = 26 
 32  _CHANNEL_SHIFT        = 27 
 33  _FHSS_SHIFT           = 28 
 34  _ANT_SIG_SHIFT        = 29 
 35  _ANT_NOISE_SHIFT      = 30 
 36  _LOCK_QUAL_SHIFT      = 31 
 37  _TX_ATTN_SHIFT        = 16 
 38  _DB_TX_ATTN_SHIFT     = 17 
 39  _DBM_TX_POWER_SHIFT   = 18 
 40  _ANTENNA_SHIFT        = 19 
 41  _DB_ANT_SIG_SHIFT     = 20 
 42  _DB_ANT_NOISE_SHIFT   = 21 
 43  _RX_FLAGS_SHIFT       = 22 
 44  _CHANNELPLUS_SHIFT    = 10 
 45  _EXT_SHIFT            = 0 
 46   
 47  # Flags elements 
 48  _FLAGS_SIZE           = 2 
 49  _CFP_FLAG_SHIFT       = 0 
 50  _PREAMBLE_SHIFT       = 1 
 51  _WEP_SHIFT            = 2 
 52  _FRAG_SHIFT           = 3 
 53  _FCS_SHIFT            = 4 
 54  _DATA_PAD_SHIFT       = 5 
 55  _BAD_FCS_SHIFT        = 6 
 56  _SHORT_GI_SHIFT       = 7 
 57   
 58  # Channel type 
 59  _CHAN_TYPE_SIZE       = 4 
 60  _CHANNEL_TYPE_SHIFT   = 4 
 61  _CCK_SHIFT            = 5 
 62  _OFDM_SHIFT           = 6 
 63  _TWO_GHZ_SHIFT        = 7 
 64  _FIVE_GHZ_SHIFT       = 8 
 65  _PASSIVE_SHIFT        = 9 
 66  _DYN_CCK_OFDM_SHIFT   = 10 
 67  _GFSK_SHIFT           = 11 
 68  _GSM_SHIFT            = 12 
 69  _STATIC_TURBO_SHIFT   = 13 
 70  _HALF_RATE_SHIFT      = 14 
 71  _QUARTER_RATE_SHIFT   = 15 
 72   
 73  # Flags offsets and masks 
 74  _FCS_SHIFT = 4 
 75  _FCS_MASK = 0x10 
 76   
class Radiotap(dpkt.Packet):
    """Radiotap capture header, optionally followed by an IEEE 802.11 frame.

    The fixed header carries ``version``, ``pad``, ``length`` and
    ``present_flags``.  Each ``*_present`` property exposes one bit of
    ``present_flags`` as 0/1 and accepts a truthy/falsy value on assignment.

    After ``unpack`` every field whose bit is set is available both as an
    attribute (``self.flags``, ``self.channel``, ...) and, in decode order,
    in ``self.fields``.
    """

    __hdr__ = (
        ('version', 'B', 0),
        ('pad', 'B', 0),
        ('length', 'H', 0),
        ('present_flags', 'I', 0)
    )

    def _present_accessors(mask, shift):
        """Return the (getter, setter) pair for one bit of present_flags."""
        def _get(self):
            return (self.present_flags & mask) >> shift

        def _set(self, val):
            # Address the bit through its shift so that a falsy value clears
            # it; OR-ing in (val << shift) alone can never turn a bit off.
            bit = 1 << shift
            if val:
                self.present_flags = self.present_flags | bit
            else:
                self.present_flags = self.present_flags & ~bit

        return _get, _set

    _get_tsft_present, _set_tsft_present = _present_accessors(
        _TSFT_MASK, _TSFT_SHIFT)
    _get_flags_present, _set_flags_present = _present_accessors(
        _FLAGS_MASK, _FLAGS_SHIFT)
    _get_rate_present, _set_rate_present = _present_accessors(
        _RATE_MASK, _RATE_SHIFT)
    _get_channel_present, _set_channel_present = _present_accessors(
        _CHANNEL_MASK, _CHANNEL_SHIFT)
    _get_fhss_present, _set_fhss_present = _present_accessors(
        _FHSS_MASK, _FHSS_SHIFT)
    _get_ant_sig_present, _set_ant_sig_present = _present_accessors(
        _ANT_SIG_MASK, _ANT_SIG_SHIFT)
    _get_ant_noise_present, _set_ant_noise_present = _present_accessors(
        _ANT_NOISE_MASK, _ANT_NOISE_SHIFT)
    _get_lock_qual_present, _set_lock_qual_present = _present_accessors(
        _LOCK_QUAL_MASK, _LOCK_QUAL_SHIFT)
    _get_tx_attn_present, _set_tx_attn_present = _present_accessors(
        _TX_ATTN_MASK, _TX_ATTN_SHIFT)
    _get_db_tx_attn_present, _set_db_tx_attn_present = _present_accessors(
        _DB_TX_ATTN_MASK, _DB_TX_ATTN_SHIFT)
    _get_dbm_power_present, _set_dbm_power_present = _present_accessors(
        _DBM_TX_POWER_MASK, _DBM_TX_POWER_SHIFT)
    _get_ant_present, _set_ant_present = _present_accessors(
        _ANTENNA_MASK, _ANTENNA_SHIFT)
    _get_db_ant_sig_present, _set_db_ant_sig_present = _present_accessors(
        _DB_ANT_SIG_MASK, _DB_ANT_SIG_SHIFT)
    _get_db_ant_noise_present, _set_db_ant_noise_present = _present_accessors(
        _DB_ANT_NOISE_MASK, _DB_ANT_NOISE_SHIFT)
    _get_rx_flags_present, _set_rx_flags_present = _present_accessors(
        _RX_FLAGS_MASK, _RX_FLAGS_SHIFT)
    _get_chanplus_present, _set_chanplus_present = _present_accessors(
        _CHANNELPLUS_MASK, _CHANNELPLUS_SHIFT)
    _get_ext_present, _set_ext_present = _present_accessors(
        _EXT_MASK, _EXT_SHIFT)

    # The factory is only needed while the class body executes.
    del _present_accessors

    tsft_present = property(_get_tsft_present, _set_tsft_present)
    flags_present = property(_get_flags_present, _set_flags_present)
    rate_present = property(_get_rate_present, _set_rate_present)
    channel_present = property(_get_channel_present, _set_channel_present)
    fhss_present = property(_get_fhss_present, _set_fhss_present)
    ant_sig_present = property(_get_ant_sig_present, _set_ant_sig_present)
    ant_noise_present = property(_get_ant_noise_present, _set_ant_noise_present)
    lock_qual_present = property(_get_lock_qual_present, _set_lock_qual_present)
    tx_attn_present = property(_get_tx_attn_present, _set_tx_attn_present)
    db_tx_attn_present = property(_get_db_tx_attn_present, _set_db_tx_attn_present)
    dbm_tx_power_present = property(_get_dbm_power_present, _set_dbm_power_present)
    ant_present = property(_get_ant_present, _set_ant_present)
    db_ant_sig_present = property(_get_db_ant_sig_present, _set_db_ant_sig_present)
    db_ant_noise_present = property(_get_db_ant_noise_present, _set_db_ant_noise_present)
    rx_flags_present = property(_get_rx_flags_present, _set_rx_flags_present)
    chanplus_present = property(_get_chanplus_present, _set_chanplus_present)
    ext_present = property(_get_ext_present, _set_ext_present)

    def unpack(self, buf):
        """Decode the radiotap header, its present fields and the payload.

        Sets ``self.fields`` to the list of decoded field objects and one
        attribute per decoded field.  ``self.data`` becomes everything after
        the first ``length`` bytes of *buf*; when that remainder is non-empty
        it is wrapped in ``ieee80211.IEEE80211``, passing ``fcs`` along when
        the Flags field is present and has its FCS bit set.

        NOTE(review): fields are consumed back-to-back with no padding
        between them; radiotap.org describes natural alignment for fields --
        confirm whether padding needs to be skipped here.
        """
        dpkt.Packet.unpack(self, buf)

        # 'length' comes out of the header byte-swapped.  Swap it explicitly
        # rather than with socket.ntohs(), which only swaps on little-endian
        # hosts and would leave the value wrong on big-endian ones.
        hdr_len = ((self.length & 0xff) << 8) | (self.length >> 8)
        self.data = buf[hdr_len:]

        self.fields = []
        buf = buf[self.__hdr_len__:]

        # decode each field into self.<name> (eg. self.tsft) as well as
        # append it to the self.fields list
        field_decoder = [
            ('tsft', self.tsft_present, self.TSFT),
            ('flags', self.flags_present, self.Flags),
            ('rate', self.rate_present, self.Rate),
            ('channel', self.channel_present, self.Channel),
            ('fhss', self.fhss_present, self.FHSS),
            ('ant_sig', self.ant_sig_present, self.AntennaSignal),
            ('ant_noise', self.ant_noise_present, self.AntennaNoise),
            ('lock_qual', self.lock_qual_present, self.LockQuality),
            ('tx_attn', self.tx_attn_present, self.TxAttenuation),
            ('db_tx_attn', self.db_tx_attn_present, self.DbTxAttenuation),
            ('dbm_tx_power', self.dbm_tx_power_present, self.DbmTxPower),
            ('ant', self.ant_present, self.Antenna),
            ('db_ant_sig', self.db_ant_sig_present, self.DbAntennaSignal),
            ('db_ant_noise', self.db_ant_noise_present, self.DbAntennaNoise),
            ('rx_flags', self.rx_flags_present, self.RxFlags)
        ]
        for name, present_bit, parser in field_decoder:
            if present_bit:
                field = parser(buf)
                # presumably drops the trailing bytes the parser kept as
                # payload so len(field) covers only the field itself --
                # confirm against dpkt.Packet.__len__
                field.data = ''
                setattr(self, name, field)
                self.fields.append(field)
                buf = buf[len(field):]

        if len(self.data) > 0:
            if self.flags_present and self.flags.fcs:
                self.data = ieee80211.IEEE80211(self.data, fcs = self.flags.fcs)
            else:
                self.data = ieee80211.IEEE80211(self.data)

    class Antenna(dpkt.Packet):
        """Antenna field: one byte, ``index``."""
        __hdr__ = (
            ('index', 'B', 0),
        )

    class AntennaNoise(dpkt.Packet):
        """Antenna noise field: one byte, ``db``."""
        __hdr__ = (
            ('db', 'B', 0),
        )

    class AntennaSignal(dpkt.Packet):
        """Antenna signal field: one byte, ``db``."""
        __hdr__ = (
            ('db', 'B', 0),
        )

    class Channel(dpkt.Packet):
        """Channel field: 16-bit ``freq`` followed by 16-bit ``flags``."""
        __hdr__ = (
            ('freq', 'H', 0),
            ('flags', 'H', 0),
        )

    class FHSS(dpkt.Packet):
        """FHSS field: one byte ``set`` and one byte ``pattern``."""
        __hdr__ = (
            ('set', 'B', 0),
            ('pattern', 'B', 0),
        )

    class Flags(dpkt.Packet):
        """Flags field: one byte, ``val``; ``fcs`` exposes its FCS bit."""
        __hdr__ = (
            ('val', 'B', 0),
        )

        def _get_fcs_present(self):
            return (self.val & _FCS_MASK) >> _FCS_SHIFT

        def _set_fcs_present(self, v):
            # Store the result back into val: computing the new value
            # without assigning it would make the setter a silent no-op.
            self.val = (self.val & ~_FCS_MASK) | ((v << _FCS_SHIFT) & _FCS_MASK)

        fcs = property(_get_fcs_present, _set_fcs_present)

    class LockQuality(dpkt.Packet):
        """Lock quality field: 16-bit ``val``."""
        __hdr__ = (
            ('val', 'H', 0),
        )

    class RxFlags(dpkt.Packet):
        """RX flags field: 16-bit ``val``."""
        __hdr__ = (
            ('val', 'H', 0),
        )

    class Rate(dpkt.Packet):
        """Rate field: one byte, ``val``."""
        __hdr__ = (
            ('val', 'B', 0),
        )

    class TSFT(dpkt.Packet):
        """TSFT field: 64-bit ``usecs``."""
        __hdr__ = (
            ('usecs', 'Q', 0),
        )

    class TxAttenuation(dpkt.Packet):
        """TX attenuation field: 16-bit ``val``."""
        __hdr__ = (
            ('val', 'H', 0),
        )

    class DbTxAttenuation(dpkt.Packet):
        """dB TX attenuation field: 16-bit ``db``."""
        __hdr__ = (
            ('db', 'H', 0),
        )

    class DbAntennaNoise(dpkt.Packet):
        """dB antenna noise field: one byte, ``db``."""
        __hdr__ = (
            ('db', 'B', 0),
        )

    class DbAntennaSignal(dpkt.Packet):
        """dB antenna signal field: one byte, ``db``."""
        __hdr__ = (
            ('db', 'B', 0),
        )

    class DbmTxPower(dpkt.Packet):
        """dBm TX power field: one byte, ``dbm``."""
        __hdr__ = (
            ('dbm', 'B', 0),
        )
if __name__ == '__main__':
    import unittest

    class RadiotapTestCase(unittest.TestCase):
        """Self-tests run when the module is executed as a script."""

        def test_Radiotap(self):
            """Header fields, present bits and the channel field decode."""
            s = '\x00\x00\x00\x18\x6e\x48\x00\x00\x00\x02\x6c\x09\xa0\x00\xa8\x81\x02\x00\x00\x00\x00\x00\x00\x00'
            rad = Radiotap(s)
            # assertEqual reports both operands on failure, unlike the
            # deprecated failUnless(a == b) form.
            self.assertEqual(rad.version, 0)
            self.assertEqual(rad.present_flags, 0x6e480000)
            self.assertEqual(rad.tsft_present, 0)
            self.assertEqual(rad.flags_present, 1)
            self.assertEqual(rad.rate_present, 1)
            self.assertEqual(rad.channel_present, 1)
            self.assertEqual(rad.fhss_present, 0)
            self.assertEqual(rad.ant_sig_present, 1)
            self.assertEqual(rad.ant_noise_present, 1)
            self.assertEqual(rad.lock_qual_present, 0)
            self.assertEqual(rad.db_tx_attn_present, 0)
            self.assertEqual(rad.dbm_tx_power_present, 0)
            self.assertEqual(rad.ant_present, 1)
            self.assertEqual(rad.db_ant_sig_present, 0)
            self.assertEqual(rad.db_ant_noise_present, 0)
            self.assertEqual(rad.rx_flags_present, 1)
            self.assertEqual(rad.channel.freq, 0x6c09)
            self.assertEqual(rad.channel.flags, 0xa000)
            self.assertEqual(len(rad.fields), 7)

        def test_fcs(self):
            """The FCS bit of the Flags field is exposed as flags.fcs."""
            s = '\x00\x00\x1a\x00\x2f\x48\x00\x00\x34\x8f\x71\x09\x00\x00\x00\x00\x10\x0c\x85\x09\xc0\x00\xcc\x01\x00\x00'
            rt = Radiotap(s)
            self.assertEqual(rt.flags_present, 1)
            self.assertEqual(rt.flags.fcs, 1)

    unittest.main()