/usr/share/pyshared/txzmq/pubsub.py is in python-txzmq 0.6.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
ZeroMQ PUB-SUB wrappers.
"""
from zmq.core import constants
from txzmq.connection import ZmqConnection
class ZmqPubConnection(ZmqConnection):
    """
    Publisher side of the PUB-SUB pair: broadcasts tagged messages.
    """
    socketType = constants.PUB

    def publish(self, message, tag=''):
        """
        Send L{message} to subscribers, prefixed with L{tag}.

        The outgoing frame is the tag, a single NUL byte and then the
        message, in that order.

        @param message: message data
        @type message: C{str}
        @param tag: message tag
        @type tag: C{str}
        """
        # NUL byte marks where the tag ends and the message begins.
        delimiter = '\0'
        frame = tag + delimiter + message
        self.send(frame)
class ZmqSubConnection(ZmqConnection):
    """
    Subscribing to messages.

    Incoming messages are split into tag and payload by
    L{messageReceived} and handed to L{gotMessage}, which subclasses
    must override.
    """
    socketType = constants.SUB

    def subscribe(self, tag):
        """
        Subscribe to messages with specified tag (prefix).

        @param tag: message tag
        @type tag: C{str}
        """
        self.socket_set(constants.SUBSCRIBE, tag)

    def unsubscribe(self, tag):
        """
        Unsubscribe from messages with specified tag (prefix).

        @param tag: message tag
        @type tag: C{str}
        """
        self.socket_set(constants.UNSUBSCRIBE, tag)

    def messageReceived(self, message):
        """
        Called on incoming message from ZeroMQ.

        Extracts the tag and the payload from the received parts and
        passes them to L{gotMessage}.

        @param message: message data, as a sequence of message parts
        """
        if len(message) == 2:
            # compatibility receiving of tag as first part
            # of multi-part message
            self.gotMessage(message[1], message[0])
            return

        # Single frame laid out as tag + NUL + payload. Only the first
        # NUL is a delimiter, so the payload itself may contain NULs.
        frame = message[0]
        tag, delimiter, payload = frame.partition('\0')
        if not delimiter:
            # No delimiter at all: deliver the whole frame as payload
            # with an empty tag rather than failing with a TypeError
            # caused by calling gotMessage() with a missing argument.
            tag, payload = '', frame
        self.gotMessage(payload, tag)

    def gotMessage(self, message, tag):
        """
        Called on incoming message received by subscriber.

        Subclasses must override this method; the base implementation
        always raises.

        @param message: message data
        @param tag: message tag
        @raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError(self)
|