class Travis::Client::Listener
# Channels used when nothing has been subscribed to explicitly.
#
# A session without an access token only gets the 'common' channel;
# otherwise the channels of the session's user are returned.
def default_channels
  anonymous = session.access_token.nil?
  anonymous ? ['common'] : session.user.channels
end
# Delegates to the socket's #disconnect and returns whatever it returns.
def disconnect
  socket.disconnect
end
# Converts a raw message into an Event and yields it to the given block.
#
# The JSON text is parsed, normalized through #format_payload and loaded via
# the session. The 'repository', 'build' and 'job' entries of the loaded
# result are handed to Event.new together with the event type and the
# parsed payload.
def dispatch(type, json)
  payload = JSON.parse(json)
  loaded  = session.load(format_payload(type, payload))
  event   = Event.new(type, loaded['repository'], loaded['build'], loaded['job'], payload)
  yield event
end
# Wraps a job payload into the "job"/"build"/"repository" structure.
#
# The build and repository stubs are assembled from the ids (and slug)
# carried by the job payload itself; the payload is passed through as "job".
def format_job(payload)
  repository_id = payload["repository_id"]
  job_number    = payload["number"]

  build = { "id" => payload["build_id"], "repository_id" => repository_id }
  # The build number is the leading part of the job number, up to the first dot.
  build["number"] = job_number[/^[^\.]+/] if job_number

  {
    "job"        => payload,
    "build"      => build,
    "repository" => { "id" => repository_id, "slug" => payload["repository_slug"] }
  }
end
# Builds the "job"/"build"/"repository" structure for a log payload.
#
# The job is looked up through the session by the payload's 'id'; only the
# ids of the job, its build and its repository end up in the result.
def format_log(payload)
  job = session.job(payload['id'])

  job_ref        = { "id" => job.id }
  build_ref      = { "id" => job.build.id }
  repository_ref = { "id" => job.repository.id }

  { "job" => job_ref, "build" => build_ref, "repository" => repository_ref }
end
# Normalizes a payload according to the event type.
#
# "job:log" events go through #format_log, any other type containing "job:"
# goes through #format_job, and everything else is returned untouched.
def format_payload(type, payload)
  case type
  when "job:log"
    format_log(payload)
  when /job:/
    format_job(payload)
  else
    payload
  end
end
# Sets up a listener for the given session.
#
# Starts with no channels and no callbacks, and creates the socket from the
# pusher key and options. The session is stored first because #pusher_key
# and #pusher_options read it.
def initialize(session)
  @session   = session
  @channels  = []
  @callbacks = []
  @socket    = Socket.new(pusher_key, pusher_options)
end
# Subscribes to the collected channels, binds the registered callbacks and
# connects the socket.
#
# Falls back to #default_channels when nothing was subscribed explicitly.
# When the session uses private channels, every channel name gets a
# 'private-' prefix unless it already has one. Returns the result of
# socket.connect.
def listen
  # Take a copy of the defaults: #default_channels may hand back an array
  # owned by the user entity, and the in-place map! below must not rewrite it.
  @channels = default_channels.dup if @channels.empty?

  if session.private_channels?
    @channels.map! { |name| name.start_with?('private-') ? name : "private-#{name}" }
  end

  @channels.uniq.each { |name| socket.subscribe(name) }

  @callbacks.each do |event, handler|
    socket.bind(event) { |data| dispatch(event, data, &handler) }
  end

  socket.connect
end
# Registers the block as callback for the given events.
#
# Anything responding to #to_str is taken as an event name; every other
# argument is used as a pattern to grep EVENTS with. Returns the resulting
# list of distinct event names.
def on(*events, &block)
  names = events.flat_map do |matcher|
    if matcher.respond_to?(:to_str)
      matcher.to_str
    else
      EVENTS.grep(matcher)
    end
  end

  names.uniq.each { |name| @callbacks << [name, block] }
end
# Binds the given block to the 'pusher:connection_established' event.
#
# The block is invoked without arguments, whatever the socket passes along.
def on_connect
  socket.bind('pusher:connection_established') do
    yield
  end
end
# Reads the pusher key from the session's config.
#
# Raises Travis::Client::Error, naming the API endpoint, when either the
# 'pusher' section or its 'key' entry is missing.
def pusher_key
  pusher = session.config.fetch('pusher')
  pusher.fetch('key')
rescue IndexError
  raise Travis::Client::Error, "#{session.api_endpoint} is missing pusher key"
end
# Assembles the options hash handed to the socket.
#
# Reads the optional 'pusher' section of the session's config:
# * 'scheme' - anything other than 'http' enables :encrypted
# * 'host'   - copied to :ws_host
# * 'port'   - copied to :wss_port when encrypted, :ws_port otherwise
# * 'path'   - copied to :ws_path, with a leading '/' added if missing
# :ssl_verify comes from the session's ssl settings and defaults to true.
def pusher_options
  # Named `config` so the local does not shadow this method's own name.
  config    = session.config['pusher'] || {}
  encrypted = config['scheme'] != 'http'
  options   = { :encrypted => encrypted, :session => session }

  host = config['host']
  port = config['port']
  path = config['path']

  options[:ws_host] = host if host
  options[encrypted ? :wss_port : :ws_port] = port if port
  # Interpolate rather than appending to a '/' literal, which would fail
  # if string literals are frozen.
  options[:ws_path] = path.start_with?('/') ? path : "/#{path}" if path

  options[:ssl_verify] = session.ssl.fetch(:verify, true)
  options
end
# Adds the pusher channels of the given entities to the channel list.
#
# Each argument is unwrapped through #pusher_entity for as long as it
# responds to it. When a block is given it is yielded an EntityListener
# scoped to the resolved entities, or this listener itself if there were
# none; without a block nil is returned.
def subscribe(*entities)
  resolved = entities.map do |candidate|
    candidate = candidate.pusher_entity while candidate.respond_to?(:pusher_entity)
    @channels.concat(candidate.pusher_channels)
    candidate
  end
  return unless block_given?

  target = resolved.any? ? EntityListener.new(self, resolved) : self
  yield target
end