/usr/share/pyshared/pyepl/eeg.py is in python-pyepl 1.1.0+git12-g365f8e3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# PyEPL: eeg.py
#
# Copyright (C) 2003-2005 Michael J. Kahana
# Authors: Ian Schleifer, Per Sederberg, Aaron Geller, Josh Jacobs
# URL: http://memory.psych.upenn.edu/programming/pyepl
#
# Distributed under the terms of the GNU Lesser General Public License
# (LGPL). See the license.txt that came with this file.
"""
This module provides EEG sync pulsing and logging.
"""
from textlog import LogTrack
from hardware import addPollCallback, removePollCallback
import timing
import exceptions
import exputils
# import python modules
import random
import sys
class EEGTrack(LogTrack):
    """
    Log track that sends EEG sync pulses and logs when they happened.

    When the service is started the track works out which kind of
    syncing is available and remembers it in self.record_mode:

    "S" (scalp)-   attached to the EEG shared memory (EEGShmAttach
                   succeeded); recording is started/stopped from here
                   and file offsets are logged periodically.
    "P" (pulse)-   sync pulses are sent through an AWCard (darwin) or
                   a Parallel port (all other platforms) and each
                   pulse edge is logged.
    "N" (neither)- nothing could be initialized; a warning is issued
                   and no syncing is done.
    """
    logExtension = ".eeglog"

    def __init__(self, basename, archive = None, autoStart = True):
        """
        Set up the EEGTrack.

        INPUT ARGS:
          basename- The name of the log.
          archive- Directory to put the log.  If not given, the EEG
                   data file goes to exputils.session.
          autoStart- Whether to automatically start the service
                     and logging.
        """
        self.eeg_files = []
        self.record_mode = "N" # N: neither, S: scalp, P: pulse
        self.do_log = False
        self.dat_file = None
        self.EEG_sync_thread = None
        # pulse devices; at most one of them is set by startService
        self.awCard = None
        self.parallel = None
        if not archive:
            self.archive = exputils.session
        else:
            self.archive = archive
        # all attributes must exist before this call, because
        # autoStart may start the service right away
        LogTrack.__init__(self, basename, archive, autoStart)

    def startService(self):
        """
        Start the EEG service, testing what type of syncing to do and
        opening ports as necessary.

        Sets self.record_mode to "S", "P" or "N" (see class
        docstring).  Failing to initialize any syncing is reported
        with a warning, not an exception.
        """
        # If Darwin, then only check pulse
        if sys.platform == 'darwin':
            # see if can open Pulse port
            try:
                from hardware import AWCard
                self.awCard = AWCard()
                self.record_mode = "P"
            except Exception:
                # Exception (rather than a bare except) so that
                # KeyboardInterrupt/SystemExit still propagate
                self.record_mode = "N"
                exceptions.eplWarn("Neither Scalp nor Pulse EEG syncing was initialized.")
        else:
            # import eeg stuph
            from hardware import Parallel, EEGShmAttach, EPLScalpEEGException
            # see if can attach to shared memory
            try:
                EEGShmAttach()
                self.record_mode = "S"
            except EPLScalpEEGException:
                # did not attach
                # see if can open Pulse port
                try:
                    self.parallel = Parallel()
                    self.record_mode = "P"
                except Exception:
                    # both failed so neither
                    self.record_mode = "N"
                    exceptions.eplWarn("Neither Scalp nor Pulse EEG syncing was initialized.")

    def stopService(self):
        """
        Stop the EEG syncing and release whatever startService
        acquired.  Safe to call more than once.
        """
        if self.record_mode == "S":
            # imported here because the names imported inside
            # startService are local to that method
            from hardware import EEGRecStop, EEGShmDetach
            EEGRecStop()
            EEGShmDetach()
        elif self.record_mode == "P":
            # drop our references so the device objects can be
            # released, but keep the attributes defined
            self.awCard = None
            self.parallel = None
        # nothing is attached any more
        self.record_mode = "N"

    def startLogging(self):
        """
        Start sending syncs and logging.

        In scalp mode a new, not yet existing eegN.dat file is created
        in the archive and recording to it is started.  In pulse mode
        a train of 10 pulses marks the start.  In both modes the
        matching poll callback is registered.
        """
        # start logging
        LogTrack.startLogging(self)

        # check record state, maybe get new file, start recording
        if self.record_mode == "S":
            from hardware import EEGRecStart
            # find a file name that doesn't already exist...
            n = 0
            self.dat_filename = 'eeg%d.dat' % n
            while self.archive.exists(self.dat_filename):
                n += 1
                self.dat_filename = 'eeg%d.dat' % n
            # create the file only to reserve its name; the recorder
            # is handed the name, not the open file
            self.dat_file = self.archive.createFile(self.dat_filename)
            self.dat_file.close()
            EEGRecStart(self.dat_file.name)

            # setup scalp callback
            self.last_align = timing.now()
            self.align_interval = 1000
            addPollCallback(self.scalpCallback)
        elif self.record_mode == "P":
            # is pulse, so setup pulse callback
            self.last_align = timing.now()
            self.align_interval = 1000
            self.pulseTrain(10,"EXPSTART_")
            addPollCallback(self.pulseCallback)

    def newTarget(self, archive):
        """
        Switch to a new archive location for this EEG log.

        INPUT ARGS:
          archive- The new archive; also used for EEG data files
                   created by later calls to startLogging.
        """
        # first update the archive that will be used for the EEG data file
        self.archive = archive
        LogTrack.newTarget(self, archive)

    def stopLogging(self):
        """
        Stop sending syncs and logging them.

        In scalp mode the recording is stopped; in pulse mode a train
        of 5 pulses marks the end.
        """
        # remove callbacks (both, whichever one was registered)
        removePollCallback(self.scalpCallback)
        removePollCallback(self.pulseCallback)

        if self.record_mode == "S":
            # stop recording
            from hardware import EEGRecStop
            EEGRecStop()
        elif self.record_mode == "P":
            self.pulseTrain(5,"EXPEND_")

        # stop logging
        LogTrack.stopLogging(self)

    def scalpCallback(self):
        """
        Callback to make logs using the real-time scalp interface.

        Once per align_interval the current offset is queried and
        logged as "FILENAME\\tOFFSET".
        """
        # is it time to do another alignment?
        if timing.now() >= self.last_align + self.align_interval:
            # imported only when needed; the names imported inside
            # startService are local to that method
            from hardware import EEGGetOffset
            # query for offset
            (timeInterval,offset) = timing.timedCall(None,
                                                     EEGGetOffset)
            # get info for log message
            self.logMessage("%s\t%s" % (self.dat_filename,offset),timeInterval)
            # update last_align
            self.last_align = timeInterval[0]

    def timedPulse(self, pulseTime, pulsePrefix='', signal='', clk=None):
        """
        Send a pulse and log it.

        The rising edge is logged as pulsePrefix+"UP" and the falling
        edge as pulsePrefix+"DN".

        INPUT ARGS:
          pulseTime- Duration of pulse.
          pulsePrefix- Name to log the pulse.
          signal- If non-empty, this signal is written to the device
                  instead of simply switching everything on.
          clk- Optional clock object used to time the two edges (it
               is handed to timing.timedCall and its delay method is
               used to wait).  If None, the edges are scheduled from
               timing.now().

        OUTPUT ARGS:
          timeInterval- The time interval reported by timing.timedCall
                        for the falling edge of the pulse.
        """
        # see if using clock
        usingClock = clk is not None
        if not usingClock:
            # no clock, so use time
            clk = timing.now()

        # pulse up
        if self.awCard:
            if len(signal)>0:
                (timeInterval,returnValue)=timing.timedCall(clk, self.awCard.write, signal)
            else:
                (timeInterval,returnValue)=timing.timedCall(clk, self.awCard.allOn)
        else:
            if len(signal)>0:
                (timeInterval,returnValue)=timing.timedCall(clk, self.parallel.setSignal, True, signal)
            else:
                (timeInterval,returnValue)=timing.timedCall(clk, self.parallel.setState, True)
        self.logMessage("%s" % pulsePrefix+"UP",timeInterval)

        # wait for the pulse time
        if usingClock:
            clk.delay(pulseTime)
        else:
            clk = clk + pulseTime

        # pulse down
        if self.awCard:
            (timeInterval,returnValue)=timing.timedCall(clk, self.awCard.allOff)
        else:
            (timeInterval,returnValue)=timing.timedCall(clk, self.parallel.setState, False)
        self.logMessage("%s" % pulsePrefix+"DN",timeInterval)

        # NOTE: this is the interval of the END of the pulse
        return timeInterval

    def pulseTrain(self, numPulses,trainName=""):
        """
        Send a train of pulses.

        A trainName+"TRAIN" message is logged first; the individual
        pulses are logged with the prefix 'TRAIN_'.

        INPUT ARGS:
          numPulses- Number of pulses in train.
          trainName- Name to use in the log.
        """
        pulseLen=10 #in milliseconds
        interPulseInterval=5
        self.logMessage(trainName+"TRAIN")

        # set the desired pulsetime
        pulseTime = timing.now()
        for i in range(numPulses):
            # send a pulse (timedPulse does the logging itself, so
            # the result of timedCall is not needed here)
            timing.timedCall(pulseTime,
                             self.timedPulse,
                             pulseLen,'TRAIN_')
            # schedule the next pulse
            pulseTime += interPulseInterval

    def pulseCallback(self):
        """
        Callback to manage sending pulses.

        Sends one pulse whenever align_interval has passed since the
        last one, then draws a new random interval so that the pulses
        are irregularly spaced.
        """
        minInterPulsetime=750
        maxInterPulseTime=1250
        pulseLen=10 #in milliseconds

        if timing.now() >= self.last_align + self.align_interval:
            timeInterval = self.timedPulse(pulseLen)
            # randomize the alignment interval
            self.align_interval = random.uniform(minInterPulsetime, maxInterPulseTime)
            # update last_align
            self.last_align = timeInterval[0]

    def calcOffset(self, eventTime):
        """
        Return a two tuple of the offset in to the file and a maximum latency value.

        The offset is linearly interpolated between the last log entry
        before eventTime and the first log entry at or after it.  Log
        entries are expected to be of the form "FILENAME\\tOffset", as
        written by scalpCallback.

        INPUT ARGS:
          eventTime- Time of the event; only eventTime[0] is used.

        OUTPUT ARGS:
          (offsetString, ml)-
            offsetString is a string representation of
            "FILENAME\\tOffset".  It is "\\t" (empty file name and
            offset) if the event is not bracketed by two log entries,
            and the offset is empty if the two entries refer to
            different files.
            ml is the maximum latency, simply a number in ms
            (currently always 0).
        """
        # keep the last entry before and the first entry after the event
        startTime = None
        endTime = None
        startLog = None
        endLog = None
        for (timeStamp,withinTick,logMessage) in self:
            if timeStamp[0] >= eventTime[0]:
                # we have passed the time
                endTime = timeStamp
                endLog = logMessage
                break
            # save as last time
            startTime = timeStamp
            startLog = logMessage

        # set defaults
        filename = ""
        offset = ""
        ml = 0

        # see if found
        if startTime is not None and endTime is not None:
            # get the filenames and offsets
            startFile,startOffset = startLog.split('\t')
            endFile,endOffset = endLog.split('\t')
            if startFile == endFile:
                filename = startFile
                # the offsets come out of the log as strings
                startOffset = float(startOffset)
                endOffset = float(endOffset)
                # calc the slope; the denominator cannot be zero
                # because startTime[0] < eventTime[0] <= endTime[0]
                slope = (endOffset - startOffset)/float(endTime[0] - startTime[0])
                # calc the offset
                offset = (slope * (eventTime[0] - startTime[0]) + startOffset)

        # eventually, we'll add in a maximum latency in the offset
        # using the ml values from the start and end times.  Thus,
        # we'll calculate four lines, return the min value as the
        # offset and the max-min as the latency in samples

        # return what we gots
        return ("%s\t%s" % (filename,offset),ml)
|