class Async::Container::Supervisor::Connection
# Send a message tagged with a fresh id and block until a reply is queued.
#
# A `Thread::Queue` is registered in `@calls` under the new id before the
# message is written, so that a reply routed by id has somewhere to land.
#
# @param timeout [Numeric, nil] seconds to wait for the reply; `nil` waits
#   indefinitely (passed straight to `Thread::Queue#pop`).
# @param message [Hash] keyword fields written alongside the generated `id`.
# @return [Object, nil] whatever was pushed onto the reply queue, or `nil`
#   if the wait timed out or the queue was closed while empty.
def call(timeout: nil, **message)
  id = next_id

  # Hold the queue in a local: looking it up in the table again after the
  # write would yield nil (and a NoMethodError) if the table was cleared
  # in the meantime, e.g. by closing the connection.
  queue = ::Thread::Queue.new
  @calls[id] = queue

  write(id: id, **message)

  queue.pop(timeout: timeout)
ensure
  # Always unregister, whether we got a reply, timed out, or raised.
  @calls.delete(id)
end
# Issue a call over this connection.
#
# Pure delegation: every positional, keyword and block argument is forwarded
# unchanged to `Call.call`, with this connection prepended as the first
# argument.
#
# @return [Object] whatever `Call.call` returns.
def call(...)
  Call.call(self, ...)
end
# Shut the connection down: stop the background reader, close the stream,
# and close every pending call.
#
# Calling it again after a successful close does nothing, because each
# resource reference is cleared as it is released.
#
# @return [void]
# @raise whatever `stop` on the reader or `close` on the stream raises;
#   pending calls are still closed and cleared in that case.
def close
  # Clear each reference before releasing it, so a failure part-way through
  # does not leave a half-closed resource attached to this connection.
  if (reader = @reader)
    @reader = nil
    reader.stop
  end

  if (stream = @stream)
    @stream = nil
    stream.close
  end
ensure
  # Runs even when the steps above raise; otherwise anything still waiting
  # on a pending call would never be woken.
  if @calls
    @calls.each_value(&:close)
    @calls.clear
  end
end
# Yield each message obtained from `read` until `read` returns a falsy value.
#
# @yieldparam message [Object] the value returned by `read`.
# @return [Enumerator, nil] an Enumerator over the messages when no block is
#   given; otherwise `nil` once `read` is exhausted.
def each
  return to_enum(:each) unless block_given?

  while (message = read)
    yield message
  end
end
# Set up a connection around an already-open stream.
#
# @param stream [#gets, #write, #flush, #close] the transport; messages are
#   read from and written to it one line at a time.
# @param id [Integer] starting value of the call-id counter. `next_id`
#   advances it in steps of 2 (presumably so each side of the link uses a
#   different parity; confirm with the callers that choose this value).
# @param state [Hash] any further keyword arguments, stored as-is.
def initialize(stream, id = 0, **state)
  @stream = stream
  @id = id
  @state = state

  # No background reader until `run_in_background` starts one.
  @reader = nil

  # Pending calls, keyed by message id.
  @calls = {}
end
# Advance the call-id counter and return the new value.
#
# The counter moves in steps of 2, so every id produced keeps the parity of
# the starting id.
#
# @return [Integer] the freshly allocated id.
def next_id
  @id += 2
end
# Read one line from the stream and decode it as JSON.
#
# @return [Object, nil] the decoded message (object keys become symbols), or
#   `nil` when there is no stream or `gets` returns nothing.
# @raise [JSON::ParserError] if the line is not valid JSON.
def read
  # `&.` tolerates a connection whose stream has already been cleared.
  line = @stream&.gets
  return unless line

  JSON.parse(line, symbolize_names: true)
end
# Process incoming messages until `each` stops yielding.
#
# Every message is routed by its `:id` field, which is removed from the
# message before it is handed on:
# - no id: logged through `Console.error` and skipped;
# - id of a pending call: the remaining fields are pushed to that call as
#   the response;
# - any other id: treated as an incoming call and passed to `Call.dispatch`.
#
# @param target [Object] handed to `Call.dispatch` for incoming calls.
def run(target)
  each do |message|
    id = message.delete(:id)

    unless id
      Console.error(self, "Unknown message:", message)
      next
    end

    if (pending = @calls[id])
      # Response to a call:
      pending.push(**message)
    else
      # Incoming call:
      Call.dispatch(self, target, id, message)
    end
  end
end
# Start `run(target)` inside a child task of `parent` and remember that task.
#
# The task is memoised in `@reader`: once it is set, later calls return the
# existing task and do not start another.
#
# @param target [Object] forwarded to `run`.
# @param parent [#async] the object asked to spawn the task; defaults to
#   `Task.current`, which is evaluated only when no parent is supplied.
# @return [Object] the value returned by `parent.async` (the reader task).
def run_in_background(target, parent: Task.current)
  @reader ||= parent.async do
    run(target)
  end
end
# Encode the keyword arguments as one JSON document, write it to the stream
# followed by a newline, and flush.
#
# @param message [Hash] fields of the outgoing message.
# @return [Object] whatever the stream's `flush` returns.
def write(**message)
  # One message per line: `read` relies on `gets` to find message boundaries.
  line = JSON.dump(message) << "\n"

  @stream.write(line)
  @stream.flush
end