/usr/share/pyshared/DiskManager/Fstab/DiskInfo.py is in disk-manager 1.1.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: UTF-8 -*-
#
# DiskInfo.py : Detect and get information about block devices
# Copyright (C) 2007 Mertens Florent <flomertens@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import os
import sys
import glob
import logging
from commands import *
from subprocess import *
from Fstabconfig import *
from FstabError import *
import FstabData
# Attributes a device entry must carry; DiskInfoBase marks an entry as
# ignored (IGNORE=True) when any of them is missing.
REQUIRED = [
    "DEV",
    "DEVICE",
    "DEV_NUM",
    "FS_TYPE",
]

# Name of both the module and the class that get_diskinfo_backend() falls
# back to when it cannot discover any backend.
DEFAULT_BACKEND = "ToolsBackend"
def get_diskinfo_backend(backend = "auto") :
    ''' get_diskinfo_backend([backend]) -> return a DiskInfo backend class.

    Every "*Backend.py" module found next to this file is imported, and each
    module attribute exposing "_backend_score" is treated as a backend.

    backend -- "auto" (default) selects the backend with the highest score.
               Any other value is turned into a class name by upper-casing
               its first letter and appending "Backend" (e.g. "tools" ->
               "ToolsBackend"); that backend is returned if it is available
               and reports a non-zero score, otherwise a warning is logged
               and the automatic selection is used.

    The returned object is the class itself, not an instance.
    ex : info = get_diskinfo_backend()() '''
    path = os.path.dirname(__file__) or os.getcwd()
    # Register the directory only once, so repeated calls do not grow sys.path.
    if path not in sys.path :
        sys.path.append(path)

    # Sorted so that discovery order (and tie-breaking between equal scores)
    # does not depend on the order os.listdir happens to return.
    backends = []
    for filename in sorted(os.listdir(path)) :
        if not filename.endswith("Backend.py") :
            continue
        module = __import__(filename[:-3])
        for attr in dir(module) :
            obj = getattr(module, attr)
            if not hasattr(obj, "_backend_score") :
                continue
            # A class imported by several backend modules is listed once.
            if not any(obj is known for known in backends) :
                backends.append(obj)

    if not backends :
        logging.warning("No DiskInfo backend found. Trying the default one...")
        module = __import__(DEFAULT_BACKEND)
        return getattr(module, DEFAULT_BACKEND)

    if backend != "auto" :
        name = backend[:1].upper() + backend[1:] + "Backend"
        # Resolve the class from the discovered backends themselves. Looking
        # it up on the last imported module only works when that module
        # happens to be the one defining the requested backend.
        by_name = {}
        for candidate in backends :
            by_name.setdefault(candidate.__name__, candidate)
        requested = by_name.get(name)
        if requested is None :
            logging.warning("%s is not available. Using auto backend." % name)
        elif requested()._backend_score() :
            return requested
        else :
            logging.warning("%s is not currently working. Using auto backend." % name)

    scores = [ candidate()._backend_score() for candidate in backends ]
    return backends[scores.index(max(scores))]
