/usr/share/pyshared/mdp/nodes/convolution_nodes.py is in python-mdp 3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 | __docformat__ = "restructuredtext en"
from mdp import numx, numx_linalg, utils, NodeException
import mdp
import scipy.signal as signal
# TODO automatic selection of convolution
# TODO provide generators for standard filters
# TODO look into Theano, define TheanoConvolutionNode
class Convolution2DNode(mdp.Node):
"""Convolve input data with filter banks.
The ``filters`` argument specifies a set of 2D filters that are
convolved with the input data during execution. Convolution can
be selected to be executed by linear filtering of the data, or
in the frequency domain using a Discrete Fourier Transform.
Input data can be given as 3D data, each row being a 2D array
to be convolved with the filters, or as 2D data, in which case
the ``input_shape`` argument must be specified.
This node depends on ``scipy``.
"""
def __init__(self, filters, input_shape = None,
approach = 'fft',
mode = 'full', boundary = 'fill', fillvalue = 0,
output_2d = True,
input_dim = None, dtype = None):
"""
Input arguments:
input_shape -- Is a tuple (h,w) that corresponds to the height and
width of the input 2D data. If the input data is given
in a flattened format, it is first reshaped before
convolution
approach -- 'approach' is one of ['linear', 'fft']
'linear': convolution is done by linear filtering;
'fft': convoltion is done using the Fourier Transform
If 'approach' is 'fft', the 'boundary' and 'fillvalue' arguments
are ignored, and are assumed to be 'fill' and 0, respectively.
(*Default* = 'fft')
mode -- Convolution mode, as defined in scipy.signal.convolve2d
'mode' is one of ['valid', 'same', 'full']
(*Default* = 'full')
boundary -- Boundary condition, as defined in scipy.signal.convolve2d
'boundary' is one of ['fill', 'wrap', 'symm']
(*Default* = 'fill')
fillvalue -- Value to fill pad input arrays with
(*Default* = 0)
output_2d -- If True, the output array is 2D; the first index
corresponds to data points; every output data point
is the result of flattened convolution results, with
the output of each filter concatenated together.
If False, the output array is 4D; the format is
data[idx,filter_nr,x,y], with
filter_nr: index of convolution filter
idx: data point index
x, y: 2D coordinates
"""
super(Convolution2DNode, self).__init__(input_dim=input_dim,
dtype=dtype)
self.filters = filters
self._input_shape = input_shape
if approach not in ['linear', 'fft']:
raise NodeException("'approach' argument must be one of ['linear', 'fft']")
self._approach = approach
if mode not in ['valid', 'same', 'full']:
raise NodeException("'mode' argument must be one of ['valid', 'same', 'full']")
self._mode = mode
self.boundary = boundary
self.fillvalue = fillvalue
self.output_2d = output_2d
self._output_shape = None
# ------- class properties
def get_filters(self):
return self._filters
def set_filters(self, filters):
if not isinstance(filters, numx.ndarray):
raise NodeException("'filters' argument must be a numpy array")
if filters.ndim != 3:
raise NodeException('Filters must be specified in a 3-dim array, with each '+
'filter on a different row')
self._filters = filters
filters = property(get_filters, set_filters)
def get_boundary(self):
return self._boundary
def set_boundary(self, boundary):
if boundary not in ['fill', 'wrap', 'symm']:
raise NodeException(
"'boundary' argument must be one of ['fill', 'wrap', 'symm']")
self._boundary = boundary
boundary = property(get_boundary, set_boundary)
@property
def input_shape(self):
return self._input_shape
@property
def approach(self):
return self._approach
@property
def mode(self):
return self._mode
@property
def output_shape(self):
return self._output_shape
# ------- /class properties
def is_trainable(self):
return False
def is_invertible(self):
return False
def _get_supported_dtypes(self):
"""Return the list of dtypes supported by this node.
Support floating point types with size smaller or equal than 64 bits.
This is because fftpack does not support floating point types larger
than that.
"""
return [t for t in utils.get_dtypes('Float') if t.itemsize<=8]
def _pre_execution_checks(self, x):
"""This method contains all pre-execution checks.
It can be used when a subclass defines multiple execution methods.
In this case, the output dimension depends on the type of
convolution we use (padding, full, ...). Also, we want to
to be able to accept 3D arrays.
"""
# check input rank
if not x.ndim in [2,3]:
error_str = "x has rank %d, should be 2 or 3" % (x.ndim)
raise NodeException(error_str)
# set 2D shape if necessary
if self._input_shape is None:
if x.ndim == 2:
error_str = "Cannot infer 2D shape from 1D data points. " + \
"Data must have rank 3, or shape argument given."
raise NodeException(error_str)
else:
self._input_shape = x.shape[1:]
# set the input dimension if necessary
if self.input_dim is None:
self.input_dim = numx.prod(self._input_shape)
# set the dtype if necessary
if self.dtype is None:
self.dtype = x.dtype
# check the input dimension
if not numx.prod(x.shape[1:]) == self.input_dim:
error_str = "x has dimension %d, should be %d" % (x.shape[1],
self.input_dim)
raise NodeException(error_str)
# set output_dim if necessary
if self.output_dim is None:
input_shape = self.input_shape
filters_shape = self.filters.shape
if self.mode == 'same':
self._output_shape = input_shape
elif self.mode == 'full':
self._output_shape = (input_shape[0]+filters_shape[1]-1,
input_shape[1]+filters_shape[2]-1)
else: # mode == 'valid'
self._output_shape = (input_shape[0]-filters_shape[1]+1,
input_shape[1]-filters_shape[2]+1)
self.output_dim = self.filters.shape[0]*numx.prod(self._output_shape)
if x.shape[0] == 0:
error_str = "x must have at least one observation (zero given)"
raise NodeException(error_str)
def _execute(self, x):
is_2d = x.ndim==2
output_shape, input_shape = self._output_shape, self._input_shape
filters = self.filters
nfilters = filters.shape[0]
# XXX depends on convolution
y = numx.empty((x.shape[0], nfilters,
output_shape[0], output_shape[1]), dtype=self.dtype)
for n_im, im in enumerate(x):
if is_2d:
im = im.reshape(input_shape)
for n_flt, flt in enumerate(filters):
if self.approach == 'fft':
y[n_im,n_flt,:,:] = signal.fftconvolve(im, flt, mode=self.mode)
elif self.approach == 'linear':
y[n_im,n_flt,:,:] = signal.convolve2d(im, flt,
mode=self.mode,
boundary=self.boundary,
fillvalue=self.fillvalue)
# reshape if necessary
if self.output_2d:
y.resize((y.shape[0], self.output_dim))
return y
|