/usr/lib/python2.7/dist-packages/framework/subsystems/ogsmd/gsm/parser.py is in fso-frameworkd 0.10.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
"""
The Open GSM Daemon - Python Implementation
(C) 2008 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
(C) 2008 Openmoko, Inc.
GPLv2 or later
Package: ogsmd.gsm
Module: parser
"""
__version__ = "0.8.5.2"
import os
# Parser tracing switch, taken from the FSO_DEBUG_PARSER environment variable.
# The raw string value is used as the flag, so any non-empty value (even "0")
# turns tracing on; when the variable is unset the flag is False.
DEBUG = os.environ.get( "FSO_DEBUG_PARSER", False )
#=========================================================================#
class StateBasedLowlevelAtParser( object ):
#=========================================================================#
    """
    A state machine based lowlevel AT response parser.

    Data is pushed in through feed(); every completed solicited response is
    handed to the 'response' callback and every completed unsolicited message
    to the 'unsolicited' callback, both as a list of lines.

    Requirements:
    * Support feeding data from the channel in chunks of arbitrary lengths [ok]
    * Support solicited and unsolicited responses (on the same channel) [ok]
    * Support single (e.g. +CGMR) and multi requests (+CGMR;+CGMM;+CGMI;+CGSN) [ok]
    * Handle one-line and multi-line responses [ok]
    * Handle multiline unsolicited responses (e.g. +CBM) [ok, but kind of ugly]
    * Handle multiline requests by supporting the continuation prompt
      (CR LF '> ') [ok, but see NOTE in feed()]

    Todo:
    * Detect echo mode and adjust itself (or warn)
    * Handle multiline answers with empty lines (e.g. in SMS)
    * Seamless handover to binary mode parsers / data connections
    """

    # A solicited response is complete once a line equals one of these...
    TERMINATION_LINES = ( "OK", "ERROR" )
    # ...or starts with one of these.
    TERMINATION_PREFIXES = (
        "+CME ERROR",
        "+CMS ERROR",
        "+EXT ERROR",
        "BUSY",
        "CONNECT",
        "NO ANSWER",
        "NO CARRIER",
        "NO DIALTONE",
    )
    # Unsolicited lines that are followed by a second (PDU) line.
    UNSOLICITED_PDU_PREFIXES = ( "+CBM:", "+CDS:", "+CMT:" )
    # Solicited lines that are followed by a second (PDU) line.
    SOLICITED_PDU_PREFIXES = ( "+CMGL:", "+CMGR:" )
    # First characters accepted for the pseudo prefix "PDU".
    PDU_START_CHARACTERS = "0123456789ABCDEF"

    def __init__( self, response, unsolicited ):
        """
        Create a parser delivering to the two given callbacks.

        response:    called with the list of lines of a completed solicited
                     response (an empty list for a continuation prompt).
        unsolicited: called with the list of lines of a completed
                     unsolicited message.
        """
        self.response = response
        self.unsolicited = unsolicited
        self.state = self.reset()

    def reset( self, yankSolicited=True, keepPrefixes=False ):
        """
        Clear the line buffers and return the start state.

        yankSolicited: if true, the solicited lines gathered so far are
                       dropped as well.
        keepPrefixes:  if true, the currently valid prefixes are remembered
                       and merged into the prefixes of the next feed().
        """
        if yankSolicited:
            self.lines = []
            # NOTE(review): this copy of the file lost its indentation. The
            # prefix handling is read as belonging to the yankSolicited branch,
            # so an unsolicited message (reset( False )) leaves the remembered
            # prefixes alone -- confirm against the upstream source.
            if keepPrefixes:
                self.continuationPrefixes = self.validPrefixes
            else:
                self.continuationPrefixes = set()
        self.ulines = []
        self.curline = ""
        self.hasPdu = False
        return self.state_start

    def feed( self, bytes, haveCommand, validPrefixes ):
        """
        Push a chunk of received data through the state machine.

        bytes:         the received characters, in a chunk of any length.
        haveCommand:   true if completed lines should be treated as the
                       (possible) answer to a command, false if they are
                       unsolicited.
        validPrefixes: iterable of prefixes a line has to start with to count
                       as part of the answer; empty means everything counts.
        """
        # NOTE: the continuation query relies on '\r\n> ' not being
        # fragmented... question: is that always correct? If not,
        # we better keep the state. We could also enhance the signature
        # to support handing a haveContinuation parameter over to here.
        self.haveCommand = haveCommand
        self.validPrefixes = self.continuationPrefixes.union( validPrefixes )

        if bytes == "\r\n> ":
            if DEBUG:
                print( "PARSER DEBUG: got continuation character. sending empty response" )
            self.response( [] )
            self.state = self.reset( keepPrefixes=True )
            return

        for b in bytes:
            if DEBUG:
                print( "PARSER DEBUG: [%s] feeding %s to %s" % ( ( "solicited" if self.haveCommand else "unsolicited" ), repr(b), self.state ) )
            nextstate = self.state( b )
            if nextstate is None:
                # A state without a transition for this byte: report it, start
                # over and drop the remainder of the chunk.
                print( "PARSER DEBUG: WARNING: UNDEFINED PARSER STATE! Do not know where to go from %s upon receiving %s" % ( self.state, repr(b) ) )
                print( "previous bytes were: %s" % repr(bytes) )
                print( "current byte is: %s" % repr(b) )
                print( "lines: %s" % repr(self.lines) )
                print( "curline: %s" % repr(self.curline) )
                print( "solicited: %s" % ( self.haveCommand, ) )
                self.state = self.reset()
                break
            self.state = nextstate

    def state_start( self, b ):
        """State before a line: expects the leading CR of a CR LF pair."""
        if b == '\r':
            return self.state_start_r
        # this is unusual, but we are forgiving
        if b == '\n':
            return self.state_inline
        # this is even more unusual, but we are _really_ forgiving
        return self.state_inline( b )

    def state_start_r( self, b ):
        """State after the leading CR: only LF is accepted (else undefined)."""
        if b == '\n':
            return self.state_inline

    def state_inline( self, b ):
        """State inside a line: collect characters until CR or LF."""
        # FIXME checking the number of " in self.curline violates
        # the state machine layer and slows down the parser.
        # We better map this to the state machine instead.
        # An odd number of quotes means we are inside a quoted string, where
        # CR and LF are ordinary content.
        if b not in "\r\n" or self.curline.count('"') % 2:
            self.curline += b
            return self.state_inline
        if b == "\r":
            return self.state_inline_r
        # usually this should not happen, but some SMS are badly formatted
        if b == '\n':
            return self.lineCompleted()

    def state_inline_r( self, b ):
        """State after the CR that ends a line."""
        if b == '\r':
            return self.state_inline_multipleR
        if b == '\n':
            return self.lineCompleted()

    def state_inline_multipleR( self, b ):
        """State after more than one CR in a row at the end of a line."""
        if b == '\r':
            return self.state_inline_multipleR
        if b == '\n':
            return self.lineCompleted( True )

    def lineCompleted( self, multipleR = False ):
        """Dispatch a finished line to the solicited or unsolicited handler."""
        if self.haveCommand:
            return self.solicitedLineCompleted( multipleR )
        return self.unsolicitedLineCompleted( multipleR )

    def solicitedLineCompleted( self, multipleR = False ):
        """
        Handle a finished line while a command is pending.

        Returns the next state.
        """
        if DEBUG:
            print( "PARSER DEBUG: [perhaps solicited] line completed, line= %s previous lines= %s" % ( repr(self.curline), self.lines ) )
        # skip empty lines [is this always legal?]
        if self.curline == "":
            return self.state_start
        if self.isTerminationLine():
            if DEBUG:
                print( "PARSER DEBUG: [solicited] response completed" )
            self.lines.append( self.curline )
            self.response( self.lines )
            return self.reset()
        if self.hasPdu and self.ulines:
            # The pending PDU was announced by an unsolicited header that
            # arrived in between; it belongs to that message, not to the
            # response of the running command.
            return self.unsolicitedLineCompleted( multipleR )
        if self.hasPdu:
            # second line of a solicited PDU answer
            self.hasPdu = False
        elif self.isUnsolicitedLine():
            if DEBUG:
                print( "PARSER DEBUG: [unsolicited] response detected within solicited" )
            return self.unsolicitedLineCompleted( multipleR )
        else:
            self.hasPdu = self.isSolicitedPduLine()
        self.lines.append( self.curline )
        self.curline = ""
        return self.state_start

    def isUnsolicitedLine( self ):
        """
        Return True if the current line starts with none of the valid
        prefixes, i.e. it is not an answer to our command.

        The pseudo prefix "PDU" matches a line starting with a character
        from PDU_START_CHARACTERS. Without any prefixes every line is
        accepted as solicited.
        """
        # validPrefixes is a set (see feed), so test for emptiness instead of
        # comparing against a list.
        if not self.validPrefixes: # everything allowed
            return False
        for prefix in self.validPrefixes:
            if DEBUG:
                print( "PARSER DEBUG: checking whether %s starts with valid prefix %s" % ( repr(self.curline), repr(prefix) ) )
            if prefix == "PDU":
                if self.curline and self.curline[0] in self.PDU_START_CHARACTERS:
                    if DEBUG:
                        print( "PARSER DEBUG: yes; must be really solicited" )
                    return False
            elif self.curline.startswith( prefix ):
                if DEBUG:
                    print( "PARSER DEBUG: yes; must be really solicited" )
                return False
        if DEBUG:
            print( "PARSER DEBUG: no match; must be unsolicited" )
        return True # no prefix did match

    def isTerminationLine( self ):
        """Return True if the current line ends a solicited response."""
        return self.curline in self.TERMINATION_LINES \
            or self.curline.startswith( self.TERMINATION_PREFIXES )

    def isUnsolicitedPduLine( self ):
        """Return True if the current unsolicited line is followed by a PDU line."""
        return self.curline.startswith( self.UNSOLICITED_PDU_PREFIXES )

    def isSolicitedPduLine( self ):
        """Return True if the current solicited line is followed by a PDU line."""
        return self.curline.startswith( self.SOLICITED_PDU_PREFIXES )

    def unsolicitedLineCompleted( self, multipleR = False ):
        """
        Handle a finished unsolicited line.

        Returns the next state.
        """
        if DEBUG:
            print( "PARSER DEBUG: [unsolicited] line completed, line= %s" % repr(self.curline) )

        if not self.curline:
            # Empty lines are never part of an unsolicited message, so they are
            # not collected at all. (We have some cases where there is an empty
            # line before the pdu.)
            if DEBUG:
                if self.hasPdu:
                    print( "Empty line before PDU, ignoring" )
                else:
                    print( "PARSER DEBUG: [unsolicited] message with empty line. Ignoring." )
            return self.state_inline

        self.ulines.append( self.curline )

        if self.hasPdu:
            if DEBUG:
                print( "PARSER DEBUG: [unsolicited] line pdu completed, sending." )
            self.hasPdu = False
            self.unsolicited( self.ulines )
            return self.reset( False )

        # Now this is slightly suboptimal. I tried hard to prevent even more protocol knowledge
        # infecting this parser, but I can't seem to find another way to detect a multiline
        # unsolicited response. Ideally, GSM would clearly indicate whether a PDU is following
        # or not, but alas, that's not the case.
        if self.isUnsolicitedPduLine():
            if DEBUG:
                print( "PARSER DEBUG: message has PDU, waiting for 2nd line." )
            self.hasPdu = True
            self.curline = ""
            return self.state_inline

        self.unsolicited( self.ulines )
        return self.reset( False )
