class Async::Container::Supervisor::Connection::Call

def self.call(connection, **message, &block)

# Send a message over the connection and wait for the response(s).
#
# A new call is created with a fresh id from `connection.next_id`, registered in `connection.calls`
# for the duration of the exchange, and always unregistered afterwards (even if an error is raised).
#
# With a block, every response is handed to the block via `call.each`. Without a block, responses
# are buffered until one carrying a truthy `:finished` key arrives; that key is removed before
# the response is returned.
#
# @parameter connection [Object] Must respond to `next_id`, `calls` and `write`.
# @parameter message [Hash] The message to write; it is sent together with the generated `id`.
# @yields {|response| ...} Each response, if a block is given.
# @returns [Hash] The final response, when no intermediate responses were received.
# @returns [Array(Hash)] The intermediate responses (plus the final one if it is non-empty) otherwise.
# @returns [Nil] If `call.pop` returned a falsy value before a finished response was seen.
def self.call(connection, **message, &block)
	id = connection.next_id
	call = self.new(connection, id, message)
	connection.calls[id] = call
	
	begin
		connection.write(id: id, **message)
		
		# Streaming mode: the caller consumes every response itself.
		return call.each(&block) if block
		
		buffered = []
		
		while (response = call.pop)
			unless response.delete(:finished)
				# Not the final response yet, keep it for later:
				buffered << response
				next
			end
			
			# A lone final response is returned as-is:
			return response if buffered.empty?
			
			# Otherwise the final response only joins the buffer if it carries data:
			buffered << response unless response.empty?
			return buffered
		end
	ensure
		connection.calls.delete(id)
	end
end

def self.dispatch(connection, target, id, message)

# Handle an incoming call by dispatching it to the target inside an `Async` block.
#
# A new call is registered in `connection.calls` under the given id and handed to
# `target.dispatch`. Every response subsequently popped from the call is written back to the
# connection, tagged with the id, until `pop` returns a falsy value.
#
# On the way out (normally or due to an error) a `finished: true` message is written unless the
# call's queue has already been closed, and the call is always removed from `connection.calls`.
#
# @parameter connection [Object] Must respond to `calls` and `write`.
# @parameter target [Object] Must respond to `dispatch(call)`.
# @parameter id [Object] The identifier the call is registered under and which tags every outgoing message.
# @parameter message [Hash] The incoming message, passed to the call's constructor.
# @returns [Object] Whatever `Async` returns for the block.
def self.dispatch(connection, target, id, message)
	Async do
		call = self.new(connection, id, message)
		connection.calls[id] = call
		
		target.dispatch(call)
		
		while response = call.pop
			connection.write(id: id, **response)
		end
	ensure
		begin
			# If the queue is closed, we don't need to send a finished message. The call may be nil if
			# construction itself failed, in which case there is nothing to finish either.
			if call && !call.closed?
				connection.write(id: id, finished: true)
			end
		ensure
			# The final write can itself fail (e.g. the connection is broken), so make sure the call is
			# always unregistered rather than left behind in the connection's table.
			connection.calls.delete(id)
		end
	end
end

def [] key

# Look up a value in the message this call was created with.
#
# @parameter key [Object] The key to look up in the message.
# @returns [Object] Whatever the message returns for `key`.
def [](key)
	@message[key]
end

def as_json(...)

# The JSON-compatible representation of this call, which is the message it was created with.
#
# Any arguments are accepted and ignored.
#
# @returns [Object] The message itself (not a copy).
def as_json(...)
	@message
end

def close

The call was never completed and the connection itself was closed.
# Close the response queue of this call.
#
# @returns [Object] The result of closing the queue.
def close
	@queue.close
end

def closed?

# Whether the response queue of this call has been closed.
#
# @returns [Boolean] The queue's own `closed?` state.
def closed?
	@queue.closed?
end

def each(&block)

# Iterate over the responses of this call, in the order they are popped.
#
# Iteration stops as soon as `pop` returns a falsy value.
#
# Without a block an enumerator is returned instead, so that no response is popped (and lost)
# before there is anything to yield it to.
#
# @yields {|response| ...} Each popped response.
# @returns [Enumerator] If no block is given.
# @returns [Nil] Once iteration has stopped.
def each(&block)
	return to_enum(:each) unless block_given?
	
	while response = self.pop
		yield response
	end
end

def fail(**response)

# Finish this call with a response that is marked as failed.
#
# The response is `failed: true` followed by the given fields; a `failed:` field supplied by the
# caller takes precedence over the default.
#
# @parameter response [Hash] Additional fields for the final response.
# @returns [Object] Whatever `finish` returns.
def fail(**response)
	failure = {failed: true}.merge(response)
	
	finish(**failure)
end

def finish(**response)

# Push the final response for this call and close its queue.
#
# The final response consists of this call's id, `finished: true` and the given fields.
#
# @parameter response [Hash] Additional fields for the final response.
# @returns [Nil] If the queue was already closed, in which case nothing is pushed.
# @returns [Object] Otherwise, the result of closing the queue.
def finish(**response)
	# If the remote end has already closed the connection, there is nobody left to notify:
	return if @queue.closed?
	
	push(id: @id, finished: true, **response)
	@queue.close
end

def initialize(connection, id, message)

# Set up a call together with an empty queue for its responses.
#
# @parameter connection [Object] The connection this call belongs to.
# @parameter id [Object] The identifier of this call.
# @parameter message [Object] The message this call was created with.
def initialize(connection, id, message)
	@connection, @id, @message = connection, id, message
	
	# Responses are exchanged through this queue:
	@queue = ::Thread::Queue.new
end

def pop(...)

# Take the next response off this call's queue.
#
# All arguments are forwarded to the queue's `pop` unchanged.
#
# @returns [Object] Whatever the queue's `pop` returns.
def pop(...)
	@queue.pop(...)
end

def push(**response)

# Append a response to this call's queue.
#
# @parameter response [Hash] The fields of the response; they are enqueued as a single hash.
# @returns [Object] The result of the queue's push.
def push(**response)
	@queue << response
end

def to_json(...)

# Serialize this call as JSON.
#
# The value of `as_json` is serialized, with all arguments forwarded to its `to_json`.
#
# @returns [Object] Whatever `to_json` returns for the `as_json` representation.
def to_json(...)
	representation = as_json
	
	representation.to_json(...)
end