This file is indexed.

/usr/lib/python2.7/dist-packages/neo/test/iotest/test_klustakwikio.py is in python-neo 0.3.3-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
# -*- coding: utf-8 -*-
"""
Tests of neo.io.klustakwikio
"""

# needed for python 3 compatibility
from __future__ import absolute_import

import glob
import os.path
import sys
import tempfile

try:
    import unittest2 as unittest
except ImportError:
    import unittest

import numpy as np
import quantities as pq

import neo
from neo.test.iotest.common_io_test import BaseTestIO
from neo.test.tools import assert_arrays_almost_equal
from neo.io.klustakwikio import KlustaKwikIO, HAVE_MLAB


@unittest.skipUnless(HAVE_MLAB, "requires matplotlib")
@unittest.skipIf(sys.version_info[0] > 2, "not Python 3 compatible")
class testFilenameParser(unittest.TestCase):
    """Checks that data files are located for a given basename.

    The fixture directory holds files for two different basenames, plus
    decoy files whose group numbers are malformed.
    """
    def setUp(self):
        # Fixture data is expected below the system temp directory; the
        # tests are skipped when it is not there.
        data_root = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')
        self.dirname = os.path.join(data_root, 'klustakwik/test1')
        if os.path.exists(self.dirname):
            return
        raise unittest.SkipTest('data directory does not exist: ' +
                                self.dirname)

    def test1(self):
        """Tests that the fet files of 'basename' are found"""
        kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'basename'))
        if not BaseTestIO.use_network:
            raise unittest.SkipTest("Requires download of data from the web")
        fetfiles = kio._fp.read_filenames('fet')

        # Exactly the two well-formed groups (0 and 1) must be reported.
        self.assertEqual(len(fetfiles), 2)
        for group in (0, 1):
            expected = os.path.join(self.dirname, 'basename.fet.%d' % group)
            self.assertEqual(os.path.abspath(fetfiles[group]),
                             os.path.abspath(expected))

    def test2(self):
        """Tests that files are loaded even without basename"""
        # this test is in flux, should probably have it default to
        # basename = os.path.split(dirname)[1] when dirname is a directory
        #~ dirname = os.path.normpath('./files_for_tests/klustakwik/test1')
        #~ kio = KlustaKwikIO(filename=dirname)
        #~ fetfiles = kio._fp.read_filenames('fet')

        #~ # It will just choose one of the two basenames, depending on which
        #~ # is first, so just assert that it did something without error.
        #~ self.assertNotEqual(len(fetfiles), 0)

    def test3(self):
        """Tests that the clu file of 'basename2' is found"""
        kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'basename2'))
        if not BaseTestIO.use_network:
            raise unittest.SkipTest("Requires download of data from the web")
        clufiles = kio._fp.read_filenames('clu')

        self.assertEqual(len(clufiles), 1)
        # The single entry is looked up with 1, not 0: presumably the result
        # is keyed by group number rather than by position -- confirm
        # against read_filenames.
        expected = os.path.join(self.dirname, 'basename2.clu.1')
        self.assertEqual(os.path.abspath(clufiles[1]),
                         os.path.abspath(expected))


