class Travis::Client::Listener
# Channels to subscribe to when none were requested explicitly.
#
# @return ['common'] when the session has no access token, otherwise
#   whatever `session.user.channels` returns.
def default_channels
  session.access_token.nil? ? ['common'] : session.user.channels
end
# Closes the connection by delegating to the underlying socket.
#
# @return whatever `socket.disconnect` returns.
def disconnect
  socket.disconnect
end
# Turns a raw event into an Event and hands it to the given block.
#
# The JSON text is parsed, normalised via format_payload, loaded through
# `session.load`, and the resulting 'repository', 'build' and 'job' entries
# are wrapped together with the parsed payload.
#
# @param type [String] event name, e.g. 'job:log'
# @param json [String] JSON-encoded event data
# @yield [Event] the assembled event
def dispatch(type, json)
  payload = JSON.parse(json)
  formatted = format_payload(type, payload)
  entities = session.load(formatted)

  repository = entities['repository']
  build = entities['build']
  job = entities['job']

  yield Event.new(type, repository, build, job, payload)
end
# Expands a flat job payload into the nested job/build/repository shape.
#
# The build number is derived from the job number by taking everything
# before the first dot (e.g. '12.3' -> '12'); the 'number' key is only added
# to the build hash when the payload carries a number.
#
# @param payload [Hash] job event data with string keys
# @return [Hash] 'job' (the payload itself, not a copy), 'build' and
#   'repository' hashes
def format_job(payload)
  repository_id = payload['repository_id']
  build = { 'id' => payload['build_id'], 'repository_id' => repository_id }
  repo = { 'id' => repository_id, 'slug' => payload['repository_slug'] }

  number = payload['number']
  # \A anchors at the start of the string; ^ would also match at the start of
  # any later line and could pick up a "number" from the middle of the value.
  build['number'] = number[/\A[^.]+/] if number

  { 'job' => payload, 'build' => build, 'repository' => repo }
end
# Builds the id-only job/build/repository shape for a log event.
#
# The job is looked up through `session.job` using the payload's 'id'; the
# build and repository ids are read from that job.
#
# @param payload [Hash] log event data with an 'id' key
# @return [Hash] 'job', 'build' and 'repository', each of the form { 'id' => ... }
def format_log(payload)
  job = session.job(payload['id'])

  job_ref = { 'id' => job.id }
  build_ref = { 'id' => job.build.id }
  repository_ref = { 'id' => job.repository.id }

  { 'job' => job_ref, 'build' => build_ref, 'repository' => repository_ref }
end
# Normalises an event payload according to the event type.
#
# 'job:log' goes through format_log, any other type matching /job:/ goes
# through format_job, and everything else is returned untouched.
#
# @param type [String] event name
# @param payload [Hash] parsed event data
# @return [Hash]
def format_payload(type, payload)
  case type
  when 'job:log'
    format_log(payload)
  when /job:/
    format_job(payload)
  else
    payload
  end
end
# Sets up a listener for the given session with no channels and no
# callbacks registered yet.
#
# @param session the client session; pusher_key and pusher_options call
#   `session`, which presumably reads @session -- hence it is assigned
#   before the socket is built (TODO confirm against the reader definition).
def initialize(session)
  @session = session
  @channels = []
  @callbacks = []
  @socket = Socket.new(pusher_key, pusher_options)
end
# Subscribes to the collected channels, binds the registered callbacks and
# connects the socket.
#
# Falls back to default_channels when no channel was collected. When the
# session reports private channels, every name lacking the 'private-' prefix
# gets it (the channel list is rewritten in place). Duplicates are skipped
# when subscribing.
#
# @return whatever `socket.connect` returns.
def listen
  @channels = default_channels if @channels.empty?

  if session.private_channels?
    @channels.map! { |name| name.start_with?('private-') ? name : "private-#{name}" }
  end

  @channels.uniq.each { |name| socket.subscribe(name) }

  @callbacks.each do |event, handler|
    socket.bind(event) { |data| dispatch(event, data, &handler) }
  end

  socket.connect
end
# Registers a block as callback for one or more events.
#
# Arguments that respond to `to_str` are taken as literal event names; any
# other argument is used as a pattern and expanded to the entries of EVENTS
# it matches via `grep`. Duplicate names are registered once.
#
# @param events [Array] event names and/or patterns
# @return [Array<String>] the de-duplicated event names that were registered
def on(*events, &block)
  names = events.flat_map do |matcher|
    matcher.respond_to?(:to_str) ? matcher.to_str : EVENTS.grep(matcher)
  end

  names.uniq.each { |name| @callbacks << [name, block] }
end
# Binds the given block to the socket's 'pusher:connection_established'
# event.
#
# @return whatever `socket.bind` returns.
def on_connect(&block)
  connected_event = 'pusher:connection_established'
  socket.bind(connected_event, &block)
end
# Reads the pusher key from the session configuration.
#
# @return the value stored under config['pusher']['key']
# @raise [Travis::Client::Error] when either lookup raises IndexError
#   (which includes KeyError for a missing hash key)
def pusher_key
  pusher_config = session.config.fetch('pusher')
  pusher_config.fetch('key')
rescue IndexError
  raise Travis::Client::Error, "#{session.api_endpoint} is missing pusher key"
end
# Builds the options hash for the socket from the session configuration.
#
# Reads `session.config['pusher']` (treated as empty when absent):
# * 'scheme' -- anything other than 'http' enables :encrypted
# * 'host'   -- copied to :ws_host
# * 'port'   -- copied to :wss_port when encrypted, :ws_port otherwise
# * 'path'   -- copied to :ws_path, with a leading '/' added if missing
# :ssl_verify comes from `session.ssl[:verify]` and defaults to true.
#
# @return [Hash] options with symbol keys
def pusher_options
  config = session.config['pusher'] || {}
  encrypted = config['scheme'] != 'http'

  options = { encrypted: encrypted, session: session }
  options[:ws_host] = config['host'] if config['host']
  options[encrypted ? :wss_port : :ws_port] = config['port'] if config['port']

  path = config['path']
  if path
    # Interpolate rather than append to a '/' literal: `'/' << path` mutates
    # the literal, which raises FrozenError under frozen_string_literal.
    options[:ws_path] = path.start_with?('/') ? path : "/#{path}"
  end

  options[:ssl_verify] = session.ssl.fetch(:verify, true)
  options
end
# Adds the pusher channels of the given entities to the channel list.
#
# Each entity is first resolved by following `pusher_entity` for as long as
# the current object responds to it; the resolved object's `pusher_channels`
# are then appended to the collected channels.
#
# @param entities [Array] objects exposing `pusher_channels` (directly or via
#   `pusher_entity`)
# @yield an EntityListener scoped to the resolved entities, or this listener
#   itself when no entities were given
# @return the block's result, or nil when no block is given
def subscribe(*entities)
  resolved = entities.map do |candidate|
    candidate = candidate.pusher_entity while candidate.respond_to?(:pusher_entity)
    @channels.concat(candidate.pusher_channels)
    candidate
  end
  return unless block_given?

  target = resolved.any? ? EntityListener.new(self, resolved) : self
  yield target
end