/usr/lib/python2.7/dist-packages/framework/subsystems/oeventsd/action.py is in fso-frameworkd 0.10.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | # -*- coding: UTF-8 -*-
"""
The freesmartphone Events Module - Python Implementation
(C) 2008 Jan 'Shoragan' Lübbe <jluebbe@lasnet.de>
(C) 2008 Guillaume 'Charlie' Chereau
(C) 2008 Openmoko, Inc.
GPLv2 or later
Package: oeventsd
Module: action
"""
__version__ = "0.2.1"
MODULE_NAME = "oeventsd.action"
from parser import AutoFunction
from framework.patterns import decorator, dbuscache
import Queue
import logging
logger = logging.getLogger( MODULE_NAME )
#============================================================================#
class Action(AutoFunction):
#============================================================================#
"""
Base class for Action objects
An action has a `trigger` and `untrigger` methods.
By default the untrigger method will do nothing, but if it make sense, the action
should define it.
"""
#
# public
#
# Fixme: the outside to inside evaluation unnecessarily
# calls theese, consuming only time
def __init__( self ):
AutoFunction.__init__( self )
logger.debug("UnamedAction : init")
def trigger( self, **kwargs ):
logger.debug("UnamedAction : trigger")
pass
def untrigger( self, **kwargs ):
logger.debug("UnamedAction : untrigger")
pass
def __repr__( self ):
return "unamed action"
#============================================================================#
class ListAction(list, Action):
#============================================================================#
"""
An action that will trigger a sequence of actions
This is basically a script.
"""
def __init__( self, actions ):
list.__init__( self, actions )
Action.__init__( self )
def trigger( self, **kwargs ):
for action in self:
action.trigger( **kwargs )
def untrigger( self, **kwargs ):
for action in self:
action.untrigger( **kwargs )
#============================================================================#
class DebugAction(Action):
#============================================================================#
"""
A special action for debugging purposes
"""
function_name = 'Debug'
def __init__(self, msg):
Action.__init__( self )
self.msg = msg
def trigger(self, **kargs):
logger.info("DebugAction : %s", self.msg)
def __repr__(self):
return "Debug(\"%s\")" % self.msg
#============================================================================#
class DBusAction(Action):
#============================================================================#
"""
A special action that will call a DBus method.
"""
def __init__(self, bus, service, obj, interface, method, *args):
Action.__init__( self )
self.bus = bus
# some arguments checking
assert isinstance(service, str)
assert isinstance(obj, str)
assert isinstance(interface, str)
assert isinstance(method, str)
self.bus = bus
self.service = service
self.obj = obj
self.interface = interface
self.method = method
self.args = args
def trigger(self, **kargs):
iface = dbuscache.dbusInterfaceForObjectWithInterface( self.service, self.obj, self.interface )
logger.info("call dbus method %s %s(%s)", self.obj, self.method, self.args)
# Get the method
method = getattr(iface, self.method)
# We make the call asynchronous, cause we don't want to block the main loop
method( *self.args, **dict( reply_handler=self.on_reply, error_handler=self.on_error ) )
logger.debug( "method called..." )
def on_reply(self, *args):
# We don't pass the reply to anything
logger.info("method %s responded: %s", self.method, args)
def on_error(self, error):
logger.error("method %s emited error: %s", self.method, error)
def __repr__(self):
return "%s(%s)" % (self.method, self.args)
#=========================================================================#
class PeekholeQueue( Queue.Queue ):
#=========================================================================#
"""
This class extends the Queue with a method to peek at the
first element without having to remove this from the queue.
"""
def peek( self ):
if self.empty():
return None
else:
return self.queue[0]
#============================================================================#
class QueuedDBusAction( DBusAction ):
#============================================================================#
q = PeekholeQueue()
def enqueue( self, method, args, kargs ):
logger.debug( "enqueing dbus call %s.%s", method, args )
relaunch = ( self.q.qsize() == 0 )
self.q.put( ( method, args, kargs ) )
if relaunch:
self.workDaQueue()
def workDaQueue( self ):
logger.debug( "working on queue: %s", self.q )
if self.q.qsize():
method, args, kargs = self.q.peek()
# async dbus call now
method( *args, **kargs )
def trigger( self, **kargs ):
iface = dbuscache.dbusInterfaceForObjectWithInterface( self.service, self.obj, self.interface )
logger.info("queued call dbus method %s %s(%s)", self.obj, self.method, self.args)
method = getattr(iface, self.method)
self.enqueue( method, self.args, dict( reply_handler=self.on_reply, error_handler=self.on_error ) )
logger.debug( "method enqueued..." )
def on_reply(self, *args):
# We don't pass the reply to anything
logger.info("signal %s responded : %s", self.method, args)
self.q.get()
self.workDaQueue()
def on_error(self, error):
logger.error("signal %s emited an error %s", self.method, error)
self.q.get()
self.workDaQueue()
|