/usr/lib/python2.7/dist-packages/txzmq/pubsub.py is in python-txzmq 0.8.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
ZeroMQ PUB-SUB wrappers.
"""
from __future__ import unicode_literals
from zmq import constants
from txzmq.connection import ZmqConnection
class ZmqPubConnection(ZmqConnection):
    """
    Connection over a ``PUB`` socket, publishing in broadcast manner.

    Each published payload is the tag, one NUL byte and the message data
    concatenated in that order.
    """
    socketType = constants.PUB

    def publish(self, message, tag=b''):
        """
        Publish `message` under the given `tag`.

        The tag and the message data are joined with a single NUL byte
        and the result is passed to :meth:`send`.

        :param message: message data
        :type message: str (byte string)
        :param tag: message tag, empty by default
        :type tag: str (byte string)
        """
        # The NUL byte marks where the tag ends and the data begins.
        framed = tag + b'\0' + message
        self.send(framed)
class ZmqSubConnection(ZmqConnection):
    """
    Connection over a ``SUB`` socket, receiving what publishers send.

    This class is meant to be subclassed: override :meth:`gotMessage`
    to handle incoming messages.
    """
    socketType = constants.SUB

    def subscribe(self, tag):
        """
        Start receiving messages whose tag begins with `tag` (prefix).

        May be called several times.

        :param tag: message tag
        :type tag: str
        """
        sock = self.socket
        sock.set(constants.SUBSCRIBE, tag)

    def unsubscribe(self, tag):
        """
        Stop receiving messages whose tag begins with `tag` (prefix).

        May be called several times.

        :param tag: message tag
        :type tag: str
        """
        sock = self.socket
        sock.set(constants.UNSUBSCRIBE, tag)

    def messageReceived(self, message):
        """
        Unframe an incoming message and forward it to :meth:`gotMessage`.

        Overrides the :class:`ZmqConnection` hook. A message of exactly
        two parts is read as ``[tag, data]``. Any other message has its
        first part split at the first NUL byte into tag and data.

        :param message: message data, as a sequence of parts
        """
        if len(message) == 2:
            # Compatibility path: the tag arrives as the first part of
            # a multi-part message.
            tag = message[0]
            payload = message[1]
            self.gotMessage(payload, tag)
            return

        # Splitting gives [tag, data]; gotMessage wants (data, tag).
        # NOTE(review): if the first part holds no NUL byte the split
        # yields one element, so gotMessage is called without a tag.
        pieces = message[0].split(b'\0', 1)
        self.gotMessage(*pieces[::-1])

    def gotMessage(self, message, tag):
        """
        Handle one incoming message received by the subscriber.

        This base implementation always raises; subclasses should
        override it.

        :param message: message data
        :param tag: message tag
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError(self)
|