@unittest.skipUnless(HAVE_MLAB, "requires matplotlib")
@unittest.skipIf(sys.version_info[0] > 2, "not Python 3 compatible")
class testRead(unittest.TestCase):
    """Checks the spiketrains obtained by reading KlustaKwik files"""
    def setUp(self):
        # Fixture data is expected below the system temp directory; the
        # tests are skipped when it is not there.
        data_root = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')
        self.dirname = os.path.join(data_root, 'klustakwik/test2')
        if os.path.exists(self.dirname):
            return
        raise unittest.SkipTest('data directory does not exist: ' +
                                self.dirname)

    def _check_train(self, train, name, cluster, group, times):
        """Assert name, annotations, zero t_start and exact spike times."""
        self.assertEqual(train.name, name)
        self.assertEqual(train.annotations['cluster'], cluster)
        self.assertEqual(train.annotations['group'], group)
        self.assertEqual(train.t_start, 0.0)
        self.assertTrue(np.all(train.times == np.array(times)))

    def test1(self):
        """Tests that data and metadata are read correctly"""
        reader = KlustaKwikIO(filename=os.path.join(self.dirname, 'base'),
                              sampling_rate=1000.)
        first_segment = reader.read()[0].segments[0]
        trains = first_segment.spiketrains
        self.assertEqual(len(trains), 4)

        # Every spiketrain is expressed in seconds and starts at zero.
        one_second = np.array(1.0) * pq.s
        for train in trains:
            self.assertEqual(train.units, one_second)
            self.assertEqual(train.t_start, 0.0)

        # (name, cluster, group, spike times) in the order they are read.
        expected = [
            ('unit 1 from group 0', 1, 0, [.100, .200]),
            ('unit 2 from group 0', 2, 0, [.305]),
            ('unit -1 from group 1', -1, 1, [.253]),
            ('unit 2 from group 1', 2, 1, [.050, .152]),
        ]
        for train, (name, cluster, group, times) in zip(trains, expected):
            self._check_train(train, name, cluster, group, times)

    def test2(self):
        """Checks that cluster id autosets to 0 without clu file"""
        reader = KlustaKwikIO(filename=os.path.join(self.dirname, 'base2'),
                              sampling_rate=1000.)
        first_segment = reader.read()[0].segments[0]
        trains = first_segment.spiketrains
        self.assertEqual(len(trains), 1)
        self._check_train(trains[0], 'unit 0 from group 5', 0, 5,
                          [0.026, 0.122, 0.228])


