class Async::Container::Supervisor::Connection::Call
def self.call(connection, **message, &block)
def self.call(connection, **message, &block) id = connection.next_id call = self.new(connection, id, message) connection.calls[id] = call begin connection.write(id: id, **message) if block_given? call.each(&block) else intermediate = nil while response = call.pop if response.delete(:finished) if intermediate if response.any? intermediate << response end return intermediate else return response end else # Buffer intermediate responses: intermediate ||= [] intermediate << response end end end ensure connection.calls.delete(id) end end
def self.dispatch(connection, target, id, message)
def self.dispatch(connection, target, id, message) Async do call = self.new(connection, id, message) connection.calls[id] = call target.dispatch(call) while response = call.pop connection.write(id: id, **response) end ensure # If the queue is closed, we don't need to send a finished message. unless call.closed? connection.write(id: id, finished: true) end connection.calls.delete(id) end end
def [] key
def [] key @message[key] end
def as_json(...)
def as_json(...) @message end
def close
def close @queue.close end
def closed?
def closed? @queue.closed? end
def each(&block)
def each(&block) while response = self.pop yield response end end
def fail(**response)
def fail(**response) self.finish(failed: true, **response) end
def finish(**response)
def finish(**response) # If the remote end has already closed the connection, we don't need to send a finished message: unless @queue.closed? self.push(id: @id, finished: true, **response) @queue.close end end
def initialize(connection, id, message)
def initialize(connection, id, message) @connection = connection @id = id @message = message @queue = ::Thread::Queue.new end
def pop(...)
def pop(...) @queue.pop(...) end
def push(**response)
def push(**response) @queue.push(response) end
def to_json(...)
def to_json(...) as_json.to_json(...) end