class WebSocket::Driver::Hybi
# Derives the accept token for a handshake from the client's key.
# handshake_response places the result in the Sec-WebSocket-Accept header.
#
# @param key [String] the client-supplied key
# @return [String] Base64 text of the SHA-1 digest of key + GUID, with the
#   surrounding whitespace produced by encode64 removed
def self.generate_accept(key)
  digest  = Digest::SHA1.digest(key + GUID)
  encoded = Base64.encode64(digest)
  encoded.strip
end
# Sends +message+ as a binary frame.
#
# @param message the payload handed on to #frame
# @return the result of #frame called with the :binary type
def binary(message)
  frame(message, :binary)
end
# Starts closing the connection.
#
# Behaviour depends on @ready_state (presumably mirroring the WebSocket API
# readyState values -- confirm against the base class):
# * 0 -- no frame is sent; the state jumps to 3 and a :close event is emitted.
# * 1 -- a close frame is sent and the state becomes 2.
# * anything else -- nothing happens.
#
# @param reason [String, nil] close reason; defaults to ''
# @param code [Integer, nil] close code; defaults to ERRORS[:normal_closure]
# @return [Boolean] true when the call changed the state, false otherwise
def close(reason = nil, code = nil)
  reason = reason || ''
  code   = code || ERRORS[:normal_closure]

  if @ready_state == 0
    @ready_state = 3
    emit(:close, CloseEvent.new(code, reason))
    return true
  end

  return false unless @ready_state == 1

  frame(reason, :close, code)
  @ready_state = 2
  true
end
# Dispatches the frame that was just read (@opcode, @final, @payload, plus
# @mask when @masked is set).
#
# * continuation/text/binary frames are collected in @buffer until a final
#   frame arrives, then emitted as a :message event; text is decoded with
#   Driver.encode(..., :utf8) and a failed decode triggers #fail.
# * close frames are validated and handed to #shutdown.
# * ping frames are answered with a pong carrying the same payload.
# * pong frames run and remove the callback registered under that payload.
def emit_frame
  payload = @masked ? Mask.mask(@payload, @mask) : @payload

  case @opcode
  when OPCODES[:continuation]
    # A continuation is only legal while a fragmented message is open.
    unless @mode
      return fail(:protocol_error, 'Received unexpected continuation frame')
    end

    @buffer.concat(payload)
    return unless @final

    # Capture the assembled message before reset replaces @buffer.
    assembled = (@mode == :text) ? Driver.encode(@buffer, :utf8) : @buffer
    reset

    if assembled
      emit(:message, MessageEvent.new(assembled))
    else
      fail(:encoding_error, 'Could not decode a text frame as UTF-8')
    end

  when OPCODES[:text]
    if !@final
      @mode = :text
      @buffer.concat(payload)
    else
      decoded = Driver.encode(payload, :utf8)
      if decoded
        emit(:message, MessageEvent.new(decoded))
      else
        fail(:encoding_error, 'Could not decode a text frame as UTF-8')
      end
    end

  when OPCODES[:binary]
    if !@final
      @mode = :binary
      @buffer.concat(payload)
    else
      emit(:message, MessageEvent.new(payload))
    end

  when OPCODES[:close]
    size = payload.size
    # The first two bytes, when present, hold the close code (big-endian).
    code = (size >= 2) ? 256 * payload[0] + payload[1] : nil

    reserved   = code && code >= MIN_RESERVED_ERROR && code <= MAX_RESERVED_ERROR
    acceptable = size == 0 || reserved || ERROR_CODES.include?(code)
    code = ERRORS[:protocol_error] unless acceptable

    text = Driver.encode(payload[2..-1] || [], :utf8)
    code = ERRORS[:protocol_error] if size > 125 || text.nil?

    reason = (size > 2) ? text : ''
    shutdown(code, reason || '')

  when OPCODES[:ping]
    frame(payload, :pong)

  when OPCODES[:pong]
    key = Driver.encode(payload, :utf8)
    callback = @ping_callbacks.delete(key)
    callback.call if callback
  end
end
# Reports a protocol failure and closes the connection.
#
# Emits an :error event wrapping +message+ in a ProtocolError, then calls
# #shutdown with the close code stored under +type+ in ERRORS.
#
# @param type [Symbol] key into ERRORS
# @param message [String] description used for both the error and the close reason
# @return the result of #shutdown
def fail(type, message)
  error = ProtocolError.new(message)
  emit(:error, error)
  shutdown(ERRORS[type], message)
