lib/em-synchrony/amqp.rb



begin
  require "amqp"
  require "amq/protocol"
rescue LoadError
  raise "Missing EM-Synchrony dependency: gem install amqp"
end

module EventMachine
  module Synchrony
    module AMQP
      class Error < RuntimeError; end

      class << self
        def sync &blk
          fiber = Fiber.current
          blk.call(fiber)
          Fiber.yield
        end

        def sync_cb fiber
          lambda do |*args|
            if fiber == Fiber.current
              return *args
            else
              fiber.resume(*args)
            end
          end
        end

        %w[connect start run].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            def #{type}(*params)
              sync { |f| ::AMQP.#{type}(*params, &sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end

      class Channel < ::AMQP::Channel
        def initialize(*params, &block)
          f = Fiber.current
          super(*params, &EM::Synchrony::AMQP.sync_cb(f))
          channel, open_ok = Fiber.yield
          raise Error.new unless open_ok.is_a?(::AMQ::Protocol::Channel::OpenOk)
          channel
        end

        %w[direct fanout topic headers].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(name = 'amq.#{type}', opts = {})
              if exchange = find_exchange(name)
                extended_opts = Exchange.add_default_options(:#{type}, name, opts, nil)
                validate_parameters_match!(exchange, extended_opts, :exchange)
                exchange
              else
                register_exchange(Exchange.new(self, :#{type}, name, opts))
              end
            end
          EOF
          module_eval(code, __FILE__, line)
        end

        alias :aqueue! :queue!
        def queue!(name, opts = {})
          queue = Queue.new(self, name, opts)
          register_queue(queue)
        end

        %w[flow recover tx_select tx_commit tx_rollback reset]
        .each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(*params)
              EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end

      class Consumer < ::AMQP::Consumer
        alias :aon_delivery :on_delivery
        def on_delivery(&block)
          Fiber.new do
            aon_delivery(&EM::Synchrony::AMQP.sync_cb(Fiber.current))
            loop { block.call(Fiber.yield) }
          end.resume
          self
        end

        alias :aconsume :consume
        def consume(nowait = false)
          ret = EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
          raise Error.new(ret.to_s) unless ret.is_a?(::AMQ::Protocol::Basic::ConsumeOk)
          self
        end

        alias :aresubscribe :resubscribe
        def resubscribe
          EM::Synchrony::AMQP.sync { |f| self.aconsume(&EM::Synchrony::AMQP.sync_cb(f)) }
          self
        end

        alias :acancel :cancel
        def cancel(nowait = false)
          EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
          self
        end
      end

      class Exchange < ::AMQP::Exchange
        def initialize(channel, type, name, opts = {}, &block)
          f = Fiber.current

          # AMQP Exchange constructor handles certain special exchanges differently.
          # The callback passed in isn't called when the response comes back
          # but is called immediately on the original calling fiber. That means that
          # when the sync_cb callback yields the fiber when called, it will hang and never
          # be resumed.  So handle these exchanges without yielding
          if name == "amq.#{type}" or name.empty? or opts[:no_declare]
            exchange = nil
            super(channel, type, name, opts) { |ex| exchange = ex }
          else
            super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
            exchange, declare_ok = Fiber.yield
            raise Error.new(declare_ok.to_s) unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
          end

          exchange
        end

        alias :apublish :publish
        def publish payload, options = {}
          apublish(payload, options)
        end

        alias :adelete :delete
        def delete(opts = {})
          EM::Synchrony::AMQP.sync { |f| adelete(opts, &EM::Synchrony::AMQP.sync_cb(f)) }
        end
      end

      class Queue < ::AMQP::Queue
        def initialize(*params)
          f = Fiber.current
          super(*params, &EM::Synchrony::AMQP.sync_cb(f))
          queue, declare_ok = Fiber.yield
          raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Queue::DeclareOk)
          queue
        end

        alias :asubscribe :subscribe
        def subscribe(opts = {}, &block)
          Fiber.new do
            asubscribe(opts, &EM::Synchrony::AMQP.sync_cb(Fiber.current))
            loop { block.call(Fiber.yield) }
          end.resume
        end

        %w[bind rebind unbind delete purge pop unsubscribe status].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(*params)
              EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end

      class Session < ::AMQP::Session
        %w[disconnect].each do |type|
          line = __LINE__ + 2
          code = <<-EOF
            alias :a#{type} :#{type}
            def #{type}(*params)
              EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
            end
          EOF
          module_eval(code, __FILE__, line)
        end
      end
      ::AMQP.client = ::EM::Synchrony::AMQP::Session
    end
  end
end