/usr/lib/ruby/1.8/qrack/subscription.rb is in libbunny-ruby1.8 0.6.2-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
module Qrack
# Subscription ancestor class.
#
# Runs a consumer loop: it pulls delivery frames from +client+, reassembles
# each message body and hands the message to the block given to #start.
# #start calls +setup_consumer+, which is not defined in this class, so a
# concrete subclass presumably has to provide it.
class Subscription
  attr_accessor :consumer_tag, :delivery_tag, :message_max, :timeout, :ack, :exclusive
  attr_reader :client, :queue, :message_count

  # Builds a subscription and registers it on +queue+ through
  # <tt>queue.subscription=</tt>.
  #
  # client:: object the frames are read from (+next_method+, +next_payload+)
  # queue::  object that is unsubscribed/acknowledged by #start
  # opts::   options hash (or +nil+). Keys read here: <tt>:timeout</tt>,
  #          <tt>:message_max</tt>, <tt>:consumer_tag</tt>, <tt>:ack</tt> and
  #          <tt>:exclusive</tt>. <tt>:nowait</tt> is discarded.
  #
  # The caller's hash is never modified; a private copy is kept in <tt>@opts</tt>.
  def initialize(client, queue, opts = {})
    # Work on a copy: :nowait is stripped below, and doing that on the
    # caller's own hash would silently alter it (or raise if it is frozen).
    opts = opts ? opts.dup : {}

    @client = client
    @queue = queue

    # Get timeout value (a false value is normalised to nil)
    @timeout = opts[:timeout] || nil

    # Get maximum amount of messages to process
    @message_max = opts[:message_max] || nil

    # If a consumer tag is not passed in the server will generate one
    @consumer_tag = opts[:consumer_tag] || nil

    # Ignore the :nowait option if passed, otherwise program will hang waiting for a
    # response from the server causing an error.
    opts.delete(:nowait)

    # Do we want to have to provide an acknowledgement?
    @ack = opts[:ack] || nil

    # Does this consumer want exclusive use of the queue?
    @exclusive = opts[:exclusive] || false

    # Initialize message counter
    @message_count = 0

    # Give queue reference to this subscription
    @queue.subscription = self

    # Store options (without :nowait)
    @opts = opts
  end

  # Consumes messages until +message_max+ messages have been handled or the
  # client raises Qrack::ClientTimeout while waiting for the next delivery.
  #
  # For every message the optional block is called with a hash holding
  # <tt>:header</tt>, <tt>:payload</tt> and <tt>:delivery_details</tt>.
  # Does nothing at all when +message_max+ is 0. Returns +nil+.
  def start(&blk)
    # Do not process any messages if zero message_max
    return if message_max == 0

    # Notify server about new consumer
    setup_consumer

    # Start subscription loop
    loop do
      begin
        delivery = client.next_method(:timeout => timeout)
      rescue Qrack::ClientTimeout
        # Nothing arrived in time: cancel the consumer and stop
        queue.unsubscribe
        break
      end

      # Increment message counter
      @message_count += 1

      # Get delivery tag to use for acknowledge
      queue.delivery_tag = delivery.delivery_tag if @ack

      header = client.next_payload

      # If maximum frame size is smaller than message payload body then message
      # will have a message header and several message bodies. Append in place
      # so that no intermediate string is allocated per body frame.
      body = ''
      body << client.next_payload while body.length < header.size

      # If block present, pass the message info to the block for processing
      blk.call({:header => header, :payload => body, :delivery_details => delivery.arguments}) if blk

      # Exit loop if message_max condition met
      if message_max && message_count == message_max
        # Stop consuming messages
        queue.unsubscribe
        # Acknowledge receipt of the final message
        queue.ack if @ack
        # Quit the loop
        break
      end

      # Have to do the ack here because the ack triggers the release of messages from the server
      # if you are using Client#qos prefetch and you will get extra messages sent through before
      # the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
      # deferred.
      queue.ack if @ack
    end
  end
end
end
|