/usr/lib/python3/dist-packages/partd/pandas.py is in python3-partd 0.3.7-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file are shown below.
from __future__ import absolute_import

import pandas as pd
from toolz import valmap
from functools import partial

from .compatibility import pickle
from .numpy import Numpy
from .core import Interface
from .utils import extend


dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
class PandasColumns(Interface):
    def __init__(self, partd=None):
        self.partd = Numpy(partd)
        Interface.__init__(self)

    def append(self, data, **kwargs):
        for k, df in data.items():
            self.iset(extend(k, '.columns'), dumps(list(df.columns)))
            self.iset(extend(k, '.index-name'), dumps(df.index.name))

        # TODO: don't use values, it does some work.  Look at _blocks instead
        #       pframe/cframe do this well
        arrays = dict((extend(k, col), df[col].values)
                      for k, df in data.items()
                      for col in df.columns)
        arrays.update(dict((extend(k, '.index'), df.index.values)
                           for k, df in data.items()))
        # TODO: handle categoricals
        self.partd.append(arrays, **kwargs)
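
    # Key layout: for each partition key k, append() stores one numpy array
    # per column under extend(k, col), the index array under
    # extend(k, '.index'), and pickled metadata under extend(k, '.columns')
    # and extend(k, '.index-name').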

    def _get(self, keys, columns=None, **kwargs):
        if columns is None:
            columns = self.partd.partd.get([extend(k, '.columns') for k in keys],
                                           **kwargs)
            columns = list(map(pickle.loads, columns))
        else:
            columns = [columns] * len(keys)
        index_names = self.partd.partd.get([extend(k, '.index-name')
                                            for k in keys], **kwargs)
        index_names = map(pickle.loads, index_names)

        keys = [[extend(k, '.index'), [extend(k, col) for col in cols]]
                for k, cols in zip(keys, columns)]

        arrays = self.partd.get(keys, **kwargs)

        return [pd.DataFrame(dict(zip(cols, arrs)), columns=cols,
                             index=pd.Index(index, name=iname))
                for iname, (index, arrs), cols in zip(index_names, arrays, columns)]

    def __getstate__(self):
        return {'partd': self.partd}

    def _iset(self, key, value):
        return self.partd._iset(key, value)

    def drop(self):
        return self.partd.drop()

    @property
    def lock(self):
        return self.partd.partd.lock

    def __exit__(self, *args):
        self.drop()
        # Forward to the wrapped store's __exit__; the bound call supplies
        # self automatically, so it must not be passed again.
        self.partd.__exit__(*args)

    def __del__(self):
        self.partd.__del__()
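

# Usage sketch (illustrative; assumes a bytes-backed store such as partd.File):
#
#     from partd import File
#     with PandasColumns(File('/tmp/partd-columns')) as p:
#         p.append({'x': pd.DataFrame({'a': [1, 2], 'b': [3.0, 4.0]})})
#         [df] = p.get(['x'])
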
import pandas as pd
from pandas.core.internals import create_block_manager_from_blocks, make_block
from pandas.core.index import _ensure_index

def to_blocks(df):
    blocks = [block.values for block in df._data.blocks]
    index = df.index.values
    placement = [b.mgr_locs.as_array for b in df._data.blocks]
    return blocks, index, df.index.name, list(df.columns), placement

def from_blocks(blocks, index, index_name, columns, placement):
    blocks = [make_block(b, placement=placement[i]) for i, b in enumerate(blocks)]
    axes = [_ensure_index(columns), _ensure_index(index)]
    df = pd.DataFrame(create_block_manager_from_blocks(blocks, axes))
    df.index.name = index_name
    return df
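
# Round-trip sketch (illustrative; to_blocks/from_blocks rely on pandas
# internals, so the exact block types vary across pandas versions):
#
#     df = pd.DataFrame({'a': [1, 2], 'b': [3.0, 4.0]})
#     blocks, index, name, columns, placement = to_blocks(df)
#     assert from_blocks(blocks, index, name, columns, placement).equals(df)
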
from . import numpy as pnp
from .utils import framesplit, frame

def serialize(df):
    """ Serialize and compress a Pandas DataFrame

    Uses Pandas blocks, snappy, and blosc to deconstruct an array into bytes
    """
    blocks, index, index_name, columns, placement = to_blocks(df)
    categories = [(b.ordered, b.categories)
                  if isinstance(b, pd.Categorical)
                  else None
                  for b in blocks]
    blocks = [b.codes if isinstance(b, pd.Categorical) else b
              for b in blocks]
    b_blocks = [pnp.compress(pnp.serialize(block), block.dtype)
                for block in blocks]  # this can be slightly faster if we merge both operations
    b_index = pnp.compress(pnp.serialize(index), index.dtype)
    frames = [dumps(index_name),
              dumps(columns),
              dumps(placement),
              dumps(index.dtype),
              b_index,
              dumps([block.dtype for block in blocks]),
              dumps([block.shape for block in blocks]),
              dumps(categories)] + b_blocks
    return b''.join(map(frame, frames))
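
# Frame layout written by serialize and read back, in the same order, by
# deserialize: pickled index name, columns, and block placement; pickled
# index dtype; the compressed index; pickled block dtypes, shapes, and
# categorical metadata; then one compressed frame per block.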

def deserialize(bytes):
    """ Deserialize and decompress bytes back to a pandas DataFrame """
    frames = list(framesplit(bytes))
    index_name = pickle.loads(frames[0])
    columns = pickle.loads(frames[1])
    placement = pickle.loads(frames[2])
    dt = pickle.loads(frames[3])
    index = pnp.deserialize(pnp.decompress(frames[4], dt), dt, copy=True)
    dtypes = pickle.loads(frames[5])
    shapes = pickle.loads(frames[6])
    categories = pickle.loads(frames[7])
    b_blocks = frames[8:]
    blocks = [pnp.deserialize(pnp.decompress(block, dt), dt, copy=True).reshape(shape)
              for block, dt, shape in zip(b_blocks, dtypes, shapes)]
    blocks = [pd.Categorical.from_codes(b, cat[1], ordered=cat[0])
              if cat is not None
              else b
              for cat, b in zip(categories, blocks)]
    return from_blocks(blocks, index, index_name, columns, placement)
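
# Round-trip sketch (illustrative):
#
#     df = pd.DataFrame({'a': [1, 2, 3]})
#     assert deserialize(serialize(df)).equals(df)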

from .encode import Encode


def join(dfs):
    if not dfs:
        return pd.DataFrame()
    else:
        return pd.concat(dfs)


PandasBlocks = partial(Encode, serialize, deserialize, join)
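
# Usage sketch (illustrative; assumes the file-backed partd.File store):
#
#     from partd import File
#     with PandasBlocks(File('/tmp/partd-blocks')) as p:
#         p.append({'x': df1})
#         p.append({'x': df2})   # successive appends are concatenated by join()
#         df = p.get('x')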