/usr/lib/python3/dist-packages/partd/utils.py is in python3-partd 0.3.8-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.

from contextlib import contextmanager
import os
import shutil
import tempfile
import struct


def raises(exc, lamda):
    try:
        lamda()
        return False
    except exc:
        return True
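
# Illustrative usage sketch: the zero-argument callable is invoked and
# raises() reports whether it raised the given exception type.
#
#   >>> raises(ZeroDivisionError, lambda: 1 / 0)
#   True
#   >>> raises(ZeroDivisionError, lambda: 1 + 1)
#   False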


@contextmanager
def tmpfile(extension=''):
    extension = '.' + extension.lstrip('.')
    handle, filename = tempfile.mkstemp(extension)
    os.close(handle)
    os.remove(filename)

    try:
        yield filename
    finally:
        if os.path.exists(filename):
            if os.path.isdir(filename):
                shutil.rmtree(filename)
            else:
                os.remove(filename)
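
# Illustrative usage sketch: the yielded path does not exist yet, and
# whatever is created at that path (file or directory) is removed when the
# block exits.
#
#   >>> with tmpfile('.json') as fn:          # doctest: +SKIP
#   ...     with open(fn, 'w') as f:
#   ...         _ = f.write('{}')
#   >>> os.path.exists(fn)                    # doctest: +SKIP
#   False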


def frame(bytes):
    """ Pack the length of the bytes in front of the bytes

    TODO: This does a full copy.  This should maybe be inlined somehow
    wherever this gets used instead.  My laptop shows a data bandwidth of
    2GB/s
    """
    return struct.pack('Q', len(bytes)) + bytes


def framesplit(bytes):
    """ Split buffer into frames of concatenated chunks

    >>> data = frame(b'Hello') + frame(b'World')
    >>> list(framesplit(data))  # doctest: +SKIP
    [b'Hello', b'World']
    """
    i = 0
    n = len(bytes)
    while i < n:
        nbytes = struct.unpack('Q', bytes[i:i + 8])[0]
        i += 8
        yield bytes[i: i + nbytes]
        i += nbytes
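
# Illustrative sketch: each frame is its payload preceded by a struct-packed
# length header; on a typical 64-bit platform the native 'Q' format is
# 8 bytes, so a framed b'Hello' is 13 bytes long.
#
#   >>> framed = frame(b'Hello')
#   >>> len(framed)
#   13
#   >>> struct.unpack('Q', framed[:8])[0]
#   5
#   >>> list(framesplit(framed))
#   [b'Hello']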


def partition_all(n, bytes):
    """ Partition bytes into evenly sized blocks

    The final block holds the remainder and so may not be of equal size

    >>> list(partition_all(2, b'Hello'))
    [b'He', b'll', b'o']

    See Also:
        toolz.partition_all
    """
    if len(bytes) < n:  # zero copy fast common case
        yield bytes
    else:
        for i in range(0, len(bytes), n):
            yield bytes[i: i + n]


@contextmanager
def ignoring(*exc):
    try:
        yield
    except exc:
        pass
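
# Illustrative usage sketch: suppress the listed exception types inside the
# block, much like contextlib.suppress.
#
#   >>> with ignoring(FileNotFoundError):
#   ...     os.remove('definitely-missing-file')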


@contextmanager
def do_nothing(*args, **kwargs):
    yield


def nested_get(ind, coll, lazy=False):
    """ Get nested index from collection

    Examples
    --------

    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ['b', 'a']
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    [['b', 'a'], ['a', 'b']]
    """
    if isinstance(ind, list):
        if lazy:
            return (nested_get(i, coll, lazy=lazy) for i in ind)
        else:
            return [nested_get(i, coll, lazy=lazy) for i in ind]
    else:
        return coll[ind]
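
# Illustrative sketch: with lazy=True the nested results are produced as
# generators rather than lists.
#
#   >>> result = nested_get([1, 0], 'abc', lazy=True)
#   >>> list(result)
#   ['b', 'a']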


def flatten(seq):
    """
    >>> list(flatten([1]))
    [1]

    >>> list(flatten([[1, 2], [1, 2]]))
    [1, 2, 1, 2]

    >>> list(flatten([[[1], [2]], [[1], [2]]]))
    [1, 2, 1, 2]

    >>> list(flatten(((1, 2), (1, 2))))  # Don't flatten tuples
    [(1, 2), (1, 2)]

    >>> list(flatten((1, 2, [3, 4])))  # support heterogeneous
    [1, 2, 3, 4]
    """
    for item in seq:
        if isinstance(item, list):
            for item2 in flatten(item):
                yield item2
        else:
            yield item


def suffix(key, term):
    """ suffix a key with a suffix

    Works if the key is a string or a tuple

    >>> suffix('x', '.dtype')
    'x.dtype'
    >>> suffix(('a', 'b', 'c'), '.dtype')
    ('a', 'b', 'c.dtype')
    """
    if isinstance(key, str):
        return key + term
    elif isinstance(key, tuple):
        return key[:-1] + (suffix(key[-1], term),)
    else:
        return suffix(str(key), term)


def extend(key, term):
    """ extend a key with another element in a tuple

    Works if the key is a string or a tuple

    >>> extend('x', '.dtype')
    ('x', '.dtype')
    >>> extend(('a', 'b', 'c'), '.dtype')
    ('a', 'b', 'c', '.dtype')
    """
    if isinstance(term, tuple):
        pass
    elif isinstance(term, str):
        term = (term,)
    else:
        term = (str(term),)

    if not isinstance(key, tuple):
        key = (key,)

    return key + term