lib/restforce/concerns/streaming.rb
# frozen_string_literal: true module Restforce module Concerns module Streaming # Public: Subscribe to a PushTopic # # topics - The name of the PushTopic channel(s) to subscribe to. # block - A block to run when a new message is received. # # Returns a Faye::Subscription def legacy_subscribe(topics, options = {}, &block) topics = Array(topics).map { |channel| "/topic/#{channel}" } subscription(topics, options, &block) end alias subscribe legacy_subscribe # Public: Subscribe to one or more Streaming API channels # # channels - The name of the Streaming API (cometD) channel(s) to subscribe to. # block - A block to run when a new message is received. # # Returns a Faye::Subscription def subscription(channels, options = {}, &block) one_or_more_channels = Array(channels) one_or_more_channels.each do |channel| replay_handlers[channel] = options[:replay] end faye.subscribe(one_or_more_channels, &block) end # Public: Faye client to use for subscribing to PushTopics def faye unless options[:instance_url] raise 'Instance URL missing. Call .authenticate! first.' end url = "#{options[:instance_url]}/cometd/#{options[:api_version]}" @faye ||= Faye::Client.new(url).tap do |client| client.set_header 'Authorization', "OAuth #{options[:oauth_token]}" client.bind 'transport:down' do Restforce.log "[COMETD DOWN]" client.set_header 'Authorization', "OAuth #{authenticate!.access_token}" end client.bind 'transport:up' do Restforce.log "[COMETD UP]" end client.add_extension ReplayExtension.new(replay_handlers) end end def replay_handlers @_replay_handlers ||= {} end class ReplayExtension def initialize(replay_handlers) @replay_handlers = replay_handlers end def incoming(message, callback) callback.call(message).tap do channel = message.fetch('channel') replay_id = message.fetch('data', {}).fetch('event', {})['replayId'] handler = @replay_handlers[channel] if !replay_id.nil? && !handler.nil? && handler.respond_to?(:[]=) # remember the last replay_id for this channel handler[channel] = replay_id end end end def outgoing(message, callback) # Leave non-subscribe messages alone return callback.call(message) unless message['channel'] == '/meta/subscribe' channel = message['subscription'] # Set the replay value for the channel message['ext'] ||= {} message['ext']['replay'] = { channel => replay_id(channel) } # Carry on and send the message to the server callback.call message end private def replay_id(channel) handler = @replay_handlers[channel] if handler.respond_to?(:[]) && !handler.is_a?(Integer) # Ask for the latest replayId for this channel handler[channel] else # Just pass it along handler end end end end end end