class Restforce::Concerns::Streaming::ReplayExtension
# Handles a message received from the server: forwards it to +callback+ and
# then records the event's replay id for the message's channel.
#
# The replay id is read from message['data']['event']['replayId']. It is only
# stored when it is present and the handler registered for the channel
# supports element assignment (+[]=+), e.g. a Hash; scalar handlers and
# channels without a handler are left untouched.
#
# @param message [Hash] incoming message; must contain a 'channel' key
# @param callback [#call] continuation invoked with the message
# @return [Object] whatever +callback+ returns
# @raise [KeyError] if the message has no 'channel' key
def incoming(message, callback)
  callback.call(message).tap do
    channel = message.fetch('channel')
    # dig tolerates a missing or nil 'data' / 'event' level.
    replay_id = message.dig('data', 'event', 'replayId')
    handler = @replay_handlers[channel]

    # Remember the last replay_id for this channel. A nil handler does not
    # respond to []=, so no separate nil check is needed.
    handler[channel] = replay_id if !replay_id.nil? && handler.respond_to?(:[]=)
  end
end
# Stores the per-channel replay handlers used by the other methods.
#
# @param replay_handlers [#[]] lookup keyed by channel name; each value is
#   either a scalar replay id or an object indexable by channel name
def initialize(replay_handlers)
  @replay_handlers = replay_handlers
end
# Handles a message about to be sent to the server. Subscribe requests get
# the replay value for their channel attached under message['ext']['replay'];
# every other message is forwarded unchanged.
#
# @param message [Hash] outgoing message; mutated in place for subscribes
# @param callback [#call] continuation invoked with the message
# @return [Object] whatever +callback+ returns
def outgoing(message, callback)
  # Leave non-subscribe messages alone
  return callback.call(message) unless message['channel'] == '/meta/subscribe'

  channel = message['subscription']

  # Set the replay value for the channel, keeping any other 'ext' entries
  message['ext'] ||= {}
  message['ext']['replay'] = { channel => replay_id(channel) }

  # Carry on and send the message to the server
  callback.call(message)
end
# Resolves the replay id to send for +channel+ from the registered handlers.
#
# @param channel [String] channel name used as the lookup key
# @return [Object, nil] the handler itself when it is an Integer or is not
#   indexable; otherwise the result of handler[channel]
def replay_id(channel)
  handler = @replay_handlers[channel]

  # Integer must be tested first: it responds to [] (bit reference), so it
  # would otherwise be indexed instead of being treated as a scalar.
  return handler if handler.is_a?(Integer)

  if handler.respond_to?(:[])
    # Ask for the latest replayId for this channel
    handler[channel]
  else
    # Just pass it along
    handler
  end
end