@unittest.skipUnless(HAVE_MLAB, "requires matplotlib")
@unittest.skipIf(sys.version_info[0] > 2, "not Python 3 compatible")
class testWrite(unittest.TestCase):
    """Checks the fet and clu files written for a block of spiketrains"""
    def setUp(self):
        # Output goes to a directory below the system temp directory; the
        # test is skipped when that directory is not there.
        self.dirname = os.path.join(tempfile.gettempdir(),
                                    'files_for_testing_neo',
                                    'klustakwik/test3')
        if not os.path.exists(self.dirname):
            raise unittest.SkipTest('data directory does not exist: ' +
                                    self.dirname)

    def _read_ints(self, suffix):
        """Return the integers stored one per line in 'base1' + suffix.

        The file is opened in a ``with`` block so the handle is closed as
        soon as it has been read.
        """
        path = os.path.join(self.dirname, 'base1' + suffix)
        with open(path) as fi:
            return [int(line) for line in fi]

    def test1(self):
        """Create clu and fet files based on spiketrains in a block.

        Checks that
            Files are created for every group
            Spike times are converted to samples using the sampling rate
            given to the IO object
            Spiketrains without group info are written to group 0
            Spiketrains without cluster info are assigned to cluster 0
            Spiketrains across segments are concatenated
        """
        block = neo.Block()
        segment = neo.Segment()
        segment2 = neo.Segment()
        block.segments.append(segment)
        block.segments.append(segment2)

        # Fake spiketrain 1: cluster 0 of group 0
        st1 = neo.SpikeTrain(times=[.002, .004, .006], units='s', t_stop=1.)
        st1.annotations['cluster'] = 0
        st1.annotations['group'] = 0
        segment.spiketrains.append(st1)

        # Fake spiketrain 1B, on another segment. No group specified,
        # default is 0.
        st1B = neo.SpikeTrain(times=[.106], units='s', t_stop=1.)
        st1B.annotations['cluster'] = 0
        segment2.spiketrains.append(st1B)

        # Fake spiketrain 2: cluster 1 on the same group
        st2 = neo.SpikeTrain(times=[.001, .003, .011], units='s', t_stop=1.)
        st2.annotations['cluster'] = 1
        st2.annotations['group'] = 0
        segment.spiketrains.append(st2)

        # Fake spiketrain 3: cluster -1 on a new group
        st3 = neo.SpikeTrain(times=[.05, .09, .10], units='s', t_stop=1.)
        st3.annotations['cluster'] = -1
        st3.annotations['group'] = 1
        segment.spiketrains.append(st3)

        # Fake spiketrain 4 on new group, without cluster info
        st4 = neo.SpikeTrain(times=[.005, .009], units='s', t_stop=1.)
        st4.annotations['group'] = 2
        segment.spiketrains.append(st4)

        # Create empty directory for writing
        # NOTE(review): without an argument delete_test_session() falls back
        # to its default path, which is derived from this module's location
        # and is not self.dirname, so the directory written to below is not
        # the one being emptied. Left as is because emptying self.dirname
        # would presumably also remove the test3/base1.* files listed in
        # CommonTests.files_to_download -- confirm the intended behaviour.
        delete_test_session()

        # Create writer with default sampling rate
        kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'base1'),
                           sampling_rate=1000.)
        kio.write_block(block)

        # Check files were created for all three groups
        for fn in ['.fet.0', '.fet.1', '.fet.2',
                   '.clu.0', '.clu.1', '.clu.2']:
            self.assertTrue(os.path.exists(os.path.join(self.dirname,
                                                        'base1' + fn)))

        # Check files contain correct content. The first value of each file
        # is presumably a header line (feature count in fet files, cluster
        # count in clu files) -- confirm against the writer.
        # Spike times on group 0
        self.assertEqual(self._read_ints('.fet.0'),
                         [0, 2, 4, 6, 1, 3, 11, 106])

        # Clusters on group 0
        self.assertEqual(self._read_ints('.clu.0'),
                         [2, 0, 0, 0, 1, 1, 1, 0])

        # Spike times on group 1
        self.assertEqual(self._read_ints('.fet.1'), [0, 50, 90, 100])

        # Clusters on group 1
        self.assertEqual(self._read_ints('.clu.1'), [1, -1, -1, -1])

        # Spike times on group 2
        self.assertEqual(self._read_ints('.fet.2'), [0, 5, 9])

        # Clusters on group 2
        self.assertEqual(self._read_ints('.clu.2'), [1, 0, 0])

        # Empty out test session again (same default-path caveat as above)
        delete_test_session()


