begin
require "amqp"
require "amq/protocol"
rescue LoadError
raise "Missing EM-Synchrony dependency: gem install amqp"
end
module EventMachine
module Synchrony
module AMQP
class Error < RuntimeError; end
class << self
def sync &blk
fiber = Fiber.current
blk.call(fiber)
Fiber.yield
end
def sync_cb fiber
lambda do |*args|
if fiber == Fiber.current
return *args
else
fiber.resume(*args)
end
end
end
%w[connect start run].each do |type|
line = __LINE__ + 2
code = <<-EOF
def #{type}(*params)
sync { |f| ::AMQP.#{type}(*params, &sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
class Channel < ::AMQP::Channel
def initialize(*params, &block)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
channel, open_ok = Fiber.yield
raise Error.new unless open_ok.is_a?(::AMQ::Protocol::Channel::OpenOk)
channel
end
%w[direct fanout topic headers].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(name = 'amq.#{type}', opts = {})
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:#{type}, name, opts, nil)
validate_parameters_match!(exchange, extended_opts, :exchange)
exchange
else
register_exchange(Exchange.new(self, :#{type}, name, opts))
end
end
EOF
module_eval(code, __FILE__, line)
end
alias :aqueue! :queue!
def queue!(name, opts = {})
queue = Queue.new(self, name, opts)
register_queue(queue)
end
%w[flow recover tx_select tx_commit tx_rollback reset]
.each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
class Consumer < ::AMQP::Consumer
alias :aon_delivery :on_delivery
def on_delivery(&block)
Fiber.new do
aon_delivery(&EM::Synchrony::AMQP.sync_cb(Fiber.current))
loop { block.call(Fiber.yield) }
end.resume
self
end
alias :aconsume :consume
def consume(nowait = false)
ret = EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
raise Error.new(ret.to_s) unless ret.is_a?(::AMQ::Protocol::Basic::ConsumeOk)
self
end
alias :aresubscribe :resubscribe
def resubscribe
EM::Synchrony::AMQP.sync { |f| self.aconsume(&EM::Synchrony::AMQP.sync_cb(f)) }
self
end
alias :acancel :cancel
def cancel(nowait = false)
EM::Synchrony::AMQP.sync { |f| self.aconsume(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
self
end
end
class Exchange < ::AMQP::Exchange
def initialize(channel, type, name, opts = {}, &block)
f = Fiber.current
# AMQP Exchange constructor handles certain special exchanges differently.
# The callback passed in isn't called when the response comes back
# but is called immediately on the original calling fiber. That means that
# when the sync_cb callback yields the fiber when called, it will hang and never
# be resumed. So handle these exchanges without yielding
if name == "amq.#{type}" or name.empty? or opts[:no_declare]
exchange = nil
super(channel, type, name, opts) { |ex| exchange = ex }
else
super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
exchange, declare_ok = Fiber.yield
raise Error.new(declare_ok.to_s) unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
end
exchange
end
alias :apublish :publish
def publish payload, options = {}
apublish(payload, options)
end
alias :adelete :delete
def delete(opts = {})
EM::Synchrony::AMQP.sync { |f| adelete(opts, &EM::Synchrony::AMQP.sync_cb(f)) }
end
end
class Queue < ::AMQP::Queue
def initialize(*params)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
queue, declare_ok = Fiber.yield
raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Queue::DeclareOk)
queue
end
alias :asubscribe :subscribe
def subscribe(opts = {}, &block)
Fiber.new do
asubscribe(opts, &EM::Synchrony::AMQP.sync_cb(Fiber.current))
loop { block.call(Fiber.yield) }
end.resume
end
%w[bind rebind unbind delete purge pop unsubscribe status].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
class Session < ::AMQP::Session
%w[disconnect].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end
::AMQP.client = ::EM::Synchrony::AMQP::Session
end
end
end