/usr/lib/python2.7/dist-packages/dpkt/snoop.py is in python-dpkt 1.8.r98-0.1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | # $Id$
"""Snoop file format."""
import sys, time
import dpkt
# RFC 1761
SNOOP_MAGIC = 0x736E6F6F70000000L
SNOOP_VERSION = 2
SDL_8023 = 0
SDL_8024 = 1
SDL_8025 = 2
SDL_8026 = 3
SDL_ETHER = 4
SDL_HDLC = 5
SDL_CHSYNC = 6
SDL_IBMCC = 7
SDL_FDDI = 8
SDL_OTHER = 9
dltoff = { SDL_ETHER:14 }
class PktHdr(dpkt.Packet):
"""snoop packet header."""
__byte_order__ = '!'
__hdr__ = (
('orig_len', 'I', 0),
('incl_len', 'I', 0),
('rec_len', 'I', 0),
('cum_drops', 'I', 0),
('ts_sec', 'I', 0),
('ts_usec', 'I', 0),
)
class FileHdr(dpkt.Packet):
"""snoop file header."""
__byte_order__ = '!'
__hdr__ = (
('magic', 'Q', SNOOP_MAGIC),
('v', 'I', SNOOP_VERSION),
('linktype', 'I', SDL_ETHER),
)
class Writer(object):
"""Simple snoop dumpfile writer."""
def __init__(self, fileobj, linktype=SDL_ETHER):
self.__f = fileobj
fh = FileHdr(linktype=linktype)
self.__f.write(str(fh))
def writepkt(self, pkt, ts=None):
if ts is None:
ts = time.time()
s = str(pkt)
n = len(s)
pad_len = 4 - n % 4 if n % 4 else 0
ph = PktHdr(orig_len=n,incl_len=n,
rec_len=PktHdr.__hdr_len__+n+pad_len,
ts_sec=int(ts),
ts_usec=int((int(ts) - float(ts)) * 1000000.0))
self.__f.write(str(ph))
self.__f.write(s + '\0' * pad_len)
def close(self):
self.__f.close()
class Reader(object):
"""Simple pypcap-compatible snoop file reader."""
def __init__(self, fileobj):
self.name = fileobj.name
self.fd = fileobj.fileno()
self.__f = fileobj
buf = self.__f.read(FileHdr.__hdr_len__)
self.__fh = FileHdr(buf)
self.__ph = PktHdr
if self.__fh.magic != SNOOP_MAGIC:
raise ValueError, 'invalid snoop header'
self.dloff = dltoff[self.__fh.linktype]
self.filter = ''
def fileno(self):
return self.fd
def datalink(self):
return self.__fh.linktype
def setfilter(self, value, optimize=1):
return NotImplementedError
def readpkts(self):
return list(self)
def dispatch(self, cnt, callback, *args):
if cnt > 0:
for i in range(cnt):
ts, pkt = self.next()
callback(ts, pkt, *args)
else:
for ts, pkt in self:
callback(ts, pkt, *args)
def loop(self, callback, *args):
self.dispatch(0, callback, *args)
def __iter__(self):
self.__f.seek(FileHdr.__hdr_len__)
while 1:
buf = self.__f.read(PktHdr.__hdr_len__)
if not buf: break
hdr = self.__ph(buf)
buf = self.__f.read(hdr.rec_len - PktHdr.__hdr_len__)
yield (hdr.ts_sec + (hdr.ts_usec / 1000000.0), buf[:hdr.incl_len])
|