lib/travis/client/listener.rb



require 'travis/client'
require 'forwardable'
require 'json'

if require 'pusher-client'
  # it's us that has been loading pusher-client
  # so let's assume we can mess with it - yay for global state
  PusherClient.logger.level = 5
end

module Travis
  module Client
    class Listener
      class Socket < PusherClient::Socket
        attr_accessor :session, :signatures
        def initialize(application_key, options = {})
          @session    = options.fetch(:session)
          @signatures = {}
          super

          bind('pusher:error') do |data|
            handle_error(data)
          end
        end

        def subscribe_all
          # bulk auth on connect
          fetch_auth(*channels.channels.keys)
          super
        end

        def fetch_auth(*channels)
          channels.select! { |c| signatures[c].nil? if c.start_with? 'private-' }
          signatures.merge! session.post_raw('/pusher/auth', :channels => channels, :socket_id => socket_id)['channels'] if channels.any?
        end

        def get_private_auth(channel)
          fetch_auth(channel.name)
          signatures[channel.name]
        end

        def handle_error(data)
          code, message = data["code"], data["message"] if data.is_a? Hash
          message ||= data.inspect

          case code
          when 4100             then reconnect(1)
          when 4200, 4201, 4202 then reconnect
          else raise Travis::Client::Error, "Pusher error: %s (code: %p)" % [message, code]
          end
        end

        def reconnect(delay = nil)
          disconnect if connected
          sleep delay if delay and delay > 0
          connect
        end
      end

      EVENTS = %w[
        build:created build:started build:finished
        job:created job:started job:log job:finished
      ]

      Event = Struct.new(:type, :repository, :build, :job, :payload)

      class EntityListener
        attr_reader :listener, :entities

        extend Forwardable
        def_delegators :listener, :disconnect, :on_connect, :subscribe

        def initialize(listener, entities)
          @listener, @entities = listener, Array(entities)
        end

        def on(*events)
          listener.on(*events) { |e| yield(e) if dispatch?(e) }
        end

        private

          def dispatch?(event)
            entities.include? event.repository or
            entities.include? event.build      or
            entities.include? event.job
          end
      end

      attr_reader :session, :socket

      def initialize(session)
        @session   = session
        @socket    = Socket.new(pusher_key, pusher_options)
        @channels  = []
        @callbacks = []
      end

      def subscribe(*entities)
        entities = entities.map do |entity|
          entity = entity.pusher_entity while entity.respond_to? :pusher_entity
          @channels.concat(entity.pusher_channels)
          entity
        end

        yield entities.any? ? EntityListener.new(self, entities) : self if block_given?
      end

      def on(*events, &block)
        events = events.flat_map { |e| e.respond_to?(:to_str) ? e.to_str : EVENTS.grep(e) }.uniq
        events.each { |e| @callbacks << [e, block] }
      end

      def on_connect
        socket.bind('pusher:connection_established') { yield }
      end

      def listen
        @channels = default_channels if @channels.empty?
        @channels.map! { |c| c.start_with?('private-') ? c : "private-#{c}" } if session.private_channels?
        @channels.uniq.each { |c| socket.subscribe(c) }
        @callbacks.each { |e,b| socket.bind(e) { |d| dispatch(e, d, &b) } }
        socket.connect
      end

      def disconnect
        socket.disconnect
      end

      private

        def dispatch(type, json)
          payload  = JSON.parse(json)
          entities = session.load format_payload(type, payload)
          yield Event.new(type, entities['repository'], entities['build'], entities['job'], payload)
        end

        def format_payload(type, payload)
          case type
          when "job:log" then format_log(payload)
          when /job:/    then format_job(payload)
          else payload
          end
        end

        def format_job(payload)
          build           = { "id" => payload["build_id"],      "repository_id" => payload["repository_id"]   }
          repo            = { "id" => payload["repository_id"], "slug"          => payload["repository_slug"] }
          build["number"] = payload["number"][/^[^\.]+/] if payload["number"]
          { "job" => payload, "build" => build, "repository" => repo }
        end

        def format_log(payload)
          job = session.job(payload['id'])
          { "job" => { "id" => job.id }, "build" => { "id" => job.build.id }, "repository" => { "id" => job.repository.id } }
        end

        def default_channels
          return ['common'] if session.access_token.nil?
          session.user.channels
        end

        def pusher_options
          pusher_options       = session.config['pusher'] || {}
          encrypted            = pusher_options['scheme'] != 'http'
          options              = { :encrypted => encrypted, :session => session }
          options[:ws_host]    = pusher_options['host'] if pusher_options['host']
          options[:wss_port]   = pusher_options['port'] if encrypted  and pusher_options['port']
          options[:ws_port]    = pusher_options['port'] if !encrypted and pusher_options['port']
          options[:ws_path]    = pusher_options['path'] if pusher_options['path']
          options[:ws_path]    = '/' << options[:ws_path] unless options[:ws_path].nil? or options[:ws_path].start_with? '/'
          options[:ssl_verify] = session.ssl.fetch(:verify, true)
          options
        end

        def pusher_key
          session.config.fetch('pusher').fetch('key')
        rescue IndexError
          raise Travis::Client::Error, "#{session.api_endpoint} is missing pusher key"
        end
    end
  end
end