lib/travis/client/listener.rb



require 'travis/client'
require 'forwardable'
require 'json'

if require 'pusher-client'
  # it's us that has been loading pusher-client
  # so let's assume we can mess with it - yay for global state
  PusherClient.logger.level = 2
end

module Travis
  module Client
    class Listener
      class Socket < PusherClient::Socket
        attr_accessor :session, :signatures
        def initialize(application_key, options = {})
          @session    = options.fetch(:session)
          @signatures = {}
          super
        end

        def subscribe_all
          # bulk auth on connect
          fetch_auth(*channels.channels.keys)
          super
        end

        def fetch_auth(*channels)
          channels.select! { |c| signatures[c].nil? if c.start_with? 'private-' }
          signatures.merge! session.post_raw('/pusher/auth', :channels => channels, :socket_id => socket_id)['channels'] if channels.any?
        end

        def get_private_auth(channel)
          fetch_auth(channel.name)
          signatures[channel.name]
        end
      end

      EVENTS = %w[
        build:created build:started build:finished
        job:created job:started job:log job:finished
      ]

      Event = Struct.new(:type, :repository, :build, :job, :payload)

      class EntityListener
        attr_reader :listener, :entities

        extend Forwardable
        def_delegators :listener, :disconnect, :on_connect, :subscribe

        def initialize(listener, entities)
          @listener, @entities = listener, Array(entities)
        end

        def on(*events)
          listener.on(*events) { |e| yield(e) if dispatch?(e) }
        end

        private

          def dispatch?(event)
            entities.include? event.repository or
            entities.include? event.build      or
            entities.include? event.job
          end
      end

      attr_reader :session, :socket

      def initialize(session)
        @session   = session
        @socket    = Socket.new(pusher_key, :encrypted => true, :session => session)
        @channels  = []
        @callbacks = []
      end

      def subscribe(*entities)
        entities = entities.map do |entity|
          entity = entity.pusher_entity while entity.respond_to? :pusher_entity
          @channels.concat(entity.pusher_channels)
          entity
        end

        yield entities.any? ? EntityListener.new(self, entities) : self if block_given?
      end

      def on(*events, &block)
        events = events.flat_map { |e| e.respond_to?(:to_str) ? e.to_str : EVENTS.grep(e) }.uniq
        events.each { |e| @callbacks << [e, block] }
      end

      def on_connect
        socket.bind('pusher:connection_established') { yield }
      end

      def listen
        @channels = default_channels if @channels.empty?
        @channels.map! { |c| c.start_with?('private-') ? c : "private-#{c}" } if session.private_channels?
        @channels.uniq.each { |c| socket.subscribe(c) }
        @callbacks.each { |e,b| socket.bind(e) { |d| dispatch(e, d, &b) } }
        socket.connect
      end

      def disconnect
        socket.disconnect
      end

      private

        def dispatch(type, json)
          payload  = JSON.parse(json)
          entities = session.load format_payload(type, payload)
          yield Event.new(type, entities['repository'], entities['build'], entities['job'], payload)
        end

        def format_payload(type, payload)
          case type
          when "job:log" then format_log(payload)
          when /job:/    then format_job(payload)
          else payload
          end
        end

        def format_job(payload)
          build           = { "id" => payload["build_id"],      "repository_id" => payload["repository_id"]   }
          repo            = { "id" => payload["repository_id"], "slug"          => payload["repository_slug"] }
          build["number"] = payload["number"][/^[^\.]+/] if payload["number"]
          { "job" => payload, "build" => build, "repository" => repo }
        end

        def format_log(payload)
          job = session.job(payload['id'])
          { "job" => { "id" => job.id }, "build" => { "id" => job.build.id }, "repository" => { "id" => job.repository.id } }
        end

        def default_channels
          return ['common'] if session.access_token.nil?
          session.user.channels
        end

        def pusher_key
          session.config.fetch('pusher').fetch('key')
        rescue IndexError
          raise Travis::Client::Error, "#{session.api_endpoint} is missing pusher key"
        end
    end
  end
end