end
# Serialises one frame and writes it to the socket.
#
# While @ready_state is 0 or lower the arguments are handed to #queue
# instead; in any state other than 1 nothing is written.
#
# @param data [String, Array<Integer>, #to_s] payload; anything that is not
#   an Array is converted with to_s and passed through Driver.encode
# @param type [Symbol, nil] key into OPCODES; defaults to :text for string
#   payloads and :binary otherwise
# @param code [Integer, nil] optional close code, prepended to the payload
#   as two big-endian bytes
# @return the result of #queue when queued, false when the state does not
#   allow sending, true once the frame has been written
def frame(data, type = nil, code = nil)
  return queue([data, type, code]) if @ready_state <= 0
  return false unless @ready_state == 1

  data = data.to_s unless Array === data
  data = Driver.encode(data, :utf8) if String === data

  default_type = (String === data) ? :text : :binary
  opcode   = OPCODES[type || default_type]
  payload  = data.respond_to?(:bytes) ? data.bytes.to_a : data
  length   = payload.size + (code ? 2 : 0)
  mask_bit = @masking ? MASK : 0

  bytes = [FIN | opcode]

  if length <= 125
    # Short form: the length fits in the second header byte.
    bytes << (mask_bit | length)
  else
    # Extended form: marker 126 + 2 length bytes, or 127 + 8 length bytes.
    wide   = length > 65535
    shifts = wide ? [56, 48, 40, 32, 24, 16, 8, 0] : [8, 0]
    bytes << (mask_bit | (wide ? 127 : 126))
    shifts.each { |shift| bytes << ((length >> shift) & BYTE) }
  end

  payload = [(code >> 8) & BYTE, code & BYTE] + payload if code

  if @masking
    mask = Array.new(4) { rand(256) }
    bytes.concat(mask)
    payload = Mask.mask(payload, mask)
  end

  bytes.concat(payload)

  @socket.write(Driver.encode(bytes, :binary))
  true
end
# Builds the HTTP response that completes the opening handshake.
#
# Reads the key and the requested subprotocols from @socket.env. The first
# requested subprotocol that appears in @protocols is stored in @protocol
# and announced in a Sec-WebSocket-Protocol header. @headers.to_s is
# appended after the fixed header lines.
#
# @return [String] the response text with lines joined by CRLF, or '' when
#   the request carries no string Sec-WebSocket-Key
def handshake_response
  sec_key = @socket.env['HTTP_SEC_WEBSOCKET_KEY']
  return '' unless String === sec_key

  lines = [
    "HTTP/1.1 101 Switching Protocols",
    "Upgrade: websocket",
    "Connection: Upgrade",
    "Sec-WebSocket-Accept: #{Hybi.generate_accept(sec_key)}"
  ]

  requested = @socket.env['HTTP_SEC_WEBSOCKET_PROTOCOL']

  if requested
    # The header may arrive as a comma-separated string or as a list.
    requested = requested.split(/\s*,\s*/) if String === requested
    chosen = requested.find { |name| @protocols.include?(name) }

    if chosen
      @protocol = chosen
      lines << "Sec-WebSocket-Protocol: #{chosen}"
    end
  end

  lines.push(@headers.to_s, '').join("\r\n")
end
# Sets up parser and framing state for a new driver.
#
# Options read here:
# * :masking         -- when truthy, frames written by #frame are masked
# * :protocols       -- supported subprotocols, as a list or a
#                       comma-separated string
# * :max_length      -- largest extended frame length accepted
#                       (default MAX_LENGTH)
# * :require_masking -- when truthy, unmasked incoming frames are rejected
#
# @param socket passed on to the superclass initializer
# @param options [Hash] see above; also passed on to the superclass
def initialize(socket, options = {})
  super
  reset

  @reader = StreamReader.new
  @stage  = 0

  @masking         = options[:masking]
  @require_masking = options[:require_masking]
  @max_length      = options[:max_length] || MAX_LENGTH

  protocols = options[:protocols] || []
  protocols = protocols.strip.split(/\s*,\s*/) if String === protocols
  @protocols = protocols

  @ping_callbacks = {}
end
# Combines a list of byte values into a single integer, treating the first
# element as the most significant byte.
#
# @param bytes [Array<Integer>] byte values, most significant first
# @return [Integer] the combined value; 0 for an empty list
def integer(bytes)
  bytes.inject(0) { |total, byte| (total << 8) + byte }
end
# Feeds incoming bytes to the frame parser and processes every complete
# frame that becomes available.
#
# @stage selects what is read next from @reader:
#   0 -- first header byte (handled by #parse_opcode)
#   1 -- second header byte (handled by #parse_length)
#   2 -- extended length bytes (handled by #parse_extended_length)
#   3 -- the 4-byte masking key
#   4 -- the payload, after which #emit_frame runs and the stage returns to 0
#
# Parsing pauses when @reader cannot supply the requested number of bytes
# and resumes on the next call. It also stops as soon as @ready_state is 3,
# the value #close and #shutdown leave behind. Without that check, bytes
# buffered after a protocol failure or a close frame would be read as new
# frames and trigger further error/close/message events.
#
# @param data [String, Array<Integer>] newly received bytes
# @return [nil]
def parse(data)
  data = data.bytes.to_a if data.respond_to?(:bytes)
  @reader.put(data)

  buffer = true

  while buffer && @ready_state != 3
    case @stage
    when 0
      buffer = @reader.read(1)
      parse_opcode(buffer[0]) if buffer

    when 1
      buffer = @reader.read(1)
      parse_length(buffer[0]) if buffer

    when 2
      buffer = @reader.read(@length_size)
      parse_extended_length(buffer) if buffer

    when 3
      buffer = @reader.read(4)
      if buffer
        @mask  = buffer
        @stage = 4
      end

    when 4
      buffer = @reader.read(@length)
      if buffer
        @payload = buffer
        emit_frame
        @stage = 0
      end
    end
  end
