/usr/lib/python2.7/dist-packages/framework/subsystems/ogsmd/modems/ti_calypso/modem.py is in fso-frameworkd 0.10.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
"""
The Open GSM Daemon - Python Implementation
(C) 2008-2009 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
(C) 2008-2009 Openmoko, Inc.
GPLv2 or later
Package: ogsmd.modems.ti_calypso
Module: modem
"""
# Version string of this module.
__version__ = "0.9.9.10"
# Name handed to logging.getLogger() for this module's logger.
MODULE_NAME = "ogsmd.modems.ti_calypso"
# Serial device opened by TiCalypso._modemOn() / _modemOff() and passed to
# the fso-abyss muxer when a session is opened.
DEVICE_CALYPSO_PATH = "/dev/ttySAC0"
# Default sysfs control files for modem power, reset and flow control.
# NOTE: these three are rebound (via `global`) to the gta02-pm-gsm.0
# equivalents by TiCalypso.__init__ when the running kernel is >= 2.6.32.
SYSFS_CALYPSO_POWER_PATH = "/sys/bus/platform/devices/neo1973-pm-gsm.0/power_on"
SYSFS_CALYPSO_RESET_PATH = "/sys/bus/platform/devices/neo1973-pm-gsm.0/reset"
SYSFS_CALYPSO_FLOW_CONTROL_PATH = "/sys/bus/platform/devices/neo1973-pm-gsm.0/flowcontrolled"
import mediator
from framework.config import config
from framework.patterns.utilities import killall
from ogsmd.modems.abstract.modem import AbstractModem
from .channel import CallChannel, UnsolicitedResponseChannel, MiscChannel
from .unsolicited import UnsolicitedResponseDelegate
from ogsmd.gsm.channel import AtCommandChannel
from ogsmd.helpers import writeToFile
from dbus import Interface
from time import sleep
import serial
import logging
logger = logging.getLogger( MODULE_NAME )
import os
#=========================================================================#
class TiCalypso( AbstractModem ):
#=========================================================================#
    """
    TI Calypso modem implementation on top of AbstractModem.

    Sets up the call, unsolicited and misc channels, obtains their paths
    from an external muxer over D-Bus (see pathfactory), and drives the
    modem's sysfs power/reset/flow-control files.
    """

    # First kernel release for which the gta02-pm-gsm.0 sysfs paths are used.
    _GTA02_SYSFS_MIN_KERNEL = ( 2, 6, 32 )

    # Raw byte sequences written to the serial port before talking plain AT.
    # The previous spelling ("\0xf9...") produced a NUL byte followed by the
    # literal text "xf9", so the intended bytes were never sent.
    # NOTE(review): presumably the GSM 07.10 basic-mode flag pair and an
    # advanced-mode close-down frame, used to leave mux mode -- confirm
    # against the multiplexer specification.
    _MUX_CLOSE_BASIC = "\xf9\xf9"
    _MUX_CLOSE_ADVANCED = "\x7E\x03\xEF\xC3\x01\x70\x7E"

    def __init__( self, *args, **kwargs ):
        """
        Select kernel specific sysfs paths, then create and configure the channels.

        All arguments are passed through to AbstractModem.__init__.
        """
        # kernel specific paths
        global SYSFS_CALYPSO_POWER_PATH
        global SYSFS_CALYPSO_RESET_PATH
        global SYSFS_CALYPSO_FLOW_CONTROL_PATH

        kernel_release = os.uname()[2]
        # compare numerically; a plain string comparison misorders releases
        # such as "2.6.9" or "10.0"
        if self._kernelAtLeast( kernel_release, self._GTA02_SYSFS_MIN_KERNEL ):
            SYSFS_CALYPSO_POWER_PATH = "/sys/bus/platform/devices/gta02-pm-gsm.0/power_on"
            SYSFS_CALYPSO_RESET_PATH = "/sys/bus/platform/devices/gta02-pm-gsm.0/reset"
            SYSFS_CALYPSO_FLOW_CONTROL_PATH = "/sys/bus/platform/devices/gta02-pm-gsm.0/flowcontrolled"
            logger.info( "Kernel >=2.6.32, gsm sysfs path updated" )

        AbstractModem.__init__( self, *args, **kwargs )

        # virtual channel numbers handed to the fso-abyss muxer
        self._channelmap = { "ogsmd.call":1, "ogsmd.unsolicited":2, "ogsmd.misc":3, "ogsmd.gprs":4 }

        # VC 1
        self._channels["CALL"] = CallChannel( self.pathfactory, "ogsmd.call", modem=self )
        # VC 2
        self._channels["UNSOL"] = UnsolicitedResponseChannel( self.pathfactory, "ogsmd.unsolicited", modem=self )
        # VC 3
        self._channels["MISC"] = MiscChannel( self.pathfactory, "ogsmd.misc", modem=self )
        # VC 4
        # FIXME pre-allocate GPRS channel for pppd?

        # configure channels
        self._channels["UNSOL"].setDelegate( UnsolicitedResponseDelegate( self._object, mediator ) )

        # configure behaviour using special commands
        self._data["cancel-outgoing-call"] = "%CHLD=I"

        # muxer mode
        self._muxercommand = config.getValue( "ogsmd", "ti_calypso_muxer", "gsm0710muxd" )

        # muxer object
        self._muxeriface = None

    @staticmethod
    def _kernelAtLeast( release, minimum ):
        """
        Return True if the kernel release string is numerically >= minimum.

        release is a string such as "2.6.32-gta02"; minimum is a tuple of
        ints. Only the leading digits of each dot-separated field are used,
        and parsing stops at the first field without leading digits.
        """
        numbers = []
        for field in release.split( "." )[:len( minimum )]:
            digits = ""
            for char in field:
                if not char.isdigit():
                    break
                digits += char
            if not digits:
                break
            numbers.append( int( digits ) )
        return tuple( numbers ) >= tuple( minimum )

    def _openDevice( self ):
        """
        Return an opened serial port on DEVICE_CALYPSO_PATH.

        The port is configured for 115200 baud, 8N1, RTS/CTS flow control
        and a read timeout of one second. The caller must close it.
        """
        device = serial.Serial()
        device.port = DEVICE_CALYPSO_PATH
        device.baudrate = 115200
        device.rtscts = True
        device.xonxoff = False
        device.bytesize = serial.EIGHTBITS
        device.parity = serial.PARITY_NONE
        device.stopbits = serial.STOPBITS_ONE
        device.timeout = 1
        logger.debug( "opening port now" )
        device.open()
        return device

    def _sendMuxClose( self, device ):
        """
        Write both mux close sequences to device, pausing 0.2s after each.
        """
        for sequence in ( self._MUX_CLOSE_BASIC, self._MUX_CLOSE_ADVANCED ):
            device.write( sequence )
            device.flush()
            sleep( 0.2 )

    def _modemOn( self ):
        """
        Lowlevel initialize this modem.

        Cycles the power and reset sysfs files, then opens the serial port
        and sends ATE0 up to five times. Returns True as soon as a reply
        contains "OK", False otherwise.
        """
        logger.debug( "reset-cycling modem" )
        # paths are looked up here, not at import time, because __init__
        # may have rebound the module level names
        cycle = (
            ( SYSFS_CALYPSO_POWER_PATH, "0\n" ),
            ( SYSFS_CALYPSO_RESET_PATH, "0\n" ),
            ( SYSFS_CALYPSO_POWER_PATH, "1\n" ),
            ( SYSFS_CALYPSO_RESET_PATH, "1\n" ),
            ( SYSFS_CALYPSO_RESET_PATH, "0\n" ),
        )
        for path, value in cycle:
            writeToFile( path, value )
            sleep( 1 )
        logger.debug( "reset cycle complete" )

        device = self._openDevice()
        try:
            self._sendMuxClose( device )
            device.write( "\r\nAT\r\n" )
            device.flush()
            result = device.read( 64 )
            logger.debug( "got %s", repr(result) )

            for _ in xrange( 5 ):
                logger.debug( "port open, sending ATE0" )
                device.write( "ATE0\r\n" )
                device.flush()
                result = device.read( 64 )
                logger.debug( "got %s", repr(result) )
                if "OK" in result:
                    return True
            return False
        finally:
            # release the port even if a write or read raised
            device.close()

    def _modemOff( self ):
        """
        Send AT@POFF over the serial port, then write "0" to the power sysfs file.
        """
        device = self._openDevice()
        try:
            self._sendMuxClose( device )
            device.write( "\r\nAT@POFF\r\n" )
            device.flush()
            sleep( 0.2 )
        finally:
            # the port was previously left open here
            device.close()
        writeToFile( SYSFS_CALYPSO_POWER_PATH, "0\n" )

    def close( self ): # SYNC
        """
        Close modem.

        Overridden for internal purposes.
        """
        # call default implementation (closing all channels)
        AbstractModem.close( self )
        killall( self._muxercommand )
        # don't let two processes kill the power
        if self._muxercommand != "gsm0710muxd":
            self._modemOff()

    def channel( self, category ):
        """
        Return proper channel.

        Overridden for internal purposes. "CallMediator" maps to the CALL
        channel, "UnsolicitedMediator" to UNSOL, everything else to MISC.
        """
        if category == "CallMediator":
            return self._channels["CALL"]
        elif category == "UnsolicitedMediator":
            return self._channels["UNSOL"]
        else:
            return self._channels["MISC"]

    def pathfactory( self, name ):
        """
        Allocate a new channel from the MUXer.

        Overridden for internal purposes. Returns the channel path as a
        string, or "" if the modem could not be powered on or the
        configured muxer is not supported.
        """
        logger.info( "Requesting new channel from '%s'", self._muxercommand )

        if self._muxercommand == "gsm0710muxd":
            if self._muxeriface is None:
                muxer = self._bus.get_object( "org.pyneo.muxer", "/org/pyneo/Muxer" )
                self._muxeriface = Interface( muxer, "org.freesmartphone.GSM.MUX" )
            return str( self._muxeriface.AllocChannel( name ) )

        elif self._muxercommand == "fso-abyss":
            if self._muxeriface is None:
                muxer = self._bus.get_object( "org.freesmartphone.omuxerd", "/org/freesmartphone/GSM/Muxer" )
                self._muxeriface = Interface( muxer, "org.freesmartphone.GSM.MUX" )
                # power on modem
                if not self._modemOn():
                    self._muxeriface = None
                    return "" # FIXME: emit error?
                if not self._muxeriface.HasAutoSession():
                    # abyss needs an open session before we can allocate channels
                    self._muxeriface.OpenSession( True, 98, "serial", DEVICE_CALYPSO_PATH, 115200 )
            pts, vc = self._muxeriface.AllocChannel( name, self._channelmap[name] )
            return str(pts)

        else:
            # previously fell through and returned None without any trace
            logger.error( "Unsupported muxer '%s', can't allocate channel '%s'", self._muxercommand, name )
            return ""

    def dataPort( self ):
        """
        Return a freshly allocated path for the "ogsmd.gprs" channel.
        """
        return self.pathfactory( "ogsmd.gprs" )

    def prepareForSuspend( self, ok_callback, error_callback ):
        """
        Overridden for internal purposes.

        Runs the default implementation; on success "1" is written to the
        flow control sysfs file before ok_callback is called.
        """
        # FIXME still no error handling here
        def post_ok( ok_callback=ok_callback ):
            writeToFile( SYSFS_CALYPSO_FLOW_CONTROL_PATH, "1" )
            ok_callback()

        AbstractModem.prepareForSuspend( self, post_ok, error_callback )

    def recoverFromSuspend( self, ok_callback, error_callback ):
        """
        Write "0" to the flow control sysfs file, then run the default implementation.
        """
        writeToFile( SYSFS_CALYPSO_FLOW_CONTROL_PATH, "0" )
        AbstractModem.recoverFromSuspend( self, ok_callback, error_callback )
|