class Slack::RealTime::Concurrency::Async::Socket
def build_endpoint
def build_endpoint endpoint = ::Async::IO::Endpoint.tcp(addr, port) endpoint = ::Async::IO::SSLEndpoint.new(endpoint, ssl_context: build_ssl_context) if secure? endpoint end
def build_ssl_context
def build_ssl_context OpenSSL::SSL::SSLContext.new(:TLSv1_2_client).tap do |ctx| ctx.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER) end end
def close
def close super ensure if @socket @socket.close @socket = nil end end
def connect
def connect @socket = connect_socket @driver = Client.new(@socket, url) end
def connect!
def connect! super run_loop end
def connect_socket
def connect_socket build_endpoint.connect end
def current_time
def current_time ::Async::Clock.now end
def disconnect!
def disconnect! super ensure if (restart = @restart) @restart = nil restart.signal end end
def restart_async(_client, new_url)
def restart_async(_client, new_url) @url = new_url @last_message_at = current_time @restart&.signal end
def run_async(&block)
def run_async(&block) ::Async.run(&block) end
def run_loop
def run_loop while @driver&.next_event # $stderr.puts event.inspect end end
def start_async(client)
def start_async(client) Thread.new do start_reactor(client) end end
def start_reactor(client)
def start_reactor(client) Async do |task| @restart = ::Async::Notification.new if client.run_ping? @ping_task = task.async do |subtask| subtask.annotate "#{client} keep-alive" # The timer task will naturally exit after the driver is set to nil. while @restart subtask.sleep client.websocket_ping_timer client.run_ping! if @restart end end end while @restart @client_task&.stop @client_task = task.async do |subtask| subtask.annotate "#{client} run-loop" client.run_loop rescue ::Async::Wrapper::Cancelled => e # Will get restarted by ping worker. rescue StandardError => e client.logger.error(subtask.to_s) { e.message } end @restart.wait end @ping_task&.stop end end
def start_sync(client)
def start_sync(client) start_reactor(client).wait end