end
# Handles the extended length bytes of a frame header.
#
# Stores the decoded value in @length. The frame is rejected via #fail when
# its opcode is not in FRAGMENTED_OPCODES and the length exceeds 125, or
# when the length exceeds @max_length. Otherwise the parser advances to
# stage 3 (masked frame) or stage 4 (unmasked frame).
#
# @param buffer [Array<Integer>] the extended length bytes
# @return the result of #fail on rejection, otherwise the new stage number
def parse_extended_length(buffer)
  @length = integer(buffer)

  control_frame = !FRAGMENTED_OPCODES.include?(@opcode)
  if control_frame && @length > 125
    return fail(:protocol_error, "Received control frame having too long payload: #{@length}")
  end

  return fail(:too_large, 'WebSocket frame length too large') if @length > @max_length

  @stage = @masked ? 3 : 4
end
# Handles the second header byte: the mask flag and the 7-bit length.
#
# Sets @masked and @length. A value above 125 means the real length follows
# in 2 bytes (marker 126) or 8 bytes (any other marker), so @length_size is
# set and the parser moves to stage 2. Otherwise it moves straight to stage
# 3 (masked) or 4 (unmasked). When @require_masking is set, an unmasked
# frame is rejected via #fail.
#
# @param data [Integer] the second byte of the frame header
# @return the result of #fail on rejection, otherwise the new stage number
def parse_length(data)
  @masked = (data & MASK) == MASK

  if @require_masking && !@masked
    return fail(:unacceptable, 'Received unmasked frame but masking is required')
  end

  @length = data & LENGTH

  if @length > 125
    @length_size = (@length == 126) ? 2 : 8
    @stage = 2
  else
    @stage = @masked ? 3 : 4
  end
end
# Handles the first header byte: reserved bits, the final flag and the
# opcode.
#
# Sets @final and @opcode and clears @mask and @payload, then moves the
# parser to stage 1. The frame is rejected via #fail when
# * any reserved bit is set,
# * the opcode is not one of the OPCODES values,
# * a frame whose opcode is not in FRAGMENTED_OPCODES is not final, or
# * a frame in OPENING_OPCODES arrives while a fragmented message is open.
#
# @param data [Integer] the first byte of the frame header
# @return the result of #fail on rejection, otherwise 1
def parse_opcode(data)
  flags = [RSV1, RSV2, RSV3].map { |bit| (data & bit) == bit ? 1 : 0 }

  if flags.include?(1)
    return fail(:protocol_error,
      "One or more reserved bits are on: reserved1 = #{flags[0]}, reserved2 = #{flags[1]}, reserved3 = #{flags[2]}")
  end

  @final   = (data & FIN) == FIN
  @opcode  = data & OPCODE
  @mask    = []
  @payload = []

  unless OPCODES.value?(@opcode)
    return fail(:protocol_error, "Unrecognized frame opcode: #{@opcode}")
  end

  if !@final && !FRAGMENTED_OPCODES.include?(@opcode)
    return fail(:protocol_error, "Received fragmented control frame: opcode = #{@opcode}")
  end

  if @mode && OPENING_OPCODES.include?(@opcode)
    return fail(:protocol_error, 'Received new data frame but previous continuous frame is unfinished')
  end

  @stage = 1
end
# Sends a ping frame.
#
# When a block is given it is stored in @ping_callbacks under +message+;
# #emit_frame runs and removes it when a pong with that payload arrives.
#
# @param message [String] ping payload; defaults to ''
# @return the result of #frame
def ping(message = '', &callback)
  @ping_callbacks[message] = callback unless callback.nil?
  frame(message, :ping)
end
# Discards any partially received fragmented message: empties @buffer and
# clears @mode.
#
# @return [nil]
def reset
  @buffer = []
  @mode   = nil
end
# Finishes the connection: sends a close frame, moves to state 3 and emits
# a :close event carrying the same code and reason.
#
# The frame is sent before the state changes because #frame only writes
# while @ready_state is 1.
#
# @param code [Integer, nil] close code
# @param reason [String] close reason
# @return the result of #emit
def shutdown(code, reason)
  frame(reason, :close, code)
  @ready_state = 3

  event = CloseEvent.new(code, reason)
  emit(:close, event)
end
# Sends +message+ as a text frame.
#
# @param message the payload handed on to #frame
# @return the result of #frame called with the :text type
def text(message)
  frame(message, :text)
end
# Names the protocol version in use.
#
# @return [String] 'hybi-' followed by the request's Sec-WebSocket-Version
#   value as found in @socket.env
def version
  requested = @socket.env['HTTP_SEC_WEBSOCKET_VERSION']
  "hybi-#{requested}"
end