/usr/share/pyshared/neo/io/brainwaref32io.py is in python-neo 0.3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
'''
Class for reading from Brainware F32 files
F32 files are simplified binary files for holding spike data. Unlike SRC
files, F32 files carry little metadata. In exchange, the file format does not
change, whereas the SRC format changes periodically (although ideally SRC
files are backwards-compatible).
Each F32 file only holds a single Block.
The only metadata stored in the file is the length of a single repetition
of the stimulus and the values of the stimulus parameters (but not the names
of the parameters).
Brainware was developed by Dr. Jan Schnupp and is available from
Tucker Davis Technologies, Inc.
http://www.tdt.com/downloads.htm
Neither Dr. Jan Schnupp nor Tucker Davis Technologies, Inc. had any part in the
development of this code.
The code is implemented with the permission of Dr. Jan Schnupp.
Author: Todd Jennings
'''
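# The layout implied by the parsing code below: an F32 file is a flat stream
# of float32 values, where the sentinel -2 introduces a condition (followed by
# the repetition length in ms, the number of stimulus parameters, and the
# parameter values), the sentinel -1 starts a new repetition (Segment), and
# any other value is a spike time in ms. Assuming that layout, a minimal
# synthetic file for testing could be written like this (the filename is
# hypothetical):
#
#     import numpy as np
#     values = [-2.0, 500.0, 2.0, 5.0, 10.0,  # condition: t_stop=500 ms, 2 params
#               -1.0, 12.5, 80.25,            # repetition 1: two spike times (ms)
#               -1.0, 3.0]                     # repetition 2: one spike time (ms)
#     np.asarray(values, dtype=np.float32).tofile('synthetic_ch1.f32')
#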
# needed for python 3 compatibility
from __future__ import absolute_import, division, print_function
# import needed core python modules
from os import path
# numpy and quantities are already required by neo
import numpy as np
import quantities as pq
# needed core neo modules
from neo.core import Block, RecordingChannelGroup, Segment, SpikeTrain, Unit
# need to subclass BaseIO
from neo.io.baseio import BaseIO
# some tools to finalize the hierarchy
from neo.io.tools import create_many_to_one_relationship
class BrainwareF32IO(BaseIO):
'''
Class for reading Brainware Spike ReCord files with the extension '.f32'
    The read_block method returns the single Block stored in the file and
    will automatically close the file after reading.
    The read method is the same as read_block.
Note 1:
There is always only one RecordingChannelGroup. BrainWare stores the
equivalent of RecordingChannelGroups in separate files.
Usage:
>>> from neo.io.brainwaref32io import BrainwareF32IO
>>> f32file = BrainwareF32IO(filename='multi_500ms_mulitrep_ch1.f32')
>>> blk1 = f32file.read()
>>> blk2 = f32file.read_block()
    >>> print(blk1.segments)
    >>> print(blk1.segments[0].spiketrains)
    >>> print(blk1.recordingchannelgroups[0].units)
    >>> print(blk1.recordingchannelgroups[0].units[0].name)
    >>> print(blk2)
    >>> print(blk2.segments)
'''
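    # The resulting hierarchy (see Note 1 above) is a Block holding a single
    # RecordingChannelGroup with a single Unit that collects the SpikeTrains
    # of every Segment, so, for example:
    #
    #     blk = BrainwareF32IO(filename='multi_500ms_mulitrep_ch1.f32').read()
    #     unit = blk.recordingchannelgroups[0].units[0]
    #     assert len(unit.spiketrains) == len(blk.segments)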
is_readable = True # This class can only read data
is_writable = False # write is not supported
# This class is able to directly or indirectly handle the following objects
# Note that this is a greatly simplified subset of the full Neo object hierarchy
supported_objects = [Block, RecordingChannelGroup,
Segment, SpikeTrain, Unit]
readable_objects = [Block]
writeable_objects = []
has_header = False
is_streameable = False
# This is for GUI stuff: a definition for parameters when reading.
    # This dict should be keyed by object (`Block`). Each entry is a list
    # of tuples. The first entry in each tuple is the parameter name. The
    # second entry is a dict with keys 'value' (for the default value)
    # and 'label' (for a descriptive name); an illustrative entry follows
    # the dict below.
# Note that if the highest-level object requires parameters,
# common_io_test will be skipped.
read_params = {Block: [],
RecordingChannelGroup: [],
Segment: [],
SpikeTrain: [],
Unit: [],
}
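    # For illustration only (this read-only IO needs no parameters): an IO
    # with a GUI-settable parameter would populate an entry along the lines of
    #
    #     read_params = {Block: [('sampling_rate',
    #                             {'value': 1000., 'label': 'Sampling rate (Hz)'})]}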
# does not support write so no GUI stuff
write_params = None
name = 'Brainware F32 File'
extensions = ['f32']
mode = 'file'
def __init__(self, filename=None):
'''
Arguments:
            filename: the path to the Brainware F32 file to read
'''
BaseIO.__init__(self)
self._path = filename
self._filename = path.basename(filename)
self._fsrc = None
self.__lazy = False
self._blk = None
self.__unit = None
self.__t_stop = None
self.__params = None
self.__seg = None
self.__spiketimes = None
def read(self, lazy=False, cascade=True, **kargs):
'''
        Reads the simple spike data file generated with BrainWare
'''
        return self.read_block(lazy=lazy, cascade=cascade, **kargs)
def read_block(self, lazy=False, cascade=True, **kargs):
'''
        Reads a Block from the simple spike data file generated
        with BrainWare
'''
        # no keyword arguments are implemented so far. If someone tries to
        # pass them, they are either expecting them to do something or making
        # a mistake; neither should pass silently
if kargs:
            raise NotImplementedError('This method does not have any '
                                      'arguments implemented yet')
self._fsrc = None
self.__lazy = lazy
self._blk = Block(file_origin=self._filename)
block = self._blk
# if we aren't doing cascade, don't load anything
if not cascade:
return block
# create the objects to store other objects
rcg = RecordingChannelGroup(file_origin=self._filename)
self.__unit = Unit(file_origin=self._filename)
# load objects into their containers
block.recordingchannelgroups.append(rcg)
rcg.units.append(self.__unit)
# initialize values
self.__t_stop = None
self.__params = None
self.__seg = None
self.__spiketimes = None
# open the file
with open(self._path, 'rb') as self._fsrc:
res = True
# while the file is not done keep reading segments
while res:
res = self.__read_id()
create_many_to_one_relationship(block)
# cleanup attributes
self._fsrc = None
self.__lazy = False
self._blk = None
self.__t_stop = None
self.__params = None
self.__seg = None
self.__spiketimes = None
return block
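    # A usage sketch for lazy reading: with lazy=True each SpikeTrain is
    # created empty, but its lazy_shape attribute records how many spikes a
    # full read would have returned, e.g.
    #
    #     io = BrainwareF32IO(filename='multi_500ms_mulitrep_ch1.f32')
    #     blk = io.read_block(lazy=True)
    #     print(blk.segments[0].spiketrains[0].lazy_shape)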
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
# IMPORTANT!!!
# These are private methods implementing the internal reading mechanism.
    # Due to the way BrainWare F32 files are structured, they CANNOT be used
# on their own. Calling these manually will almost certainly alter your
# position in the file in an unrecoverable manner, whether they throw
# an exception or not.
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
def __read_id(self):
'''
Read the next ID number and do the appropriate task with it.
Returns nothing.
'''
try:
            # float32 -- ID of the next data sequence
objid = np.fromfile(self._fsrc, dtype=np.float32, count=1)[0]
except IndexError:
# if we have a previous segment, save it
self.__save_segment()
# if there are no more Segments, return
return False
if objid == -2:
self.__read_condition()
elif objid == -1:
self.__read_segment()
else:
self.__spiketimes.append(objid)
return True
def __read_condition(self):
'''
Read the parameter values for a single stimulus condition.
Returns nothing.
'''
# float32 -- SpikeTrain length in ms
self.__t_stop = np.fromfile(self._fsrc, dtype=np.float32, count=1)[0]
# float32 -- number of stimulus parameters
numelements = int(np.fromfile(self._fsrc, dtype=np.float32,
count=1)[0])
# [float32] * numelements -- stimulus parameter values
paramvals = np.fromfile(self._fsrc, dtype=np.float32,
count=numelements).tolist()
        # organize the parameters into a dictionary with arbitrary names
paramnames = ['Param%s' % i for i in range(len(paramvals))]
self.__params = dict(zip(paramnames, paramvals))
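    # For example, a condition with parameter values [5.0, 10.0] produces
    # self.__params == {'Param0': 5.0, 'Param1': 10.0}; __read_segment then
    # passes these to Segment(**params), so they end up as annotations:
    #
    #     blk.segments[0].annotations  # -> {'Param0': 5.0, 'Param1': 10.0}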
def __read_segment(self):
'''
Setup the next Segment.
Returns nothing.
'''
# if we have a previous segment, save it
self.__save_segment()
# create the segment
self.__seg = Segment(file_origin=self._filename,
**self.__params)
        # create an empty list to hold the spike times
        # these need to be converted to a SpikeTrain before they can be used
self.__spiketimes = []
def __save_segment(self):
'''
Write the segment to the Block if it exists
'''
# if this is the beginning of the first condition, then we don't want
# to save, so exit
# but set __seg from None to False so we know next time to create a
        # segment even if there are no spikes in the condition
if self.__seg is None:
self.__seg = False
return
if not self.__seg:
# create dummy values if there are no SpikeTrains in this condition
self.__seg = Segment(file_origin=self._filename,
**self.__params)
self.__spiketimes = []
if self.__lazy:
train = SpikeTrain(pq.Quantity([], dtype=np.float32,
units=pq.ms),
t_start=0*pq.ms, t_stop=self.__t_stop * pq.ms,
file_origin=self._filename)
train.lazy_shape = len(self.__spiketimes)
else:
times = pq.Quantity(self.__spiketimes, dtype=np.float32,
units=pq.ms)
train = SpikeTrain(times,
t_start=0*pq.ms, t_stop=self.__t_stop * pq.ms,
file_origin=self._filename)
self.__seg.spiketrains = [train]
self.__unit.spiketrains.append(train)
self._blk.segments.append(self.__seg)
        # reset the segment placeholder
# from now on, we need to set __seg to False rather than None so
# that if there is a condition with no SpikeTrains we know
# to create an empty Segment
self.__seg = False
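
if __name__ == '__main__':
    # minimal demo, assuming a real Brainware F32 file is available at this
    # (hypothetical) path: read the single Block and summarize each Segment's
    # stimulus parameters and spike count
    demo_io = BrainwareF32IO(filename='multi_500ms_mulitrep_ch1.f32')
    demo_blk = demo_io.read_block()
    for seg in demo_blk.segments:
        print(seg.annotations, len(seg.spiketrains[0]))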