/usr/lib/python2.7/dist-packages/framework/patterns/loophole.py is in fso-frameworkd 0.10.1-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 | #!/usr/bin/env python
"""
freesmartphone.org Framework Daemon
(C) 2008-2009 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
GPLv2 or later
Package: framework.patterns
Module: loophole
"""
__version__ = "0.2.0"
import SocketServer
import thread
import code
import sys
import gobject
gobject.threads_init()
if __debug__:
class logger():
@staticmethod
def debug( message ):
print message
else:
import logging
logger = logging.getLogger( "mppl.loophole" )
#============================================================================#
class Writer( object ):
#============================================================================#
def __init__( self, delegate ):
self.delegate = delegate
def write( self, data ):
self.delegate( data )
#============================================================================#
class NetworkInterpreterConsole( code.InteractiveConsole ):
#============================================================================#
def __init__( self, request, locals_, *args, **kwargs ):
self.request = request
self.exitflag = False
code.InteractiveConsole.__init__( self, locals_, *args, **kwargs )
self.oldstdout = sys.stdout
sys.stdout = Writer( self.write )
def raw_input( self, prompt ):
self.request.send( prompt )
print >>self.oldstdout, "waiting for data..."
data = self.request.recv( 1024 )
print >>self.oldstdout, "got data '%s'" % repr(data)
if data == "\x04":
raise EOFError
# omit trailing line terminators
if data.endswith( "\r\n" ):
command = data[:-2]
elif data.endswith( "\n" ):
command = data[:-1]
else:
command = data
return command
def close( self ):
self.exitflag = True
sys.stdout = self.oldstdout
del self.oldstdout
def write( self, data ):
if not self.exitflag:
self.request.send( data )
#============================================================================#
class InterpreterRequestHandler( SocketServer.BaseRequestHandler ):
#============================================================================#
"""
Request Handler
"""
interpreters = {}
def setup( self ):
try:
self.myinterpreter = self.interpreters[self.client_address]
except KeyError:
self.myinterpreter = self.interpreters[self.client_address] = NetworkInterpreterConsole( self.request, self.locals_ )
def handle( self ):
self.myinterpreter.interact()
def finish( self ):
self.myinterpreter.close()
self.request.send( "Bye %s\n" % str( self.client_address ) )
self.request.close()
#============================================================================#
class LoopHole( object ):
#============================================================================#
def __init__( self, locals_ = {} ):
InterpreterRequestHandler.locals_ = locals_
self.server = SocketServer.ThreadingTCPServer( ( "", 8822 ), InterpreterRequestHandler )
thread.start_new_thread( self.run, () )
def run( self, *args, **kwargs ):
self.server.serve_forever()
#============================================================================#
if __name__ == "__main__":
#============================================================================#
import time
l = LoopHole()
try:
while True:
time.sleep( 10 )
except KeyboardInterrupt:
pass
sys.exit( 0 )
|