This file is indexed.

/usr/lib/python2.7/dist-packages/framework/subsystems/ogsmd/gsm/parser.py is in fso-frameworkd 0.10.1-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#!/usr/bin/env python
"""
The Open GSM Daemon - Python Implementation

(C) 2008 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
(C) 2008 Openmoko, Inc.
GPLv2 or later

Package: ogsmd.gsm
Module: parser
"""

__version__ = "0.8.5.2"

import os
DEBUG = os.environ.get( "FSO_DEBUG_PARSER", False )

#=========================================================================#
class StateBasedLowlevelAtParser( object ):
#=========================================================================#
    """
    A state machine based lowlevel AT response parser.

    Requirements:
    * Support feeding data from the channel in chunks of arbitrary lengths          [ok]
    * Support solicited and unsolicited responses (on the same channel)             [ok]
    * Support single (e.g. +CGMR) and multi requests (+CGMR;+CGMM;+CGMI;+CGSN)      [ok]
    * Handle one-line and multi-line responses                                      [ok]
    * Handle multiline unsolicited responses (e.g. +CBM)                            [ok, but kind of ugly]
    * Handle multiline requests by supporting continuation '\r\n> '                 [ok, but see NOTE]

    Todo:
    * Detect echo mode and adjust itself (or warn)
    * Handle multiline answers with empty lines (e.g. in SMS)
    * Seamless handover to binary mode parsers / data connections
    """

    def __init__( self, response, unsolicited ):
        self.response = response
        self.unsolicited = unsolicited
        self.state = self.reset()

    def reset( self, yankSolicited=True, keepPrefixes=False ):
        if yankSolicited:
            self.lines = []
        if keepPrefixes:
            self.continuationPrefixes = self.validPrefixes
        else:
            self.continuationPrefixes = set([])
        self.ulines = []
        self.curline = ""
        self.hasPdu = False
        return self.state_start

    def feed( self, bytes, haveCommand, validPrefixes ):
        # NOTE: the continuation query relies on '\r\n> ' not being
        # fragmented... question: is that always correct? If not,
        # we better keep the state. We could also enhance the signature
        # to support handing a haveContinuation parameter over to here.

        self.haveCommand = haveCommand
        self.validPrefixes = self.continuationPrefixes.union(validPrefixes)

        if bytes == "\r\n> ":
            if DEBUG: print "PARSER DEBUG: got continuation character. sending empty response"
            self.response( [] )
            self.state = self.reset( keepPrefixes=True )
            return

        for b in bytes:
            if DEBUG: print "PARSER DEBUG: [%s] feeding %s to %s" % ( ( "solicited" if self.haveCommand else "unsolicited" ), repr(b), self.state )

            nextstate = self.state( b )
            if nextstate is None:
                print "PARSER DEBUG: WARNING: UNDEFINED PARSER STATE! Do not know where to go from %s upon receiving %s" % ( self.state, repr(b) )
                print "previous bytes were:", repr(bytes)
                print "current byte is:", repr(b)
                print "lines:", repr(self.lines)
                print "curline:", repr(self.curline)
                print "solicited:", self.haveCommand
                self.state = self.reset()
                break
            else:
                self.state = nextstate

    def state_start( self, b ):
        if b == '\r':
            return self.state_start_r
        # this is unusal, but we are forgiving
        if b == '\n':
            return self.state_inline
        # this is even more unusual, but we are _really_ forgiving
        return self.state_inline( b )

    def state_start_r( self, b ):
        if b == '\n':
            return self.state_inline

    def state_inline( self, b ):
        # FIXME checking the number of " in self.curline violates
        # the state machine layer and slows down the parser.
        # We better map this to the state machine instead.
        if b not in "\r\n" or self.curline.count('"')%2:
            self.curline += b
            return self.state_inline
        else:
            if b == "\r":
                return self.state_inline_r
            # usually this should not happen, but some SMS are badly formatted
            if b == '\n':
                return self.lineCompleted()

    def state_inline_r( self, b ):
        if b == '\r':
            return self.state_inline_multipleR
        if b == '\n':
            return self.lineCompleted()

    def state_inline_multipleR( self, b ):
        if b == '\r':
            return self.state_inline_multipleR
        if b == '\n':
            return self.lineCompleted( True )

    def lineCompleted( self, multipleR = False ):
        if self.haveCommand:
            return self.solicitedLineCompleted( multipleR )
        else:
            return self.unsolicitedLineCompleted( multipleR )

    def solicitedLineCompleted( self, multipleR = False ):
        if DEBUG: print "PARSER DEBUG: [perhaps solicited] line completed, line=", repr(self.curline), "previous lines=", self.lines

        # skip empty lines [is this always legal?]
        if self.curline == "":
            return self.state_start

        if self.isTerminationLine():
            if DEBUG: print "PARSER DEBUG: [solicited] response completed"
            self.lines.append( self.curline )
            self.response( self.lines )
            return self.reset()

        elif self.hasPdu:
            self.hasPdu = False
            self.lines.append( self.curline )
            self.curline = ""
            return self.state_start

        elif self.isUnsolicitedLine():
            if DEBUG: print "PARSER DEBUG: [unsolicited] response detected within solicited"
            return self.unsolicitedLineCompleted( multipleR )

        else:
            self.hasPdu = self.isSolicitedPduLine()
            self.lines.append( self.curline )
            self.curline = ""
            return self.state_start

    def isUnsolicitedLine( self ):
        """
        Check whether the line starts with a prefix that indicates a valid response to our command.
        """
        if self.validPrefixes == []: # everything allowed
            return False
        for prefix in self.validPrefixes:
            if DEBUG: print "PARSER DEBUG: checking whether %s starts with valid prefix %s" % ( repr(self.curline), repr(prefix) )
            if prefix == "PDU":
                if self.curline[0] in "0123456789ABCDEF":
                    if DEBUG: print "PARSER DEBUG: yes; must be really solicited"
                    return False
            elif self.curline.startswith( prefix ):
                if DEBUG: print "PARSER DEBUG: yes; must be really solicited"
                return False
        if DEBUG: print "PARSER DEBUG: no match; must be unsolicited"
        return True # no prefix did match

    def isTerminationLine( self ):
        if self.curline == "OK" \
        or self.curline == "ERROR" \
        or self.curline.startswith( "+CME ERROR" ) \
        or self.curline.startswith( "+CMS ERROR" ) \
        or self.curline.startswith( "+EXT ERROR" ) \
        or self.curline.startswith( "BUSY" ) \
        or self.curline.startswith( "CONNECT" ) \
        or self.curline.startswith( "NO ANSWER" ) \
        or self.curline.startswith( "NO CARRIER" ) \
        or self.curline.startswith( "NO DIALTONE" ):
            return True
        else:
            return False

    def isUnsolicitedPduLine( self ):
        if self.curline.startswith( "+CBM:" ) \
        or self.curline.startswith( "+CDS:" ) \
        or self.curline.startswith( "+CMT:" ):
            return True
        else:
            return False

    def isSolicitedPduLine( self ):
        if self.curline.startswith( "+CMGL:" ) \
        or self.curline.startswith( "+CMGR:" ):
            return True
        else:
            return False

    def unsolicitedLineCompleted( self, multipleR = False ):
        if DEBUG: print "PARSER DEBUG: [unsolicited] line completed, line=", repr(self.curline)
        self.ulines.append( self.curline )

        if self.hasPdu:
            if DEBUG: print "PARSER DEBUG: [unsolicited] line pdu completed, sending."
            if not self.curline:
                if DEBUG: print "Empty line before PDU, ignoring"
                # We have some cases where there is an empty line before the pdu
                self.ulines.pop()
                return self.state_inline
            self.hasPdu = False
            self.unsolicited( self.ulines )
            return self.reset( False )

        # Now this is slightly suboptimal. I tried hard to prevent even more protocol knowledge
        # infecting this parser, but I can't seem to find another way to detect a multiline
        # unsolicited response. Ideally, GSM would clearly indicate whether a PDU is following
        # or not, but alas, that's not the case.
        if self.curline:
            if self.isUnsolicitedPduLine():
                if DEBUG: print "PARSER DEBUG: message has PDU, waiting for 2nd line."
                self.hasPdu = True
                self.curline = ""
                return self.state_inline
            else:
                self.unsolicited( self.ulines )
                return self.reset( False )
        else:
            if DEBUG: print "PARSER DEBUG: [unsolicited] message with empty line. Ignoring."
            return self.state_inline