#=========================================================================#
# Alias for the parser implementation to use; the self-test at the end of
# this module instantiates the parser through this name.
LowlevelAtParser = StateBasedLowlevelAtParser
#=========================================================================#
#=========================================================================#
class QualcommGsmViolationParser( StateBasedLowlevelAtParser ):
#=========================================================================#
    """
    Parser variant for HTC modems, which violate the v250.ter AT format
    specification: although v1 is set, they leave out the '\\n' that should
    terminate an error line. Hence every '\\r' is checked for closing an
    error line, since the '\\n' may never arrive.
    """

    def state_inline( self, b ):
        """Collect line content; an error line is completed on its '\\r' already."""
        # FIXME counting the " in self.curline violates the state machine
        # layer and slows down the parser; this should become a state.
        withinQuotes = self.curline.count('"') % 2
        if withinQuotes or b not in "\r\n":
            self.curline += b
            return self.state_inline

        if b == "\r":
            isErrorLine = self.curline.startswith( ( "+CME ERROR", "+CMS ERROR", "+EXT ERROR" ) )
            if isErrorLine:
                # do not wait for a '\n' that may not come
                return self.lineCompleted()
            return self.state_inline_r

        # a bare '\n' usually should not happen, but some SMS are badly formatted
        if b == '\n':
            return self.lineCompleted()
#=========================================================================#
if __name__ == "__main__":
#=========================================================================#
    # Manual self-test: feeds stdin to the parser in chunks of random size,
    # treating everything as the answer to a pending command, and prints what
    # the parser delivers.
    import random
    import sys
    import time

    responses = []
    unsoliciteds = []

    def response( chunk ):
        print( "response = %s" % repr(chunk) )
        responses.append( chunk )

    def unsolicited( chunk ):
        print( "unsolicited = %s" % repr(chunk) )
        unsoliciteds.append( chunk )

    # feed() requires the prefixes a line must start with to count as part of
    # the answer; they are taken from the command line arguments.
    prefixes = sys.argv[1:]

    p = LowlevelAtParser( response, unsolicited )
    random.seed( time.time() )
    # todo use input to read command lines
    while True:
        read = sys.stdin.read( random.randint( 5, 20 ) )
        if read == "":
            break
        p.feed( read, True, prefixes )
        time.sleep( 0.01 )

    print( repr(p.lines) )
    print( repr(responses) )
    # print the collected messages, not the callback function
    print( repr(unsoliciteds) )
|