class DiskInfoBase(list) :
    ''' Base class for all DiskInfo backends.

    The instance is the database itself: a list holding one dict of
    attributes per block device. This class implements the infrastructure
    (lazy loading, searching, reverse lookup, driver detection) and leaves to
    backends only the work of collecting information about one device, by
    overriding _get_device_info() and _backend_score(). '''

    def __init__(self) :
        list.__init__(self)
        self._loaded = False            # True once the database was built
        self._loading = False           # True while load_database() is running
        self._driver_db = {}            # get_drivers() cache, keyed by fs type
        self._reverse_database = {}     # "DEV_NUM=..", "UUID=..", "LABEL=.." -> index

    def _backend_score(self) :
        ''' Return the reliability score of the backend (0-100). You must
        override this method when creating a backend. You will probably want
        to make the score depend on the versions of the tools you use. Keep
        in mind that the score of the default backend (ToolsBackend) is
        said to be 50. '''
        return 0

    def _get_devices(self) :
        ''' Return the device lines of /proc/partitions (the two header lines
        are skipped). Override it if you do not consider it reliable enough.
        Each returned item is later handed to _get_device_info(). '''
        with open("/proc/partitions") as fd :
            return fd.readlines()[2:]

    def load_database(self, reload = False) :
        ''' x.load_database([reload]) -> load the database.

        The database is not built when the object is created; every method
        that needs it calls load_database() itself, so you should not need to
        call it explicitly. It is public for its reload option: with
        reload = True the database is rebuilt from scratch.

        If building fails, the exception is propagated and the database is
        left marked as not loaded, so that the next access tries again. '''
        if (self._loaded or self._loading) and not reload :
            return
        logging.debug("Starting DiskInfo database creation...")
        self._loading = True
        complete = False
        try :
            del self[:]
            for index, device in enumerate(self._get_devices()) :
                # The entry is stored before the common attributes are
                # computed, as that step searches the database.
                self.append({})
                self[index] = self._get_device_info(device)
                self[index] = self._get_common_device_info(self[index])
                logging.debug("-"*10)
            self._load_reverse_database()
            complete = True
        finally :
            # Always leave the "loading" state: otherwise one failure would
            # turn every later load_database() call into a silent no-op.
            self._loaded = complete
            self._loading = False
        logging.debug("End DiskInfo database creation.")

    def _get_device_info(self, device) :
        ''' This is the method that all backends must override. It should
        return a dict of information about device. Keys are :
        "DEV"       : simple device name, like sda1 or dm-0. (required)
        "DEVICE"    : the device path, like /dev/sda1, or /dev/mapper/*. (required)
        "FS_UUID"   : the UUID of the device (required)
        "FS_LABEL"  : the label of the device (required if available)
        "FS_TYPE"   : the type of the device, like ext3. (required)
        "FS_USAGE"  : the usage of the fs, like filesystem, or crypto.
                      Devices with FS_USAGE != filesystem are ignored. (required)
        "REMOVABLE" : set to True if the device is from a removable drive.
                      Removable devices are ignored. (required)
        "MINOR"     : the minor number of the device (required)
        "MAJOR"     : the major number of the device (required)
        "SYSFS_PATH": the sysfs path (recommended)
        "SIZE"      : the size of the device (recommended)
        "FS_VERSION": the version of the type (recommended)
        "MODEL", "SERIAL_SHORT", "BUS", "VENDOR", "SERIAL", "TYPE", "REVISION" :
                      some information about the device and its drive. (optional) '''
        # Implement your backend here
        return device

    def _get_common_device_info(self, dev) :
        ''' Complete the dict dev with the attributes that are not the job of
        a backend (FS_DRIVERS, FS_LABEL_SAFE, DEV_NUM), decide whether the
        device is ignored (IGNORE), and return dev.

        An older entry with the same DEVICE but another DEV is flagged as
        ignored too. '''
        is_filesystem = dev.get("FS_USAGE") == "filesystem"

        # Set FS_DRIVERS
        if "FS_TYPE" in dev and is_filesystem :
            dev["FS_DRIVERS"] = self.get_drivers(dev["FS_TYPE"])

        # Set FS_LABEL_SAFE
        if "FS_LABEL" in dev and "FS_LABEL_SAFE" not in dev :
            label = dev["FS_LABEL"].replace("/", "")
            for char in FstabData.special_char :
                label = label.replace(char, "_")
            if label :
                logging.debug("-> Set FS_LABEL_SAFE : " + label)
                dev["FS_LABEL_SAFE"] = label

        # Set DEV_NUM
        if "MINOR" in dev and "MAJOR" in dev :
            dev["DEV_NUM"] = os.makedev(int(dev["MAJOR"]), int(dev["MINOR"]))

        # Ignore every dev if FS_USAGE != filesystem or if REMOVABLE, and
        # every dev that does not have all REQUIRED attributes. A missing
        # REMOVABLE is read as "not removable" rather than raising KeyError.
        dev["IGNORE"] = bool(not is_filesystem
                             or not all(key in dev for key in REQUIRED)
                             or dev.get("REMOVABLE"))

        # Ignore older entry with the same DEVICE
        if "DEVICE" in dev :
            for index in self.search(dev["DEVICE"], keys = ["DEVICE"]) :
                older = self[index]
                if not older.get("DEV") == dev.get("DEV") :
                    logging.debug("W: Ignore duplicate entry : %s -> %s",
                                  older.get("DEV"), older.get("DEVICE"))
                    older["IGNORE"] = True
        return dev

    def _load_reverse_database(self) :
        ''' Rebuild the mapping from "DEV_NUM=<n>", "UUID=<uuid>" and
        "LABEL=<label>" to the index of the matching entry. When several
        entries share a key, the last one wins. '''
        prefixes = (("DEV_NUM", "DEV_NUM=%s"),
                    ("FS_UUID", "UUID=%s"),
                    ("FS_LABEL", "LABEL=%s"))
        self._reverse_database = {}
        for index, entry in enumerate(self) :
            for attribute, template in prefixes :
                if attribute in entry :
                    self._reverse_database[template % entry[attribute]] = index

    def get_drivers(self, type, reload = False) :
        ''' x.get_drivers(type, [reload]) -> return a dict of available drivers for this type.

        The returned dict looks like :
            {"primary"   : [[name1, description1, fsck1], ...],
             "secondary" : [[name2, description2, fsck2], ...],
             "all"       : {name1 : [name1, description1, fsck1], ...}}
        fsck is 1 if /sbin/fsck.<name> exists for this driver, otherwise 0.
        Drivers are looked for in /proc/filesystems, in the output of
        modprobe -l, and in /sbin/mount.<type>*.
        The result is cached per type; set reload to True to get an up to
        date result. '''
        # The cache is keyed by the type argument itself. A fixed key would
        # never match the lookup and would make every type share one entry.
        if type in self._driver_db and not reload :
            return self._driver_db[type]

        primary = []
        if type in self._kernel_filesystems() or self._check_module(type) :
            primary.append([type, "Default driver"])
        for helper in glob.glob("/sbin/mount.%s*" % type) :
            if os.path.isfile(helper) :
                name = helper.split("mount.")[-1]
                primary.append([name, self._special_driver_description(name)])

        secondary = [ [name, "Secondary Driver"]
                      for name in self._secondary_driver_names(type) ]

        drivers = { "primary" : primary, "secondary" : secondary, "all" : {} }
        for driver in primary + secondary :
            driver.append(int(os.path.isfile("/sbin/fsck.%s" % driver[0])))
            drivers["all"][driver[0]] = driver
        self._driver_db[type] = drivers
        return drivers

    def _kernel_filesystems(self) :
        ''' Return the set of names found in the last column of
        /proc/filesystems. Names are compared as whole words, so that "ext"
        does not match "ext3" and the "nodev" marker is never taken for a
        filesystem. '''
        with open("/proc/filesystems") as fd :
            return set(line.split()[-1] for line in fd if line.strip())

    def _special_driver_description(self, name) :
        ''' Return the FstabData description of the mount helper name, or the
        "__unknow__" description when FstabData has none for it. '''
        return FstabData.special_driver.get(name, FstabData.special_driver["__unknow__"])

    def _secondary_driver_names(self, type) :
        ''' Return the secondary driver names FstabData registers for every
        type ("__all__") and for this type, in that order. '''
        table = FstabData.secondary_driver
        return [ table[key] for key in ("__all__", type) if key in table ]

    def _check_module(self, module) :
        ''' Return True if module appears in the output of "MODPROBE -l module". '''
        # NOTE(review): recent module tools may no longer accept "-l" - confirm.
        return module in getoutput("%s -l %s" % (MODPROBE, module))

    def __getitem__(self, item) :
        ''' x[item] -> entry at index item when item is an int, otherwise the
        first entry whose DEV or DEVICE equals item.

        Raise NotInDatabase when there is no such entry. '''
        self.load_database()
        if isinstance(item, int) :
            # Checking both bounds keeps negative out-of-range indexes from
            # escaping as IndexError.
            if -len(self) <= item < len(self) :
                return list.__getitem__(self, item)
            raise NotInDatabase("Index %i out of range" % item)
        matches = self.search(item)
        if not matches :
            raise NotInDatabase("Can't find %s in the database" % (item,))
        return list.__getitem__(self, matches[0])

    def list(self, col = "DEVICE", ignored = True, keep_index = False) :
        ''' x.list([col], [ignored], [keep_index]) -> list the value of attribute col
        for every entry. Default to "DEVICE".

        An entry without this attribute yields an empty string.
        If ignored is set to False, entries with IGNORE=True are left out.
        Default to True.
        If keep_index is set to True and ignored to False, ignored entries
        yield an empty string instead, so that positions still match the
        database indexes. Default to False. '''
        self.load_database()
        result = []
        for entry in self :
            # .get() because an entry still being built has no IGNORE yet.
            if not ignored and entry.get("IGNORE") :
                if keep_index :
                    result.append("")
                continue
            result.append(entry.get(col, ""))
        return result

    def get(self, item, attribute) :
        ''' x.get(item, attribute) -> return attribute of item, or the string
        "None" when the item or the attribute cannot be found. '''
        self.load_database()
        try :
            return self[item][attribute]
        except Exception :
            return "None"

    def search_reverse(self, pattern, ignored = True) :
        ''' x.search_reverse(pattern, [ignored]) -> search for pattern in the reverse database.

        This is faster than search(), but less flexible: pattern must be one
        of the fixed forms "DEV_NUM=<n>", "UUID=<uuid>" or "LABEL=<label>".
        Return the index of the entry, or None if there is none.
        Set ignored to False to not return a device with IGNORE=True.
        Default to True. '''
        self.load_database()
        if not self._loaded or pattern not in self._reverse_database :
            return None
        index = self._reverse_database[pattern]
        if ignored or not self[index].get("IGNORE") :
            return index
        return None

    def search_device(self, entry, ignored = True) :
        ''' x.search_device(entry, ignored = True) -> search entry in the reverse database.

        Convenience wrapper around search_reverse(): when entry is an
        existing path it is converted to "DEV_NUM=<st_rdev of its real
        path>"; anything else ("UUID=...", "LABEL=...") is searched as is.
        Set ignored to False to not return a device with IGNORE=True.
        Default to True. '''
        if os.path.exists(entry) :
            entry = "DEV_NUM=%s" % os.stat(os.path.realpath(entry)).st_rdev
        return self.search_reverse(entry, ignored)

    def search(self, pattern, keys = ("DEV", "DEVICE"), ignored = True) :
        ''' x.search(pattern, [keys], [ignored]) -> search for pattern in each
        of keys, for each entry of x, and return the list of matching indexes.

        Default keys are : "DEV" and "DEVICE".
        For the "FS_UUID" and "FS_LABEL" keys, pattern may also be written
        "UUID=<uuid>" or "LABEL=<label>".
        Set ignored to False to not return a device with IGNORE=True.
        Default to True. '''
        self.load_database()
        prefixes = { "FS_UUID" : "UUID=", "FS_LABEL" : "LABEL=" }
        result = []
        for col in keys :
            prefix = prefixes.get(col)
            for index, value in enumerate(self.list(col, ignored, keep_index = True)) :
                if index in result :
                    continue
                if value == pattern :
                    result.append(index)
                # An empty value is a placeholder (missing attribute or
                # ignored entry): it must not match a bare "UUID="/"LABEL=".
                elif prefix and value and prefix + value == pattern :
                    result.append(index)
        return result

    def export(self, device) :
        ''' x.export(device) -> query the database for device, and return a
        printable string of information about it.

        If device is set to "all", every entry of the database is reported. '''
        self.load_database()
        if device == "all" :
            parts = ["Query database for all devices :\n\n"]
            parts.extend(self._format_entry(entry) for entry in self)
            return "".join(parts)
        parts = ["Query database for " + device + " :\n\n"]
        matches = self.search(device)
        if matches :
            parts.append(self._format_entry(list.__getitem__(self, matches[0])))
        else :
            parts.append(device + " not in the database")
        return "".join(parts)

    def _format_entry(self, entry) :
        ''' Return the printable report of one entry, as used by export(). '''
        lines = "\n".join(["-> %s=%s" % (k, v) for k, v in entry.items()])
        return "Info for %s :\n%s\n\n" % (entry.get("DEV", ""), lines)
|