/usr/share/phatch/phatch/lib/events.py is in phatch-cli 0.2.7.1-3.1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (C) 2007-2008 www.stani.be
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses/
# Follows PEP8
"""The aim of this library is to abstract pubsub."""
#check this for the console version (wx should disappear)
try:
from other.pubsub import ALL_TOPICS, Publisher
except ImportError:
from wx.lib.pubsub import ALL_TOPICS, Publisher
#---Send
class SendListener:
    """Callable that publishes its call arguments on one pubsub topic.

    Calling an instance bundles the positional and keyword arguments into
    a single ``(args, keyw)`` tuple and sends that tuple as the message
    data; ReceiveListener.__call__ performs the matching unpacking.
    """

    def __init__(self, topic=ALL_TOPICS):
        # Topic every call of this instance is published on.
        self.topic = topic

    def __call__(self, *args, **keyw):
        # Pack both argument kinds into one payload object
        # (unpacked again in ReceiveListener.__call__).
        payload = (args, keyw)
        publisher = Publisher()
        # Hand back whatever the publisher's sendMessage returns.
        return publisher.sendMessage(self.topic, payload)
class Sender:
    """Attribute-style entry point for publishing messages.

    Any attribute that normal lookup does not find is turned into a
    SendListener for the topic named after that attribute, so
    ``send.frame_error(...)`` publishes on the topic 'frame_error'.
    """

    def __getattr__(self, topic):
        # The attribute name itself is used as the topic.
        topic_sender = SendListener(topic)
        return topic_sender


# Shared module-level instance: publish with ``send.<topic>(...)``.
send = Sender()
#---Receive
def subscribe(method, obj):
    """Subscribe ``obj.<method>`` to the pubsub topic named ``method``.

    method -- attribute name of the callable on ``obj``; it is also used
              as the topic name.
    obj    -- object providing that callable.

    Raises AttributeError when ``obj`` has no attribute ``method``.

    The bound method is registered directly, without a ReceiveListener
    wrapper, so it presumably receives the raw pubsub message object
    (with the payload in ``message.data``) rather than unpacked
    arguments -- verify against callers.
    """
    listener = getattr(obj, method)
    # Listener first, topic second: the same argument order that
    # Receiver.subscribe passes to Publisher().subscribe.
    Publisher().subscribe(listener, method)
class ReceiveListener:
    """Adapter that turns a received message into a plain method call.

    The message's ``data`` attribute is expected to be the ``(args, keyw)``
    pair built by SendListener.__call__; it is unpacked and forwarded to
    the wrapped method.
    """

    def __init__(self, obj, method):
        # Resolve the attribute once, at construction time; a missing
        # attribute therefore raises AttributeError here.
        target = getattr(obj, method)
        self.method = target

    def __call__(self, message):
        # Undo the packing done in SendListener.__call__.
        payload = message.data
        positional, named = payload
        return self.method(*positional, **named)
class Receiver:
    """Base class whose methods can be subscribed to prefixed topics.

    Each subscribed method ``m`` listens on the topic
    ``'<name>_<m>'``, where ``<name>`` is the string given to __init__,
    so it can be triggered with ``send.<name>_<m>(...)``.
    """

    def __init__(self, name):
        """Remember the topic prefix ``name``; nothing is subscribed yet."""
        self._pubsub_name = name
        # Every ReceiveListener created by subscribe(), kept so that it
        # can be found again by unsubscribe()/unsubscribe_all().
        # NOTE(review): presumably this also keeps the listeners alive if
        # the publisher only holds weak references -- confirm.
        self._listeners = []

    def subscribe(self, method):
        """Subscribe with some class magic.

        Example: self.subscribe('error') -> topic 'frame_error'
        (for a receiver named 'frame').
        Afterwards you can call it with send.frame_error()"""
        listener = ReceiveListener(self, method)
        self._listeners.append(listener)
        Publisher().subscribe(listener, '%s_%s' % (self._pubsub_name, method))

    def unsubscribe(self, method):
        """Unsubscribe a method previously passed to subscribe().

        Example: self.unsubscribe('error') -> topic 'frame_error'
        (for a receiver named 'frame').

        Raises ValueError when ``method`` is not currently subscribed.
        """
        target = getattr(self, method)
        # A freshly built ReceiveListener never compares equal to the one
        # stored by subscribe() (the class defines no __eq__), so locate
        # the stored instance through the bound method it wraps instead.
        for index, listener in enumerate(self._listeners):
            if listener.method == target:
                break
        else:
            raise ValueError('%r is not subscribed' % (method,))
        del self._listeners[index]
        Publisher().unsubscribe(listener,
                                '%s_%s' % (self._pubsub_name, method))

    def unsubscribe_all(self):
        """Unsubscribe every listener created by subscribe()."""
        for listener in self._listeners:
            # No topic is passed here, unlike in unsubscribe().
            Publisher().unsubscribe(listener)
        self._listeners = []
def example():
    """Demonstrate driving a Receiver subclass through ``send``.

    A ``Test`` instance registers under the name 'test' and subscribes its
    ``write`` method, so ``send.test_write(...)`` is delivered to
    ``Test.write``. The closing assert checks that the call arrived.
    """
    import sys

    class Test(Receiver):
        """Receiver whose ``write`` echoes a phrase and remembers it."""

        def __init__(self):
            # Register this instance under the 'test' prefix.
            Receiver.__init__(self, 'test')
            # Route send.test_write -> self.write.
            self.subscribe('write')
            self.phrase = 'planet'

        def write(self, phrase, error):
            # Phrase goes to stdout (newline-terminated), error to stderr.
            line = phrase + '\n'
            sys.stdout.write(line)
            sys.stderr.write(error)
            self.phrase = phrase

    greeting = 'hello world'
    receiver = Test()
    send.test_write(greeting, error='(No error.)')
    # write() must have replaced the initial 'planet'.
    assert receiver.phrase == greeting
# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    example()
|