/usr/lib/ruby/1.8/bunny/exchange09.rb is in libbunny-ruby1.8 0.6.2-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | module Bunny
=begin rdoc
=== DESCRIPTION:
*Exchanges* are the routing and distribution hub of AMQP. All messages that Bunny sends
to an AMQP broker/server _have_ to pass through an exchange in order to be routed to a
destination queue. The AMQP specification defines the types of exchange that you can create.
At the time of writing there are four (4) types of exchange defined -
* <tt>:direct</tt>
* <tt>:fanout</tt>
* <tt>:topic</tt>
* <tt>:headers</tt>
AMQP-compliant brokers/servers are required to provide default exchanges for the _direct_ and
_fanout_ exchange types. All default exchanges are prefixed with <tt>'amq.'</tt>, for example -
* <tt>amq.direct</tt>
* <tt>amq.fanout</tt>
* <tt>amq.topic</tt>
* <tt>amq.match</tt> or <tt>amq.headers</tt>
If you want more information about exchanges, please consult the documentation for your
target broker/server or visit the {AMQP website}[http://www.amqp.org] to find the version of the
specification that applies to your target broker/server.
=end
class Exchange09
attr_reader :client, :type, :name, :opts, :key
def initialize(client, name, opts = {})
# check connection to server
raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected
@client, @name, @opts = client, name, opts
# set up the exchange type catering for default names
if name.match(/^amq\./)
new_type = name.sub(/amq\./, '')
# handle 'amq.match' default
new_type = 'headers' if new_type == 'match'
@type = new_type.to_sym
else
@type = opts[:type] || :direct
end
@key = opts[:key]
@client.exchanges[@name] ||= self
# ignore the :nowait option if passed, otherwise program will hang waiting for a
# response that will not be sent by the server
opts.delete(:nowait)
unless name == "amq.#{type}" or name == ''
client.send_frame(
Qrack::Protocol09::Exchange::Declare.new(
{ :exchange => name, :type => type, :nowait => false,
:deprecated_ticket => 0, :deprecated_auto_delete => false, :deprecated_internal => false }.merge(opts)
)
)
method = client.next_method
client.check_response(method, Qrack::Protocol09::Exchange::DeclareOk,
"Error declaring exchange #{name}: type = #{type}")
end
end
=begin rdoc
=== DESCRIPTION:
Requests that an exchange is deleted from broker/server. Removes reference from exchanges
if successful. If an error occurs raises _Bunny_::_ProtocolError_.
==== Options:
* <tt>:if_unused => true or false (_default_)</tt> - If set to _true_, the server will only
delete the exchange if it has no queue bindings. If the exchange has queue bindings the
server does not delete it but raises a channel exception instead.
* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
==== Returns:
<tt>:delete_ok</tt> if successful
=end
def delete(opts = {})
# ignore the :nowait option if passed, otherwise program will hang waiting for a
# response that will not be sent by the server
opts.delete(:nowait)
client.send_frame(
Qrack::Protocol09::Exchange::Delete.new({ :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts))
)
method = client.next_method
client.check_response(method, Qrack::Protocol09::Exchange::DeleteOk,
"Error deleting exchange #{name}")
client.exchanges.delete(name)
# return confirmation
:delete_ok
end
=begin rdoc
=== DESCRIPTION:
Publishes a message to a specific exchange. The message will be routed to queues as defined
by the exchange configuration and distributed to any active consumers when the transaction,
if any, is committed.
==== OPTIONS:
* <tt>:key => 'routing_key'</tt> - Specifies the routing key for the message. The routing key is
used for routing messages depending on the exchange configuration.
* <tt>:mandatory => true or false (_default_)</tt> - Tells the server how to react if the message
cannot be routed to a queue. If set to _true_, the server will return an unroutable message
with a Return method. If this flag is zero, the server silently drops the message.
* <tt>:immediate => true or false (_default_)</tt> - Tells the server how to react if the message
cannot be routed to a queue consumer immediately. If set to _true_, the server will return an
undeliverable message with a Return method. If set to _false_, the server will queue the message,
but with no guarantee that it will ever be consumed.
* <tt>:persistent => true or false (_default_)</tt> - Tells the server whether to persist the message
If set to _true_, the message will be persisted to disk and not lost if the server restarts.
If set to _false_, the message will not be persisted across server restart. Setting to _true_
incurs a performance penalty as there is an extra cost associated with disk access.
==== RETURNS:
nil
=end
def publish(data, opts = {})
opts = opts.dup
out = []
# Set up options
routing_key = opts.delete(:key) || key
mandatory = opts.delete(:mandatory)
immediate = opts.delete(:immediate)
delivery_mode = opts.delete(:persistent) ? 2 : 1
out << Qrack::Protocol09::Basic::Publish.new(
{ :exchange => name,
:routing_key => routing_key,
:mandatory => mandatory,
:immediate => immediate,
:deprecated_ticket => 0 }
)
data = data.to_s
out << Qrack::Protocol09::Header.new(
Qrack::Protocol09::Basic,
data.length, {
:content_type => 'application/octet-stream',
:delivery_mode => delivery_mode,
:priority => 0
}.merge(opts)
)
out << Qrack::Transport09::Body.new(data)
client.send_frame(*out)
end
end
end
|