#=========================================================================#
LowlevelAtParser = StateBasedLowlevelAtParser
#=========================================================================#

#=========================================================================#
class QualcommGsmViolationParser( StateBasedLowlevelAtParser ):
#=========================================================================#
    """
    The HTC modems violate v250.ter AT format specification. Although v1 is
    set, they omit the '\n' as error termination. We need to check after
    every \r whether it's a termination, since the error may not come.
    """
    def state_inline( self, b ):
        # FIXME checking the number of " in self.curline violates
        # the state machine layer and slows down the parser.
        # We better map this to the state machine instead.
        if b not in "\r\n" or self.curline.count('"')%2:
            self.curline += b
            return self.state_inline
        else:
            if b == "\r":
                if self.curline.startswith( "+CME ERROR" ) \
                or self.curline.startswith( "+CMS ERROR" ) \
                or self.curline.startswith( "+EXT ERROR" ):
                    return self.lineCompleted()
                else:
                    return self.state_inline_r
            # usually this should not happen, but some SMS are badly formatted
            if b == '\n':
                return self.lineCompleted()

#=========================================================================#
if __name__ == "__main__":
#=========================================================================#
    import sys, random, time

    responses = []
    unsoliciteds = []

    def response( chunk ):
        print "response =", repr(chunk)
        responses.append( chunk )

    def unsolicited( chunk ):
        print "unsolicited =", repr(chunk)
        unsoliciteds.append( chunk )

    p = LowlevelAtParser( response, unsolicited )

    random.seed( time.time() )

    # todo use input to read command lines
    while True:
        read = sys.stdin.read( random.randint( 5, 20 ) )
        if read == "":
            break
        else:
            p.feed( read, True )
            time.sleep( 0.01 )

    print repr(p.lines)
    print repr(responses)
    print repr(unsolicited)