This file is indexed.

/usr/lib/python2.7/dist-packages/spykeutils/tools.py is in python-spykeutils 0.4.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
import neo
try:
    import neo.description
    HAS_DESCRIPTION = True
except ImportError:
    HAS_DESCRIPTION = False
import quantities as pq
import scipy as sp
import _scipy_quantities as spq


def apply_to_dict(fn, dictionary, *args):
    """ Applies a function to all spike trains in a dictionary of spike train
    sequences.

    :param function fn: Function to apply. Should take a
        :class:`neo.core.SpikeTrain` as first argument.
    :param dict dictionary: Dictionary of sequences of
        :class:`neo.core.SpikeTrain` objects to apply the function to.
    :param args: Additional arguments which will be passed to ``fn``.
    :returns: A new dictionary with the same keys as ``dictionary``.
    :rtype: dict
    """

    applied = {}
    for k in dictionary:
        applied[k] = [fn(st, *args) for st in dictionary[k]]
    return applied


def bin_spike_trains(trains, sampling_rate, t_start=None, t_stop=None):
    """ Creates binned representations of spike trains.

    :param dict trains: A dictionary of sequences of
        :class:`neo.core.SpikeTrain` objects.
    :param sampling_rate: The sampling rate which will be used to bin
        the spike trains as inverse time scalar.
    :type sampling_rate: Quantity scalar
    :type t_start: The desired time for the start of the first bin as time
        scalar. It will be the minimum start time of all spike trains if
        ``None`` is passed.
    :type t_start: Quantity scalar
    :param t_stop: The desired time for the end of the last bin as time scalar.
        It will be the maximum stop time of all spike trains if ``None`` is
        passed.
    :type t_stop: Quantity scalar
    :returns: A dictionary (with the same indices as ``trains``) of lists
        of spike train counts and the bin borders.
    :rtype: dict, Quantity 1D with time units
    """
    if t_start is None or t_stop is None:
        max_start, max_stop = maximum_spike_train_interval(trains)
        if t_start is None:
            t_start = max_start
        if t_stop is None:
            t_stop = max_stop

    t_start = t_start.rescale(t_stop.units)

    duration = t_stop - t_start
    num_bins = (sampling_rate * duration).simplified
    bins = sp.arange(num_bins + 1) * (duration / num_bins) + t_start
    return apply_to_dict(_bin_single_spike_train, trains, bins), bins


def _bin_single_spike_train(train, bins):
    """ Return a binned representation of SpikeTrain object.

    :param train: A spike train to bin.
    :type train: :class:`neo.core.SpikeTrain`
    :param bins: The bin edges, including the rightmost edge, with time units.
    :type bins: Quantity 1D
    :returns: The binned spike train.
    :rtype: 1-D array
    """
    return sp.histogram(train.rescale(bins.units), bins)[0]


def concatenate_spike_trains(trains):
    """ Concatenates spike trains.

    :param sequence trains: :class:`neo.core.SpikeTrain` objects to
        concatenate.
    :returns: A spike train consisting of the concatenated spike trains. The
        spikes will be in the order of the given spike trains and ``t_start``
        and ``t_stop`` will be set to the minimum and maximum value.
    :rtype: :class:`neo.core.SpikeTrain`
    """

    t_start, t_stop = maximum_spike_train_interval({0: trains})
    return neo.SpikeTrain(
        spq.concatenate([train.view(type=pq.Quantity) for train in trains]),
        t_start=t_start, t_stop=t_stop)


def minimum_spike_train_interval(
        trains, t_start=-sp.inf * pq.s, t_stop=sp.inf * pq.s):
    """ Computes the maximum starting time and minimum end time that all
    given spike trains share. This yields the shortest interval shared by all
    spike trains.

    :param dict trains: A dictionary of sequences of
        :class:`neo.core.SpikeTrain` objects.
    :param t_start: Minimal starting time to return.
    :type t_start: Quantity scalar
    :param t_stop: Maximum end time to return. If ``None``, infinity is used.
    :type t_stop: Quantity scalar
    :returns: Maximum shared t_start time and minimum shared t_stop time as
        time scalars.
    :rtype: Quantity scalar, Quantity scalar
    """
    if t_stop is None:
        t_stop = sp.inf * pq.s

    # Load data and find shortest spike train
    for st in trains.itervalues():
        if len(st) > 0:
            # Minimum length of spike of all spike trains for this unit
            t_start = max(t_start, max((t.t_start for t in st)))
            t_stop = min(t_stop, min((t.t_stop for t in st)))

    if t_stop == sp.inf * pq.s:
        t_stop = t_start

    return t_start, t_stop


def maximum_spike_train_interval(
        trains, t_start=sp.inf * pq.s, t_stop=-sp.inf * pq.s):
    """ Computes the minimum starting time and maximum end time of all
    given spike trains. This yields an interval containing the spikes of
    all spike trains.

    :param dict trains: A dictionary of sequences of
        :class:`neo.core.SpikeTrain` objects.
    :param t_start: Maximum starting time to return.
    :type t_start: Quantity scalar
    :param t_stop: Minimum end time to return. If ``None``, infinity is used.
    :type t_stop: Quantity scalar
    :returns: Minimum t_start time and maximum t_stop time as time scalars.
    :rtype: Quantity scalar, Quantity scalar
    """
    if t_stop is None:
        t_stop = sp.inf * pq.s

    for st in trains.itervalues():
        if len(st) > 0:
            t_start = min(t_start, min((t.t_start for t in st)))
            t_stop = max(t_stop, max((t.t_stop for t in st)))

    return t_start, t_stop


