class Slack::RealTime::Concurrency::Celluloid::Socket
def build_driver
def build_driver ::WebSocket::Driver.client(self) end
def build_socket
def build_socket socket = ::Celluloid::IO::TCPSocket.new(addr, port) socket = ::Celluloid::IO::SSLSocket.new(socket, build_ssl_context) if secure? socket end
def build_ssl_context
def build_ssl_context OpenSSL::SSL::SSLContext.new(:TLSv1_2_client) end
def close
def close @closing = true super end
def connect
def connect @driver = build_driver end
def connect!
def connect! super run_loop end
def connected?
def connected? !@connected.nil? && !@driver.nil? end
def disconnect!
def disconnect! super @ping_timer&.cancel end
def handle_read(buffer)
def handle_read(buffer) logger.debug("#{self.class}##{__method__}") { buffer } driver&.parse buffer end
def initialize(*args)
def initialize(*args) super end
def read
def read buffer = socket.readpartial(BLOCK_SIZE) raise EOFError unless buffer && !buffer.empty? async.handle_read(buffer) end
def restart_async(client, new_url)
def restart_async(client, new_url) @last_message_at = current_time @url = new_url @client = client Actor.new(future.run_client_loop) end
def run_client_loop
def run_client_loop @client.run_loop rescue StandardError => e logger.debug("#{self.class}##{__method__}") { e } raise e end
def run_loop
def run_loop @closing = false @socket = build_socket @connected = @socket.connect driver.start loop { read } if socket rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => e logger.debug("#{self.class}##{__method__}") { e } unless @closing driver.emit( :close, WebSocket::Driver::CloseEvent.new(1001, 'server closed connection') ) end end
def run_ping_loop
def run_ping_loop return unless @client.run_ping? @ping_timer = every @client.websocket_ping_timer do @client.run_ping! end end
def start_async(client)
def start_async(client) @client = client Actor.new(future.run_ping_loop) Actor.new(future.run_client_loop) end
def write(data)
def write(data) logger.debug("#{self.class}##{__method__}") { data } socket.write(data) end