/usr/lib/python3/dist-packages/partd/pandas.py is in python3-partd 0.3.8-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
from functools import partial

import numpy as np
import pandas as pd
from pandas.core.internals import create_block_manager_from_blocks, make_block

from . import numpy as pnp
from .core import Interface
from .compatibility import pickle
from .encode import Encode
from .utils import extend, framesplit, frame

dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)


class PandasColumns(Interface):
    def __init__(self, partd=None):
        self.partd = pnp.Numpy(partd)
        Interface.__init__(self)

    def append(self, data, **kwargs):
        for k, df in data.items():
            self.iset(extend(k, '.columns'), dumps(list(df.columns)))
            self.iset(extend(k, '.index-name'), dumps(df.index.name))

        # TODO: don't use values, it does some work. Look at _blocks instead
        #       pframe/cframe do this well
        arrays = dict((extend(k, col), df[col].values)
                      for k, df in data.items()
                      for col in df.columns)
        arrays.update(dict((extend(k, '.index'), df.index.values)
                           for k, df in data.items()))
        # TODO: handle categoricals
        self.partd.append(arrays, **kwargs)

    def _get(self, keys, columns=None, **kwargs):
        if columns is None:
            columns = self.partd.partd.get([extend(k, '.columns') for k in keys],
                                           **kwargs)
            columns = list(map(pickle.loads, columns))
        else:
            columns = [columns] * len(keys)
        index_names = self.partd.partd.get([extend(k, '.index-name')
                                            for k in keys], **kwargs)
        index_names = map(pickle.loads, index_names)

        keys = [[extend(k, '.index'), [extend(k, col) for col in cols]]
                for k, cols in zip(keys, columns)]

        arrays = self.partd.get(keys, **kwargs)

        return [pd.DataFrame(dict(zip(cols, arrs)), columns=cols,
                             index=pd.Index(index, name=iname))
                for iname, (index, arrs), cols in zip(index_names, arrays, columns)]

    def __getstate__(self):
        return {'partd': self.partd}

    def _iset(self, key, value):
        return self.partd._iset(key, value)

    def drop(self):
        return self.partd.drop()

    @property
    def lock(self):
        return self.partd.partd.lock
    def __exit__(self, *args):
        self.drop()
        # __exit__ is called as a bound method, so passing self explicitly
        # would duplicate it; forward only the exception triple.
        self.partd.__exit__(*args)
    def __del__(self):
        self.partd.__del__()
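

# Minimal usage sketch, assuming the file-backed byte store ``partd.File``
# and a hypothetical writable scratch path: each appended frame is stored
# as one numpy array per column plus pickled metadata, and ``get``
# reassembles whole DataFrames.
def _demo_pandas_columns():
    from partd import File
    p = PandasColumns(File('/tmp/partd-columns-demo'))  # hypothetical path
    df = pd.DataFrame({'x': [1, 2, 3], 'y': [1.0, 2.0, 3.0]})
    p.append({'frame': df})      # one array per column, plus index/metadata
    result, = p.get(['frame'])   # frames come back in key order
    assert result.equals(df)
    p.drop()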


def index_to_header_bytes(ind):
    # These have special `__reduce__` methods, just use pickle
    if isinstance(ind, (pd.DatetimeIndex,
                        pd.MultiIndex,
                        pd.RangeIndex)):
        return None, dumps(ind)

    if isinstance(ind, pd.CategoricalIndex):
        cat = (ind.ordered, ind.categories)
        values = ind.codes
    else:
        cat = None
        values = ind.values

    header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
    bytes = pnp.compress(pnp.serialize(values), values.dtype)
    return header, bytes


def index_from_header_bytes(header, bytes):
    if header is None:
        return pickle.loads(bytes)

    typ, attr, dtype, cat = header
    data = pnp.deserialize(pnp.decompress(bytes, dtype), dtype, copy=True)
    if cat:
        data = pd.Categorical.from_codes(data, cat[1], ordered=cat[0])
    return typ.__new__(typ, data=data, **attr)
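

# Round-trip sketch for the two index helpers above. Plain numpy-backed
# indexes travel as a small header plus compressed bytes, while
# DatetimeIndex/MultiIndex/RangeIndex take the pickle fallback, signalled
# by a ``None`` header.
def _demo_index_roundtrip():
    ind = pd.Index([10, 20, 30], name='id')
    header, payload = index_to_header_bytes(ind)
    assert index_from_header_bytes(header, payload).equals(ind)

    rng = pd.RangeIndex(5)                # pickle fallback path
    header, payload = index_to_header_bytes(rng)
    assert header is None
    assert index_from_header_bytes(header, payload).equals(rng)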


def block_to_header_bytes(block):
    values = block.values

    try:
        # pandas >= 0.19
        from pandas.api.types import is_datetime64tz_dtype
    except ImportError:
        from pandas.core.common import is_datetime64tz_dtype

    if isinstance(values, pd.Categorical):
        extension = ('categorical_type', (values.ordered, values.categories))
        values = values.codes
    elif is_datetime64tz_dtype(block):
        # TODO: compat with older pandas?
        extension = ('datetime64_tz_type', (block.values.tzinfo,))
        values = np.asarray(values)
    else:
        extension = ('numpy_type', ())

    header = (block.mgr_locs.as_array, values.dtype, values.shape, extension)
    bytes = pnp.compress(pnp.serialize(values), values.dtype)
    return header, bytes


def block_from_header_bytes(header, bytes):
    placement, dtype, shape, (extension_type, extension_values) = header

    values = pnp.deserialize(pnp.decompress(bytes, dtype), dtype,
                             copy=True).reshape(shape)
    if extension_type == 'categorical_type':
        values = pd.Categorical.from_codes(values,
                                           extension_values[1],
                                           ordered=extension_values[0])
    elif extension_type == 'datetime64_tz_type':
        tz_info = extension_values[0]
        values = pd.DatetimeIndex(values).tz_localize('utc').tz_convert(
            tz_info)
    return make_block(values, placement=placement)
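

# Block-level round-trip sketch. It touches the same private pandas
# internals (``df._data.blocks``) as ``serialize`` below, so it is only
# valid on the pandas versions this module targets.
def _demo_block_roundtrip():
    df = pd.DataFrame({'x': [1.0, 2.0], 'y': [3.0, 4.0]})
    block = df._data.blocks[0]    # a single float64 block holds both columns
    header, payload = block_to_header_bytes(block)
    restored = block_from_header_bytes(header, payload)
    assert (restored.values == block.values).all()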


def serialize(df):
    """ Serialize and compress a Pandas DataFrame

    Uses Pandas blocks, snappy, and blosc to deconstruct an array into bytes
    """
    col_header, col_bytes = index_to_header_bytes(df.columns)
    ind_header, ind_bytes = index_to_header_bytes(df.index)
    headers = [col_header, ind_header]
    bytes = [col_bytes, ind_bytes]

    for block in df._data.blocks:
        h, b = block_to_header_bytes(block)
        headers.append(h)
        bytes.append(b)

    frames = [dumps(headers)] + bytes
    return b''.join(map(frame, frames))


def deserialize(bytes):
    """ Deserialize and decompress bytes back to a pandas DataFrame """
    frames = list(framesplit(bytes))
    headers = pickle.loads(frames[0])
    bytes = frames[1:]
    axes = [index_from_header_bytes(headers[0], bytes[0]),
            index_from_header_bytes(headers[1], bytes[1])]
    blocks = [block_from_header_bytes(h, b)
              for (h, b) in zip(headers[2:], bytes[2:])]
    return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))
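

# End-to-end sketch: ``serialize`` and ``deserialize`` should be inverses
# for any frame whose index and blocks the helpers above understand.
def _demo_frame_roundtrip():
    df = pd.DataFrame({'x': [1, 2, 3], 'y': [1.0, 2.0, 3.0]},
                      index=pd.Index([10, 20, 30], name='key'))
    assert deserialize(serialize(df)).equals(df)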


def join(dfs):
    if not dfs:
        return pd.DataFrame()
    else:
        return pd.concat(dfs)


PandasBlocks = partial(Encode, serialize, deserialize, join)
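

# ``PandasBlocks`` plugs the whole-frame serializer into the generic
# ``Encode`` wrapper. A minimal usage sketch, again assuming
# ``partd.File`` and a hypothetical writable scratch path:
def _demo_pandas_blocks():
    from partd import File
    p = PandasBlocks(File('/tmp/partd-blocks-demo'))  # hypothetical path
    df = pd.DataFrame({'x': [1, 2, 3]})
    p.append({'frame': df})
    p.append({'frame': df})       # appends accumulate under one key
    combined, = p.get(['frame'])  # join() concatenates the parts
    assert len(combined) == 2 * len(df)
    p.drop()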