class Restforce::Concerns::Streaming::ReplayExtension

def incoming(message, callback)

# Passes an incoming message to the next handler, then records the message's
# replay id (if any) in the handler registered for the message's channel.
#
# @param message [Hash] incoming message; must contain a 'channel' key, and
#   may carry the replay id at message['data']['event']['replayId']
# @param callback [#call] continuation invoked with the message before any
#   bookkeeping is done
# @return [Object] whatever +callback+ returned
# @raise [KeyError] if the message has no 'channel' key (raised after
#   +callback+ has already run)
def incoming(message, callback)
  result = callback.call(message)

  topic = message.fetch('channel')
  event = message.fetch('data', {}).fetch('event', {})
  latest_id = event['replayId']
  store = @replay_handlers[topic]

  # Nothing to record without both a replay id and a registered handler.
  return result if latest_id.nil? || store.nil?

  # Only handlers that support keyed assignment can remember the last id;
  # scalar handlers (e.g. an Integer) are left untouched.
  store[topic] = latest_id if store.respond_to?(:[]=)
  result
end

def initialize(replay_handlers)

# Stores the per-channel replay handlers used by the other methods.
#
# @param replay_handlers [#[]] lookup keyed by channel name; each value may
#   be an Integer replay id, an object indexable by channel, or any other
#   value that is passed through unchanged
def initialize(replay_handlers)
  @replay_handlers = replay_handlers
end

def outgoing(message, callback)

# Adds the replay setting to outgoing subscribe messages and forwards every
# message to the next handler.
#
# Only messages whose 'channel' is '/meta/subscribe' are modified: their
# 'ext' hash (created when absent) gets a 'replay' entry mapping the
# subscribed channel to its replay id.
#
# @param message [Hash] outgoing message; mutated in place for subscribes
# @param callback [#call] continuation that sends the message onward
# @return [Object] whatever +callback+ returned
def outgoing(message, callback)
  if message['channel'] == '/meta/subscribe'
    subscription = message['subscription']
    # Reuse an existing 'ext' hash so other extension data is preserved.
    extension = (message['ext'] ||= {})
    extension['replay'] = { subscription => replay_id(subscription) }
  end

  # Carry on and hand the (possibly amended) message to the next handler.
  callback.call(message)
end

def replay_id(channel)

# Resolves the replay id to use for a channel from its registered handler.
#
# @param channel [String] channel name used to look up the handler
# @return [Object, nil] the handler itself when it is an Integer or does not
#   respond to +[]+ (including nil when none is registered); otherwise the
#   result of indexing the handler with +channel+
def replay_id(channel)
  source = @replay_handlers[channel]

  # Integer is checked explicitly because it also responds to #[] (bit
  # reference) and must be treated as a scalar, not indexed.
  return source if source.is_a?(Integer) || !source.respond_to?(:[])

  # Ask the handler for the latest replay id for this channel.
  source[channel]
end