This file is indexed.

/usr/lib/ruby/vendor_ruby/em-synchrony/amqp.rb is in ruby-em-synchrony 1.0.5-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
begin
  require "amqp"
  require "amq/protocol"
rescue LoadError
  raise "Missing EM-Synchrony dependency: gem install amqp"
end

module EventMachine
  module Synchrony
    module AMQP
      class Error < RuntimeError; end

      class << self
        def sync &blk
          fiber = Fiber.current
          blk.call(fiber)
          Fiber.yield
        end

        def sync_cb fiber
          lambda do |*args|
            if fiber == Fiber.current
              return *args
            else
              fiber.resume(*args)
            end
          end
        end

        %w[connect start run].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            def #{type}(*params)
              sync { |f| ::AMQP.#{type}(*params, &sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end

      class Channel < ::AMQP::Channel
        def initialize(*params, &block)
          f = Fiber.current
          super(*params, &EM::Synchrony::AMQP.sync_cb(f))
          channel, open_ok = Fiber.yield
          raise Error.new unless open_ok.is_a?(::AMQ::Protocol::Channel::OpenOk)
          channel
        end

        %w[direct fanout topic headers].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(name = 'amq.#{type}', opts = {})
              if exchange = find_exchange(name)
                extended_opts = Exchange.add_default_options(:#{type}, name, opts, nil)
                validate_parameters_match!(exchange, extended_opts, :exchange)
                exchange
              else
                register_exchange(Exchange.new(self, :#{type}, name, opts))
              end
            end
          EOF
          module_eval(code, __FILE__, line)
        end

        alias :aqueue! :queue!
        def queue!(name, opts = {})
          queue = Queue.new(self, name, opts)
          register_queue(queue)
        end

        %w[flow recover tx_select tx_commit tx_rollback reset]
        .each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(*params)
              EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end

      class Consumer < ::AMQP::Consumer
        alias :aon_delivery :on_delivery
        def on_delivery(&block)
          Fiber.new do
            aon_delivery(&EM::Synchrony::AMQP.sync_cb(Fiber.current))
            loop { block.call(Fiber.yield) }
          end.resume
          self
        end

        alias :aconsume :consume
        def consume(nowait = false)
          ret = EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
          raise Error.new(ret.to_s) unless ret.is_a?(::AMQ::Protocol::Basic::ConsumeOk)
          self
        end

        alias :aresubscribe :resubscribe
        def resubscribe
          EM::Synchrony::AMQP.sync { |f| self.aconsume(&EM::Synchrony::AMQP.sync_cb(f)) }
          self
        end

        alias :acancel :cancel
        def cancel(nowait = false)
          EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
          self
        end
      end

      class Exchange < ::AMQP::Exchange
        def initialize(channel, type, name, opts = {}, &block)
          f = Fiber.current

          # AMQP Exchange constructor handles certain special exchanges differently.
          # The callback passed in isn't called when the response comes back
          # but is called immediately on the original calling fiber. That means that
          # when the sync_cb callback yields the fiber when called, it will hang and never
          # be resumed.  So handle these exchanges without yielding
          if name == "amq.#{type}" or name.empty? or opts[:no_declare]
            exchange = nil
            super(channel, type, name, opts) { |ex| exchange = ex }
          else
            super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
            exchange, declare_ok = Fiber.yield
            raise Error.new(declare_ok.to_s) unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
          end

          exchange
        end

        alias :apublish :publish
        def publish payload, options = {}
          apublish(payload, options)
        end

        alias :adelete :delete
        def delete(opts = {})
          EM::Synchrony::AMQP.sync { |f| adelete(opts, &EM::Synchrony::AMQP.sync_cb(f)) }
        end
      end

      class Queue < ::AMQP::Queue
        def initialize(*params)
          f = Fiber.current
          super(*params, &EM::Synchrony::AMQP.sync_cb(f))
          queue, declare_ok = Fiber.yield
          raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Queue::DeclareOk)
          queue
        end

        alias :asubscribe :subscribe
        def subscribe(opts = {}, &block)
          Fiber.new do
            asubscribe(opts, &EM::Synchrony::AMQP.sync_cb(Fiber.current))
            loop { block.call(Fiber.yield) }
          end.resume
        end

        %w[bind rebind unbind delete purge pop unsubscribe status].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(*params)
              EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end

      class Session < ::AMQP::Session
        %w[disconnect].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(*params)
              EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end
    end
  end
end