def _handle_orphans(obj, remove):
    """ Removes half-orphaned Spikes and SpikeTrains that occur when
    removing an object upwards in the hierarchy.
    """
    if isinstance(obj, neo.Segment):
        for s in obj.spikes:
            if s.unit:
                if not remove:
                    s.segment = None
                else:
                    try:
                        s.unit.spikes.remove(s)
                    except ValueError:
                        pass

        for st in obj.spiketrains:
            if st.unit:
                if not remove:
                    st.segment = None
                else:
                    try:
                        st.unit.spiketrains.remove(st)
                    except ValueError:
                        pass
    elif isinstance(obj, neo.Unit):
        for s in obj.spikes:
            if s.segment:
                if not remove:
                    s.unit = None
                else:
                    try:
                        s.segment.spikes.remove(s)
                    except ValueError:
                        pass

        for st in obj.spiketrains:
            if st.segment:
                if not remove:
                    st.unit = None
                else:
                    try:
                        st.segment.spiketrains.remove(st)
                    except ValueError:
                        pass
    elif isinstance(obj, neo.RecordingChannelGroup):
        for u in obj.units:
            _handle_orphans(u, remove)


def remove_from_hierarchy(obj, remove_half_orphans=True):
    """ Removes a Neo object from the hierarchy it is embedded in. Mostly
    downward links are removed (except for possible links in
    :class:`neo.core.Spike` or :class:`neo.core.SpikeTrain` objects).
    For example, when ``obj`` is a :class:`neo.core.Segment`, the link from
    its parent :class:`neo.core.Block` will be severed. Also, all links to
    the segment from its spikes and spike trains will be severed.

    :param obj: The object to be removed.
    :type obj: Neo object
    :param bool remove_half_orphans: When True, :class:`neo.core.Spike`
        and :class:`neo.core.SpikeTrain` belonging to a
        :class:`neo.core.Segment` or :class:`neo.core.Unit` removed by
        this function will be removed from the hierarchy as well, even
        if they are still linked from a :class:`neo.core.Unit` or
        :class:`neo.core.Segment`, respectively. In this case, their
        links to the hierarchy defined by ``obj`` will be kept intact.
    """
    classname = type(obj).__name__

    # Parent for arbitrary object
    if HAS_DESCRIPTION:
        if classname in neo.description.many_to_one_relationship:
            for n in neo.description.many_to_one_relationship[classname]:
                p = getattr(obj, n.lower())
                if p is None:
                    continue
                l = getattr(p, classname.lower() + 's', ())
                try:
                    l.remove(obj)
                except ValueError:
                    pass
    else:
        for n in obj._single_parent_objects:
            p = getattr(obj, n.lower())
            if p is None:
                continue
            l = getattr(p, classname.lower() + 's', ())
            try:
                l.remove(obj)
            except ValueError:
                pass

    # Many-to-many relationships
    if isinstance(obj, neo.RecordingChannel):
        for rcg in obj.recordingchannelgroups:
            try:
                idx = rcg.recordingchannels.index(obj)
                if rcg.channel_indexes.shape[0] == len(rcg.recordingchannels):
                    rcg.channel_indexes = sp.delete(rcg.channel_indexes, idx)
                if rcg.channel_names.shape[0] == len(rcg.recordingchannels):
                    rcg.channel_names = sp.delete(rcg.channel_names, idx)
                rcg.recordingchannels.remove(obj)
            except ValueError:
                pass

    if isinstance(obj, neo.RecordingChannelGroup):
        for rc in obj.recordingchannels:
            try:
                rc.recordingchannelgroups.remove(obj)
            except ValueError:
                pass

    _handle_orphans(obj, remove_half_orphans)


def extract_spikes(train, signals, length, align_time):
    """ Extract spikes with waveforms from analog signals using a spike train. 
    Spikes that are too close to the beginning or end of the shortest signal
    to be fully extracted are ignored.

    :type train: :class:`neo.core.SpikeTrain`
    :param train: The spike times.
    :param sequence signals: A sequence of :class:`neo.core.AnalogSignal`
        objects from which the spikes are extracted. The waveforms of
        the returned spikes are extracted from these signals in the
        same order they are given.
    :type length: Quantity scalar
    :param length: The length of the waveform to extract as time scalar.
    :type align_time: Quantity scalar
    :param align_time: The alignment time of the spike times as time scalar.
        This is the time delta from the start of the extracted waveform
        to the exact time of the spike.
    :returns: A list of :class:`neo.core.Spike` objects, one for each
        time point in ``train``. All returned spikes include their
        ``waveform`` property.
    :rtype: list
    """
    if not signals:
        raise ValueError('No signals to extract spikes from')
    ref = signals[0]
    for s in signals[1:]:
        if ref.sampling_rate != s.sampling_rate:
            raise ValueError(
                'All signals for spike extraction need the same sampling rate')

    wave_unit = signals[0].units
    srate = signals[0].sampling_rate
    end = min(s.shape[0] for s in signals)

    aligned_train = train - align_time
    cut_samples = int((length * srate).simplified)

    st = sp.asarray((aligned_train * srate).simplified)

    # Find extraction epochs
    st_ok = (st >= 0) * (st < end - cut_samples)
    epochs = sp.vstack((st[st_ok], st[st_ok] + cut_samples)).T.astype(sp.int64)

    nspikes = epochs.shape[0]
    if not nspikes:
        return []

    # Create data
    data = sp.vstack([sp.asarray(s.rescale(wave_unit)) for s in signals])
    nc = len(signals)

    spikes = []
    for s in xrange(nspikes):
        waveform = sp.zeros((cut_samples, nc))
        for c in xrange(nc):
            waveform[:, c] = \
                data[c, epochs[s, 0]:epochs[s, 1]]
        spikes.append(neo.Spike(train[st_ok][s], waveform=waveform * wave_unit,
                                sampling_rate=srate))

    return spikes