/usr/lib/python3/dist-packages/partd/file.py is in python3-partd 0.3.8-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
import atexit
import os
import shutil
import string
import tempfile
from .core import Interface
import locket
from .utils import ignoring
class File(Interface):
    """Store byte strings in one file per key beneath a directory.

    Every key is mapped to a path under ``self.path`` by :meth:`filename`.
    ``append`` adds bytes to the end of a key's file, ``_get`` reads whole
    files back, ``_iset`` overwrites a file and ``_delete`` removes files.
    Operations that take a ``lock`` argument hold ``self.lock`` (a
    ``locket`` lock file named ``.lock`` inside the directory) while they
    touch the filesystem.

    Parameters
    ----------
    path : str, optional
        Directory to store data in.  It is created if it does not exist.
        When omitted (or otherwise falsy) a fresh temporary directory with
        the suffix ``.partd`` is created, registered in ``cleanup_files``
        and removed again when the object is garbage collected.
    dir : str, optional
        Parent directory handed to ``tempfile.mkdtemp`` when ``path`` is
        not given.
    """

    def __init__(self, path=None, dir=None):
        if not path:
            path = tempfile.mkdtemp(suffix='.partd', dir=dir)
            cleanup_files.append(path)
            self._explicitly_given_path = False
        else:
            self._explicitly_given_path = True
        self.path = path
        if not os.path.exists(path):
            # NOTE(review): ``ignoring(OSError)`` presumably suppresses the
            # error raised when the directory appears between the check and
            # the call -- confirm against ``.utils``.
            with ignoring(OSError):
                os.makedirs(path)
        self.lock = locket.lock_file(self.filename('.lock'))
        Interface.__init__(self)

    def __getstate__(self):
        """Return the picklable state: only the directory path."""
        return {'path': self.path}

    def __setstate__(self, state):
        """Restore from ``state`` by re-running ``__init__`` on its path.

        Because the path is passed explicitly, the restored object is
        marked as using an explicitly given path and therefore does not
        remove the directory in ``__del__``.
        """
        Interface.__setstate__(self, state)
        File.__init__(self, state['path'])

    def append(self, data, lock=True, fsync=False, **kwargs):
        """Append each value of ``data`` to the file of its key.

        Parameters
        ----------
        data : dict
            Maps keys to the bytes that should be appended.
        lock : bool
            Hold ``self.lock`` for the duration of the call.
        fsync : bool
            Force every written file to disk before it is closed.
        **kwargs
            Accepted and ignored.
        """
        if lock:
            self.lock.acquire()
        try:
            for k, v in data.items():
                fn = self.filename(k)
                parent = os.path.dirname(fn)
                if not os.path.exists(parent):
                    os.makedirs(parent)
                with open(fn, 'ab') as f:
                    f.write(v)
                    if fsync:
                        # The file object is buffered: push the bytes to
                        # the OS first, otherwise fsync has nothing to
                        # commit for this write.
                        f.flush()
                        os.fsync(f.fileno())
        finally:
            if lock:
                self.lock.release()

    def _get(self, keys, lock=True, **kwargs):
        """Read the full contents stored for each key.

        Parameters
        ----------
        keys : list, tuple or set
            Keys to read; any other container type fails the assertion.
        lock : bool
            Hold ``self.lock`` for the duration of the call.
        **kwargs
            Accepted and ignored.

        Returns
        -------
        list of bytes
            One entry per key in iteration order.  A key whose file cannot
            be opened (``IOError``) yields ``b''``.
        """
        assert isinstance(keys, (list, tuple, set))
        if lock:
            self.lock.acquire()
        try:
            result = []
            for key in keys:
                try:
                    with open(self.filename(key), 'rb') as f:
                        result.append(f.read())
                except IOError:
                    result.append(b'')
        finally:
            if lock:
                self.lock.release()
        return result

    def _iset(self, key, value, lock=True):
        """ Idempotent set

        Overwrite the file for ``key`` with ``value``, creating the parent
        directory first when it is missing.  ``lock`` controls whether
        ``self.lock`` is held while the file is written.
        """
        fn = self.filename(key)
        parent = os.path.dirname(fn)
        if not os.path.exists(parent):
            os.makedirs(parent)
        if lock:
            self.lock.acquire()
        try:
            with open(fn, 'wb') as f:
                f.write(value)
        finally:
            if lock:
                self.lock.release()

    def _delete(self, keys, lock=True):
        """Remove the files of ``keys``; keys without a file are skipped.

        ``lock`` controls whether ``self.lock`` is held during removal.
        """
        if lock:
            self.lock.acquire()
        try:
            for key in keys:
                path = self.filename(key)
                if os.path.exists(path):
                    os.remove(path)
        finally:
            if lock:
                self.lock.release()

    def drop(self):
        """Delete everything stored and leave an empty directory behind.

        The directory tree is removed, ``self._iset_seen`` is cleared and
        the directory is created again.  ``self.lock`` is not acquired.
        """
        if os.path.exists(self.path):
            shutil.rmtree(self.path)
        # NOTE(review): ``_iset_seen`` is not defined in this class;
        # presumably it is maintained by ``Interface`` -- confirm.
        self._iset_seen.clear()
        os.mkdir(self.path)

    def filename(self, key):
        """Return the path of the file that stores ``key``."""
        return filename(self.path, key)

    def __exit__(self, *args):
        """Drop all data and remove the directory itself."""
        self.drop()
        # drop() recreates the directory empty, so rmdir can remove it.
        os.rmdir(self.path)

    def __del__(self):
        """Remove the directory if it was a temporary one we created."""
        # __init__ may have raised before the flag was assigned; treat such
        # half-built objects like explicit paths so nothing is deleted and
        # no AttributeError escapes from the finalizer.
        if not getattr(self, '_explicitly_given_path', True):
            self.drop()
            os.rmdir(self.path)
def filename(path, key):
    """Return the file path for ``key`` inside the directory ``path``.

    The key is first turned into a relative path with ``token`` and then
    stripped of non-whitelisted characters with ``escape_filename``.
    """
    relative = escape_filename(token(key))
    return os.path.join(path, relative)
# Whitelist approach taken from
# http://stackoverflow.com/questions/295135/turn-a-string-into-a-valid-filename-in-python
# The platform path separator is whitelisted as well, so separators inside a
# name survive escaping.
valid_chars = ''.join(("-_.() ", string.ascii_letters, string.digits, os.path.sep))


def escape_filename(fn):
    """ Keep only the characters of ``fn`` that appear in ``valid_chars``

    >>> escape_filename('Foo!bar?')
    'Foobar'
    """
    kept = [char for char in fn if char in valid_chars]
    return ''.join(kept)
def token(key):
    """ Turn a key into a relative path string

    Strings are returned as they are, tuples are converted element by
    element and joined with ``os.path.join``, anything else goes through
    ``str``.

    >>> token('hello')
    'hello'
    >>> token(('hello', 'world'))  # doctest: +SKIP
    'hello/world'
    """
    if isinstance(key, str):
        return key
    if isinstance(key, tuple):
        parts = [token(part) for part in key]
        return os.path.join(*parts)
    return str(key)
# Directories registered for removal when the interpreter exits.
cleanup_files = []


@atexit.register
def cleanup():
    """Remove every registered directory tree that still exists."""
    for tmp_dir in cleanup_files:
        if not os.path.exists(tmp_dir):
            continue
        shutil.rmtree(tmp_dir)
|