/usr/lib/python2.7/dist-packages/Halberd/clues/file.py is in python-halberd 0.2.4-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: iso-8859-1 -*-
"""Utilities for clue storage.
Provides functionality needed to store clues on disk.
"""
# Copyright (C) 2004, 2005, 2006, 2010 Juan M. Bello Rivas <jmbr@superadditive.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
import csv
import types
import shutil
import Halberd.util
from Halberd.clues.Clue import Clue
class InvalidFile(Exception):
    """The loaded file is not a valid clue file.

    @ivar msg: Description of the problem, as passed to the constructor.
    """

    def __init__(self, msg):
        """Remember the description of the problem.

        @param msg: Description of what is wrong with the file.
        """
        self.msg = msg

    def __str__(self):
        """Return the description of the problem as a string.

        @rtype: C{str}
        """
        # __str__ must return a string; coerce so that a non-string message
        # (e.g. a wrapped exception object) does not turn str(exc) into a
        # TypeError that hides the real problem.
        return str(self.msg)
def save(filename, clues):
    """Save clues to a file.

    One CSV row is written per clue, holding its count, its local timestamp
    and its headers, in that order.

    @param filename: Name of the file where the clues will be written to.
    @type filename: C{str}

    @param clues: Sequence of clues to write.
    @type clues: C{list}
    """
    # Create or truncate the destination file.  The with-block guarantees the
    # handle is flushed and closed even if writing one of the rows fails.
    with open(filename, 'w+') as cluefp:
        writer = csv.writer(cluefp)
        for clue in clues:
            # Store the most relevant clue information.
            writer.writerow((clue.getCount(), clue._local, clue.headers))
def load(filename):
    """Load clues from file.

    Every CSV row must hold exactly three fields: the clue count, the local
    timestamp and the textual representation of the header sequence.

    @param filename: Name of the file where the clues are stored.
    @type filename: C{str}

    @return: Clues extracted from the file.
    @rtype: C{list}

    @raise InvalidFile: In case there's a problem while reinterpreting the
    clues.
    """
    clues = []

    # The with-block guarantees the file is closed even when a malformed row
    # makes us bail out with InvalidFile half-way through the file.
    with open(filename, 'r') as cluefp:
        for tup in csv.reader(cluefp):
            try:
                count, localtime, headers = tup
            except ValueError:
                raise InvalidFile('Cannot unpack fields')

            # Recreate the current clue.
            clue = Clue()
            try:
                clue._count = int(count)
                clue._local = float(localtime)
            except ValueError:
                raise InvalidFile('Could not convert fields')

            # SECURITY NOTE: eval() executes whatever expression is stored in
            # the file.  Passing empty globals/locals is not a sandbox (the
            # builtins are still reachable), so only load clue files coming
            # from a trusted source.  ast.literal_eval() would be a safer
            # parser for the list-of-tuples representation expected here.
            clue.headers = eval(headers, {}, {})

            if not isinstance(clue.headers, (list, tuple)):
                raise InvalidFile('Wrong clue header field')

            clue.parse(clue.headers)
            clues.append(clue)

    return clues
class ClueDir:
    """Stores clues hierarchically using the underlying filesystem.

    ClueDir tries to be as portable as possible but requires the host operating
    system to be able to create long filenames (and directories, of course).

    This is an example layout::

        http___www_microsoft_com/
        http___www_microsoft_com/207_46_134_221.clu
        http___www_microsoft_com/207_46_156_220.clu
        http___www_microsoft_com/207_46_156_252.clu
            .
            .
            .
    """

    def __init__(self, root=None):
        """Initializes ClueDir object.

        The root folder is created if it does not exist yet.

        @param root: Root folder where to start creating sub-folders.  The
        current working directory is used when it is empty or C{None}.
        @type root: C{str}
        """
        self.ext = 'clu'
        if not root:
            self.root = os.getcwd()
        else:
            self.root = root
        self._mkdir(self.root)

    def _sanitize(self, url):
        """Filter out potentially dangerous chars.

        The string is passed through the translation table
        C{Halberd.util.table}.

        @param url: String to be used as a file or directory name.
        @type url: C{str}

        @return: The translated string.
        @rtype: C{str}
        """
        return url.translate(Halberd.util.table)

    def _mkdir(self, dest):
        """Creates a directory to store clues.

        If the directory already exists it won't complain about that.

        @param dest: Path of the directory to create.
        @type dest: C{str}

        @return: The path received as argument.
        @rtype: C{str}

        @raise InvalidFile: If the path exists but is not a directory.
        @raise OSError: If the directory can't be created.
        """
        # Try to create first and inspect the failure afterwards: this avoids
        # the window between "check" and "create" in which another process
        # could create the very same directory.
        try:
            os.mkdir(dest)
        except OSError:
            if os.path.isdir(dest):
                # Somebody (possibly us, earlier) already created it.
                return dest
            if not os.path.exists(dest):
                # Genuine failure (missing parent, permissions, ...).
                raise
            raise InvalidFile(
                '%s already exist and is not a directory' % dest)
        return dest

    def save(self, url, addr, clues):
        """Hierarchically write clues.

        The clues end up in C{<root>/<sanitized url>/<sanitized addr>.clu}.

        @param url: URL scanned (will be used as a directory name).
        @type url: C{str}

        @param addr: Address of the target.
        @type addr: C{str}

        @param clues: Clues to be stored.
        @type clues: C{list}

        @raise OSError: If the directories can't be created.
        @raise IOError: If the file can't be stored successfully.
        """
        assert url and addr

        urldir = self._mkdir(os.path.join(self.root, self._sanitize(url)))
        filename = self._sanitize(addr) + os.extsep + self.ext
        cluefile = os.path.join(urldir, filename)

        # The module-level save() is reached through its fully qualified name.
        Halberd.clues.file.save(cluefile, clues)
# vim: ts=4 sw=4 et
|