class Async::Container::Supervisor::Connection::Call
# Issue a call over +connection+ and wait for its response(s).
#
# A fresh id is taken from +connection.next_id+, a call object is built with
# +new(connection, id, message)+ and registered in +connection.calls+ under
# that id, and the message (with +id:+ added) is written to the connection.
# The registration is always removed again when this method exits, whether it
# returns normally or raises after the call was registered.
#
# Responses are expected to respond to +delete(:finished)+ and +any?+
# (presumably Hashes -- confirm against the connection's reader).
#
# @param connection [Object] must provide +next_id+, +calls+ and +write+.
# @param message [Hash] keyword arguments forming the message body.
# @yield [response] when a block is given, responses are streamed to it via
#   +call.each+ instead of being collected.
# @return [Object, Array, nil]
#   * with a block: whatever +call.each+ returns;
#   * without a block and no intermediate responses: the final response, with
#     its +:finished+ key removed;
#   * without a block and with intermediate responses: an Array of the
#     intermediate responses, followed by the final response if it still has
#     any entries once +:finished+ is removed;
#   * +nil+ if +call.pop+ returns a falsy value before a finished response
#     was seen (any buffered intermediate responses are discarded).
def self.call(connection, **message, &block)
  id = connection.next_id
  call = self.new(connection, id, message)

  # Register before writing so the call is already registered when the message is sent.
  connection.calls[id] = call

  begin
    connection.write(id: id, **message)

    return call.each(&block) if block_given?

    intermediate = nil

    while (response = call.pop)
      # `delete` strips the marker from the response and tells us if it was set.
      unless response.delete(:finished)
        # Buffer intermediate responses:
        intermediate ||= []
        intermediate << response
        next
      end

      # A lone final response is returned as-is rather than wrapped in an Array.
      return response unless intermediate

      # Only keep the final response if it carries data beyond the marker.
      intermediate << response if response.any?
      return intermediate
    end

    # The response stream ended without a finished response.
    nil
  ensure
    connection.calls.delete(id)
  end
end