/usr/share/pyshared/stompy/simple.py is in python-stompy 0.2.9-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from stompy.stomp import Stomp
from Queue import Empty
from uuid import uuid4
class TransactionError(Exception):
    """Transaction related error.

    Raised by :class:`Client` when a transaction is begun while another
    one is still open, or when a commit/abort is requested while no
    transaction is open.
    """
class Client(object):
    """Simple STOMP client.

    Thin convenience wrapper around :class:`stompy.stomp.Stomp` that also
    keeps track of a single client-side transaction id.

    :keyword host: Hostname of the server to connect to (default:
        ``localhost``)
    :keyword port: Port of the server to connect to (default: ``61613``)

    Example

        >>> from stompy.simple import Client
        >>> stomp = Client()
        >>> stomp.connect()
        >>> stomp.put("The quick brown fox...", destination="/queue/test")
        >>> stomp.subscribe("/queue/test")
        >>> message = stomp.get_nowait()
        >>> message.body
        'The quick brown fox...'
        >>> stomp.ack(message)
        >>> stomp.unsubscribe("/queue/test")
        >>> stomp.disconnect()

    """

    # Re-exported so callers can write ``except client.Empty`` without
    # importing the queue module themselves.
    Empty = Empty

    def __init__(self, host="localhost", port=61613):
        self.stomp = Stomp(host, port)
        # Id of the transaction currently open on this client, or ``None``.
        self._current_transaction = None

    def get(self, block=True, callback=None):
        """Get message.

        :keyword block: Block if necessary until an item is available.
            If this is ``False``, return an item if one is immediately
            available, else raise the :exc:`Empty` exception.
        :keyword callback: Optional function to execute when a message is
            received. It is passed through to ``Stomp.receive_frame``.

        :raises Empty: If ``block`` is off and no message was received.

        """
        frame = self.stomp.receive_frame(nonblocking=not block,
                                         callback=callback)
        if frame is None and not block:
            raise self.Empty()
        return frame

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the :exc:`Empty` exception.

        See :meth:`get`.

        """
        return self.get(block=False)

    def put(self, item, destination, persistent=True, conf=None):
        """Put an item into the queue.

        :param item: Body of the message.
        :param destination: Destination queue.
        :keyword persistent: Is message persistent? (store on disk).
            Sent to the broker as the string ``"true"`` or ``"false"``.
        :keyword conf: Extra headers to send to the broker. Entries here
            override the ``body``/``destination``/``persistent`` headers.

        :returns: The value returned by the underlying ``Stomp.send`` call.

        """
        persistent = "true" if persistent else "false"
        conf = self._make_conf(conf, body=item, destination=destination,
                               persistent=persistent)
        return self.stomp.send(conf)

    def connect(self, username=None, password=None, clientid=None):
        """Connect to the broker.

        :keyword username: Username for connection
        :keyword password: Password for connection
        :keyword clientid: Client identification for persistent connections

        :raises :exc:`stompy.stomp.ConnectionError`:
            if the connection was unsuccessful.
        :raises :exc:`stompy.stomp.ConnectionTimeoutError`:
            if the connection timed out.

        """
        self.stomp.connect(username=username, password=password,
                           clientid=clientid)

    def disconnect(self):
        """Disconnect from the broker."""
        self.stomp.disconnect()

    def subscribe(self, destination, ack="auto", conf=None):
        """Subscribe to topic/queue.

        :param destination: The destination queue/topic to subscribe to.
        :keyword ack: How to handle acknowledgment, either
            ``auto`` - ack is handled by the server automatically, or
            ``client`` - ack is handled by the client itself by calling
            :meth:`ack`.
        :keyword conf: Additional headers to send with the subscribe request.

        :returns: The value returned by the underlying ``Stomp.subscribe``.

        """
        conf = self._make_conf(conf, destination=destination, ack=ack)
        return self.stomp.subscribe(conf)

    def unsubscribe(self, destination, conf=None):
        """Unsubscribe from topic/queue previously subscribed to.

        :param destination: The destination queue/topic to unsubscribe from.
        :keyword conf: Additional headers to send with the unsubscribe
            request.

        :returns: The value returned by the underlying ``Stomp.unsubscribe``.

        """
        conf = self._make_conf(conf, destination=destination)
        return self.stomp.unsubscribe(conf)

    def begin(self, transaction=None):
        """Begin transaction.

        While a transaction is open, the headers built for :meth:`put`,
        :meth:`subscribe` and :meth:`unsubscribe` carry its id in the
        ``transaction`` header. Use :meth:`commit` to finish the
        transaction, or :meth:`abort` to roll back.

        NOTE(review): :meth:`ack` forwards the frame to ``Stomp.ack``
        unchanged; whether acks take part in the transaction depends on
        that method -- confirm against ``stompy.stomp``.

        :keyword transaction: Ignored. The id is always a freshly generated
            UUID; the parameter is kept only so existing callers that pass
            a value keep working.

        :raises TransactionError: If a transaction is already open.

        """
        if self._current_transaction:
            raise TransactionError(
                "Already in transaction. Please commit or abort first!")
        transaction_id = str(uuid4())
        result = self.stomp.begin({"transaction": transaction_id})
        # Record the id only once the broker call has returned, so that a
        # failed BEGIN does not leave the client stuck "in transaction".
        self._current_transaction = transaction_id
        return result

    def commit(self, transaction=None):
        """Commit current transaction.

        :keyword transaction: Ignored. The transaction committed is always
            the one opened by :meth:`begin`; the parameter is kept only so
            existing callers that pass a value keep working.

        :raises TransactionError: If no transaction is open.

        """
        if not self._current_transaction:
            raise TransactionError("Not in transaction")
        self.stomp.commit({"transaction": self._current_transaction})
        self._current_transaction = None

    def abort(self):
        """Roll-back current transaction.

        :raises TransactionError: If no transaction is open.

        """
        if not self._current_transaction:
            raise TransactionError("Not in transaction")
        self.stomp.abort({"transaction": self._current_transaction})
        self._current_transaction = None

    def ack(self, frame):
        """Acknowledge message.

        :param frame: The message to acknowledge.

        :returns: The value returned by the underlying ``Stomp.ack`` call.

        """
        return self.stomp.ack(frame)

    def _make_conf(self, conf, **kwargs):
        """Build the header dict for a request.

        Starts from the keyword arguments, lets entries in ``conf``
        override them, and finally adds the ``transaction`` header when a
        transaction is open.

        :param conf: Extra headers (a mapping, or ``None`` for none). It is
            never modified.

        :returns: A new dict with the merged headers.

        """
        kwargs.update(dict(conf or {}))
        if self._current_transaction:
            # Write into the dict we return: ``conf`` may be ``None`` and
            # belongs to the caller.
            kwargs["transaction"] = self._current_transaction
        return kwargs
|