@unittest.skipUnless(HAVE_MLAB, "requires matplotlib")
@unittest.skipIf(sys.version_info[0] > 2, "not Python 3 compatible")
class testWriteWithFeatures(unittest.TestCase):
    """Checks writing and re-reading a spiketrain with waveform features"""
    def setUp(self):
        # Output goes to a directory below the system temp directory; the
        # test is skipped when that directory is not there.
        self.dirname = os.path.join(tempfile.gettempdir(),
                                    'files_for_testing_neo',
                                    'klustakwik/test4')
        if not os.path.exists(self.dirname):
            raise unittest.SkipTest('data directory does not exist: ' +
                                    self.dirname)

    def test1(self):
        """Write one spiketrain with waveform features and read it back.

        Checks that
            The fet and clu files are created
            The fet file starts with the number of features, followed by
            one line per spike holding the features and then the spike
            time in samples
            The clu file has the expected content
            Reading the block back yields the same waveform features
        """
        block = neo.Block()
        segment = neo.Segment()
        segment2 = neo.Segment()
        block.segments.append(segment)
        block.segments.append(segment2)

        # Fake spiketrain 1, one row of two features per spike
        st1 = neo.SpikeTrain(times=[.002, .004, .006], units='s', t_stop=1.)
        st1.annotations['cluster'] = 0
        st1.annotations['group'] = 0
        wff = np.array([
            [11.3, 0.2],
            [-0.3, 12.3],
            [3.0, -2.5]])
        st1.annotations['waveform_features'] = wff
        segment.spiketrains.append(st1)

        # Empty the output directory before writing. setUp has already
        # skipped the test if the directory does not exist, so there is
        # nothing to create here.
        delete_test_session(self.dirname)

        # Create writer
        kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'base2'),
                           sampling_rate=1000.)
        kio.write_block(block)

        # Check files were created
        for fn in ['.fet.0', '.clu.0']:
            self.assertTrue(os.path.exists(os.path.join(self.dirname,
                                                        'base2' + fn)))

        # Check files contain correct content; the handle is closed as soon
        # as the file has been read.
        with open(os.path.join(self.dirname, 'base2.fet.0')) as fi:
            # first line is nbFeatures
            self.assertEqual(fi.readline(), '2\n')
            data = fi.readlines()

        # Now check waveforms and times are same: on each line the last
        # column is the spike time, everything before it is a feature.
        new_wff = []
        new_times = []
        for line in data:
            line_split = line.split()
            new_wff.append([float(val) for val in line_split[:-1]])
            new_times.append(int(line_split[-1]))
        self.assertEqual(new_times, [2, 4, 6])
        assert_arrays_almost_equal(wff, np.array(new_wff), .00001)

        # Clusters on group 0
        with open(os.path.join(self.dirname, 'base2.clu.0')) as fi:
            data = [int(d) for d in fi]
        self.assertEqual(data, [1, 0, 0, 0])

        # Now read the features and test same
        block = kio.read_block()
        train = block.segments[0].spiketrains[0]
        assert_arrays_almost_equal(wff, train.annotations['waveform_features'],
                                   .00001)

        # Empty out test session again
        delete_test_session(self.dirname)


@unittest.skipUnless(HAVE_MLAB, "requires matplotlib")
@unittest.skipIf(sys.version_info[0] > 2, "not Python 3 compatible")
class CommonTests(BaseTestIO, unittest.TestCase):
    """Runs the shared BaseTestIO checks against KlustaKwikIO.

    Only class-level configuration is defined here; the test methods
    themselves come from BaseTestIO.
    """
    # IO class under test
    ioclass = KlustaKwikIO

    # These are the files it tries to read and test for compliance
    # (basenames, without the .fet.N / .clu.N suffixes)
    files_to_test = [
        'test2/base',
        'test2/base2',
        ]

    # Will fetch from g-node if they don't already exist locally
    # How does it know to do this before any of the other tests?
    # The test1 entries include names with non-numeric or negative group
    # suffixes (.fet.-1, .fet.1a, .fet.a1); these look like the decoy files
    # mentioned in the testFilenameParser docstring.
    files_to_download = [
        'test1/basename.clu.0',
        'test1/basename.fet.-1',
        'test1/basename.fet.0',
        'test1/basename.fet.1',
        'test1/basename.fet.1a',
        'test1/basename.fet.a1',
        'test1/basename2.clu.1',
        'test1/basename2.fet.1',
        'test1/basename2.fet.1a',
        'test2/base2.fet.5',
        'test2/base.clu.0',
        'test2/base.clu.1',
        'test2/base.fet.0',
        'test2/base.fet.1',
        'test3/base1.clu.0',
        'test3/base1.clu.1',
        'test3/base1.clu.2',
        'test3/base1.fet.0',
        'test3/base1.fet.1',
        'test3/base1.fet.2'
        ]


def delete_test_session(dirname=None):
    """Remove every entry matched by ``dirname/*`` so writes start clean.

    When ``dirname`` is None, the 'files_for_tests/klustakwik/test3'
    directory next to this module is used. A directory that does not exist
    simply matches nothing.
    """
    target = dirname
    if target is None:
        module_dir = os.path.dirname(__file__)
        target = os.path.join(module_dir, 'files_for_tests/klustakwik/test3')
    pattern = os.path.join(target, '*')
    for path in glob.glob(pattern):
        os.remove(path)


# Run the tests of this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()