/usr/lib/ruby/vendor_ruby/em-synchrony/amqp.rb is in ruby-em-synchrony 1.0.5-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | begin
require "amqp"
require "amq/protocol"
rescue LoadError
raise "Missing EM-Synchrony dependency: gem install amqp"
end
module EventMachine
module Synchrony
module AMQP
class Error < RuntimeError; end
class << self
def sync &blk
fiber = Fiber.current
blk.call(fiber)
Fiber.yield
end
def sync_cb fiber
lambda do |*args|
if fiber == Fiber.current
return *args
else
fiber.resume(*args)
end
end
end
%w[connect start run].each do |type|
line = __LINE__ + 2
code = <<-EOF
def #{type}(*params)
sync { |f| ::AMQP.#{type}(*params, &sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
class Channel < ::AMQP::Channel
def initialize(*params, &block)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
channel, open_ok = Fiber.yield
raise Error.new unless open_ok.is_a?(::AMQ::Protocol::Channel::OpenOk)
channel
end
%w[direct fanout topic headers].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(name = 'amq.#{type}', opts = {})
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:#{type}, name, opts, nil)
validate_parameters_match!(exchange, extended_opts, :exchange)
exchange
else
register_exchange(Exchange.new(self, :#{type}, name, opts))
end
end
EOF
module_eval(code, __FILE__, line)
end
alias :aqueue! :queue!
def queue!(name, opts = {})
queue = Queue.new(self, name, opts)
register_queue(queue)
end
%w[flow recover tx_select tx_commit tx_rollback reset]
.each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
class Consumer < ::AMQP::Consumer
alias :aon_delivery :on_delivery
def on_delivery(&block)
Fiber.new do
aon_delivery(&EM::Synchrony::AMQP.sync_cb(Fiber.current))
loop { block.call(Fiber.yield) }
end.resume
self
end
alias :aconsume :consume
def consume(nowait = false)
ret = EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
raise Error.new(ret.to_s) unless ret.is_a?(::AMQ::Protocol::Basic::ConsumeOk)
self
end
alias :aresubscribe :resubscribe
def resubscribe
EM::Synchrony::AMQP.sync { |f| self.aconsume(&EM::Synchrony::AMQP.sync_cb(f)) }
self
end
alias :acancel :cancel
def cancel(nowait = false)
EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
self
end
end
class Exchange < ::AMQP::Exchange
def initialize(channel, type, name, opts = {}, &block)
f = Fiber.current
# AMQP Exchange constructor handles certain special exchanges differently.
# The callback passed in isn't called when the response comes back
# but is called immediately on the original calling fiber. That means that
# when the sync_cb callback yields the fiber when called, it will hang and never
# be resumed. So handle these exchanges without yielding
if name == "amq.#{type}" or name.empty? or opts[:no_declare]
exchange = nil
super(channel, type, name, opts) { |ex| exchange = ex }
else
super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
exchange, declare_ok = Fiber.yield
raise Error.new(declare_ok.to_s) unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
end
exchange
end
alias :apublish :publish
def publish payload, options = {}
apublish(payload, options)
end
alias :adelete :delete
def delete(opts = {})
EM::Synchrony::AMQP.sync { |f| adelete(opts, &EM::Synchrony::AMQP.sync_cb(f)) }
end
end
class Queue < ::AMQP::Queue
def initialize(*params)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
queue, declare_ok = Fiber.yield
raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Queue::DeclareOk)
queue
end
alias :asubscribe :subscribe
def subscribe(opts = {}, &block)
Fiber.new do
asubscribe(opts, &EM::Synchrony::AMQP.sync_cb(Fiber.current))
loop { block.call(Fiber.yield) }
end.resume
end
%w[bind rebind unbind delete purge pop unsubscribe status].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
class Session < ::AMQP::Session
%w[disconnect].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
end
end
end
|