class Async::Container::Supervisor::Connection

def call(timeout: nil, **message)

# Send a message tagged with a freshly allocated id and wait for a reply.
#
# A queue is registered under the new id before the message is written; the
# method then blocks until something is pushed onto that queue, the queue is
# closed, or the timeout elapses. The registration is always removed again.
#
# NOTE(review): `calls` is presumably an accessor for `@calls` (the table
# `run` and `close` operate on) - it is not visible here, confirm.
# NOTE(review): SOURCE also contains a `call(...)` definition that forwards to
# `Call.call`; whichever is defined later in the real file wins - confirm.
#
# @parameter timeout [Numeric | Nil] Seconds to wait for a reply; `nil` waits indefinitely.
# @parameter message [Hash] The fields to send alongside the generated `id`.
# @returns [Object | Nil] The value pushed onto the queue, or `nil` on timeout or if the queue was closed.
def call(timeout: nil, **message)
	id = next_id
	
	# Hold the queue in a local: the registry can be emptied while we are
	# waiting (`close` closes every entry and clears the table), and a second
	# `calls[id]` lookup would then return `nil` and raise NoMethodError.
	queue = ::Thread::Queue.new
	calls[id] = queue
	
	write(id: id, **message)
	
	queue.pop(timeout: timeout)
ensure
	# `id` is nil if `next_id` itself raised; deleting a missing key is harmless.
	calls.delete(id)
end

def call(...)

# Forward every positional argument, keyword argument and block to
# `Call.call`, with this connection prepended as the first argument.
#
# NOTE(review): SOURCE also contains a `call(timeout: nil, **message)`
# definition; whichever is defined later in the real file wins - confirm.
#
# @returns [Object] Whatever `Call.call` returns.
def call(...) = Call.call(self, ...)

def close

# Shut the connection down in three steps, each skipped when there is nothing
# to do: stop the background reader, close the stream, then close every
# registered call and empty the call table.
#
# @returns [Hash | Nil] The (now empty) call table, or `nil` when there is none.
def close
	reader = @reader
	if reader
		reader.stop
		@reader = nil
	end
	
	stream = @stream
	if stream
		# Forget the stream before closing it, so the instance variable is
		# already cleared while `close` runs.
		@stream = nil
		stream.close
	end
	
	pending = @calls
	return unless pending
	
	pending.each_value(&:close)
	pending.clear
end

def each

# Yield each message obtained from `read`, stopping as soon as `read` returns
# a falsy value.
#
# When called without a block an Enumerator over the same messages is
# returned, instead of failing with LocalJumpError on the first message.
#
# @yields {|message| ...} Each message, in the order it was read.
# @returns [Enumerator | Nil] An Enumerator when no block is given, otherwise `nil`.
def each
	return to_enum(:each) unless block_given?
	
	while (message = read)
		yield message
	end
end

def initialize(stream, id = 0, **state)

# Set up a connection around the given stream.
#
# @parameter stream [Object] The object later used for `gets`, `write`, `flush` and `close`.
# @parameter id [Integer] Starting value of the id counter; `next_id` advances it in steps of 2.
# @parameter state [Hash] Arbitrary keyword arguments, stored as given.
def initialize(stream, id = 0, **state)
	@stream, @id, @state = stream, id, state
	
	# No background reader has been started, and no calls are registered yet.
	@reader = nil
	@calls = {}
end

def next_id

# Advance the id counter by two and return the new value, so every id
# produced keeps the parity of the counter's starting value.
#
# @returns [Integer] The updated counter.
def next_id
	@id = @id + 2
end

def read

# Read one line from the stream and parse it as JSON with symbolized keys.
#
# @returns [Object | Nil] The parsed message, or `nil` when there is no stream or `gets` returned nothing.
def read
	line = @stream&.gets
	return unless line
	
	JSON.parse(line, symbolize_names: true)
end

def run(target)

# Process every incoming message until `each` finishes.
#
# The `id` is removed from each message and used to route it:
# - no id: the message is reported through `Console.error`;
# - id matching a registered call: the remaining fields are pushed to that call;
# - any other id: the message is handed to `Call.dispatch` together with `target`.
#
# @parameter target [Object] Passed through to `Call.dispatch` for incoming calls.
def run(target)
	each do |message|
		id = message.delete(:id)
		
		unless id
			Console.error(self, "Unknown message:", message)
			next
		end
		
		pending = @calls[id]
		
		if pending
			# Response to a call:
			pending.push(**message)
		else
			# Incoming call:
			Call.dispatch(self, target, id, message)
		end
	end
end

def run_in_background(target, parent: Task.current)

# Start `run(target)` inside `parent.async`, unless a reader has already been
# started, and remember the result in `@reader`.
#
# @parameter target [Object] Passed through to `run`.
# @parameter parent [Object] Responds to `async`; defaults to `Task.current`.
# @returns [Object] The existing reader, or the value returned by `parent.async`.
def run_in_background(target, parent: Task.current)
	return @reader if @reader
	
	@reader = parent.async {run(target)}
end

def write(**message)

# Serialize the keyword arguments as a single JSON document terminated by a
# newline, write it to the stream and flush.
#
# @parameter message [Hash] The fields to serialize.
# @returns [Object] Whatever the stream's `flush` returns.
def write(**message)
	payload = JSON.dump(message)
	payload << "\n"
	
	@stream.write(payload)
	@stream.flush
end