lib/faye/eventsource.rb



require File.expand_path('../websocket', __FILE__) unless defined?(Faye::WebSocket)

module Faye
  class EventSource
    DEFAULT_RETRY = 5
    
    include WebSocket::API
    attr_reader :env, :url, :ready_state
    
    def self.eventsource?(env)
      accept = (env['HTTP_ACCEPT'] || '').split(/\s*,\s*/)
      accept.include?('text/event-stream')
    end
    
    def self.determine_url(env)
      secure = if env.has_key?('HTTP_X_FORWARDED_PROTO')
                 env['HTTP_X_FORWARDED_PROTO'] == 'https'
               else
                 env['HTTP_ORIGIN'] =~ /^https:/i
               end
      
      scheme = secure ? 'https:' : 'http:'
      "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
    end
    
    def initialize(env, options = {})
      @env    = env
      @ping   = options[:ping]
      @retry  = (options[:retry] || DEFAULT_RETRY).to_f
      @url    = EventSource.determine_url(env)
      @stream = Stream.new(self)
      
      @ready_state = CONNECTING
      @send_buffer = []
      EventMachine.next_tick { open }
      
      callback = @env['async.callback']
      callback.call([101, {}, @stream])
      
      @stream.write("HTTP/1.1 200 OK\r\n" +
                    "Content-Type: text/event-stream\r\n" +
                    "Cache-Control: no-cache, no-store\r\n" +
                    "\r\n\r\n" +
                    "retry: #{ (@retry * 1000).floor }\r\n\r\n")
      
      @ready_state = OPEN
      
      if @ping
        @ping_timer = EventMachine.add_periodic_timer(@ping) { ping }
      end
    end
    
    def last_event_id
      @env['HTTP_LAST_EVENT_ID'] || ''
    end
    
    def rack_response
      [ -1, {}, [] ]
    end
    
    def send(message, options = {})
      return false unless @ready_state == OPEN
      
      message = WebSocket.encode(message.to_s).
                gsub(/(\r\n|\r|\n)/, '\1data: ')
      
      frame  = ""
      frame << "event: #{options[:event]}\r\n" if options[:event]
      frame << "id: #{options[:id]}\r\n" if options[:id]
      frame << "data: #{message}\r\n\r\n"
      
      @stream.write(frame)
      true
    end
    
    def ping(message = nil)
      @stream.write(":\r\n\r\n")
      true
    end
    
    def close
      return if [CLOSING, CLOSED].include?(@ready_state)
      @ready_state = CLOSED
      EventMachine.cancel_timer(@ping_timer)
      @stream.close_connection_after_writing
      event = WebSocket::API::Event.new('close')
      event.init_event('close', false, false)
      dispatch_event(event)
    end
  end
  
  class EventSource::Stream
    include EventMachine::Deferrable
    
    extend Forwardable
    def_delegators :@connection, :close_connection, :close_connection_after_writing
    
    def initialize(event_source)
      @event_source = event_source
      @connection   = event_source.env['em.connection']
      @stream_send  = event_source.env['stream.send']
      
      @connection.socket_stream = self if @connection.respond_to?(:socket_stream)
    end
    
    def each(&callback)
      @stream_send ||= callback
    end
    
    def fail
      @event_source.close
    end
    
    def receive(data)
    end
    
    def write(data)
      return unless @stream_send
      @stream_send.call(data) rescue nil
    end
  end
end