module Goliath::Rack::BarrierAroundware
def accept_response(handle, resp_succ, resp, req=nil, fiber=nil)
calls self.shortened_url = resp)
* call the setter for that handle if any (on receipt of :shortened_url,
* and file the response in either successes or failures as appropriate
* remove the tracking handle from pending_requests
On receipt of an async result,
# On receipt of an async result for +handle+:
#
# * remove the tracking handle from pending_requests
# * file the response in either successes or failures as appropriate
# * call the setter for that handle, if any (on receipt of :shortened_url,
#   calls self.shortened_url = resp)
# * re-check overall progress via #check_progress
#
# @param handle [Object] the handle the request was registered under
#   (e.g. :shortened_url)
# @param resp_succ [Boolean] truthy files the result under successes,
#   falsy under failures
# @param resp [Object] the response payload
# @param req [Object, nil] the originating request; stored next to the response
# @param fiber [Fiber, nil] handed on to #check_progress
# @return [Object] +resp+, unchanged
# @raise [RuntimeError] if +handle+ is not currently pending
def accept_response(handle, resp_succ, resp, req=nil, fiber=nil)
  raise "received response for a non-pending request!" unless pending_requests.include?(handle)

  pending_requests.delete(handle)

  if resp_succ
    successes[handle] = [req, resp]
  else
    failures[handle] = [req, resp]
  end

  # Only invoke a setter when the receiver publicly responds to it.
  setter = "#{handle}="
  send(setter, resp) if respond_to?(setter)

  check_progress(fiber)
  resp
end
def add_to_pending(handle)
Register a pending request. If you call this from outside #enqueue, you
# Register +handle+ as a pending request.
#
# #enqueue calls this for you before wiring up its callbacks.
#
# @param handle [Object] the handle to track until its response arrives
# @return [Object] the result of appending +handle+ to @pending_requests
def add_to_pending(handle)
  set_deferred_status(nil) # we're not done yet, even if we were
  @pending_requests << handle
end
def check_progress(fiber)
# Mark the barrier as succeeded once nothing is pending, then resume the
# given fiber so processing can continue.
#
# Does nothing while #finished? is false. The fiber is resumed (with self
# as the resume value) only when it is non-nil, still alive, and is not
# the fiber currently running.
#
# @param fiber [Fiber, nil] the fiber to resume once everything has completed
# @return [Object, nil] nil when not finished or when no fiber was resumed
def check_progress(fiber)
  return unless finished?

  succeed
  # continue processing
  fiber.resume(self) if fiber && fiber.alive? && fiber != Fiber.current
end
def enqueue(handle, deferred_req)
Add a deferred request to the pending pool, and set a callback to
# Add a deferred request to the pending pool and attach a callback and an
# errback that hand its result to #accept_response.
#
# The fiber that is current at enqueue time is captured and passed along,
# so #accept_response can forward it to #check_progress. Both handlers run
# inside safely(env).
#
# @param handle [Object] the handle to track the request under
# @param deferred_req [#callback, #errback] the deferrable request
# @return [Object] whatever deferred_req.errback returns
def enqueue(handle, deferred_req)
  fiber = Fiber.current
  add_to_pending(handle)

  deferred_req.callback do |resp|
    safely(env) { accept_response(handle, true, resp, deferred_req, fiber) }
  end

  deferred_req.errback do |resp|
    safely(env) { accept_response(handle, false, resp, deferred_req, fiber) }
  end
end
def enqueue_acceptor(handle)
end
end
acc.succeed(resp.first)
db.collection(:users).afind(:username => :bob) do |resp|
enqueue_acceptor(:bob) do |acc|
# a database lookup that takes a block
@example
end
EM.add_timer(1.0){ acc.succeed }
enqueue_acceptor(:sleepy) do |acc|
# sleep for 1.0 seconds and then complete
@example
your desired response.
#succeed (or #fail) on the acceptor from within the block, passing it
gives you a deferrable 'acceptor' and enqueues it -- simply call
Do you have a method that uses a block, not a deferrable? This method
# Do you have a method that uses a block, not a deferrable? This method
# gives you a deferrable 'acceptor' and enqueues it -- simply call
# #succeed (or #fail) on the acceptor from within the block, passing it
# your desired response.
#
# The acceptor is yielded to the block first and enqueued afterwards.
#
# @example sleep for 1.0 seconds and then complete
#   enqueue_acceptor(:sleepy) do |acc|
#     EM.add_timer(1.0){ acc.succeed }
#   end
#
# @example a database lookup that takes a block
#   enqueue_acceptor(:bob) do |acc|
#     db.collection(:users).afind(:username => :bob) do |resp|
#       acc.succeed(resp.first)
#     end
#   end
#
# @param handle [Object] the handle to track the acceptor under
# @yieldparam acceptor [EM::DefaultDeferrable] the deferrable to complete
# @return [Object] whatever #enqueue returns
def enqueue_acceptor(handle)
  acceptor = EM::DefaultDeferrable.new
  yield(acceptor)
  enqueue(handle, acceptor)
end
def finished?
# Whether every registered request has received its response.
#
# @return [Boolean] true when pending_requests is empty
def finished?
  pending_requests.empty?
end
def initialize(env)
-
(Goliath::Rack::BarrierAroundware)
-
Parameters:
-
env (Goliath::Env) -- The request environment
# Set up an empty barrier for one request: no pending requests, no
# recorded successes, no recorded failures.
#
# @param env [Goliath::Env] The request environment
# @return [Goliath::Rack::BarrierAroundware]
def initialize(env)
  @env = env
  @pending_requests = Set.new
  @successes = {}
  @failures = {}
end
def perform
pending responses complete. You're free to enqueue responses, call
Perform will yield (allowing other processes to continue) until all
# Perform will yield (allowing other processes to continue) until all
# pending responses complete.
#
# Returns immediately, without yielding, when nothing is pending.
#
# @return [Object, nil] nil when already finished; otherwise the value the
#   fiber is resumed with
def perform
  Fiber.yield unless finished?
end