class WaterDrop::Clients::Buffered

Client used to buffer messages that we send out in specs and other places.

def initialize(*args)

Parameters:
  • args (Object) -- anything accepted by `Clients::Dummy`
# Sets up empty buffers for committed and transactional messages.
#
# @param args [Object] anything accepted by `Clients::Dummy`
def initialize(*args)
  super

  # Committed messages: a flat list plus per-topic arrays created on first access
  @messages = []
  @topics = Hash.new { |index, topic| index[topic] = [] }

  # Staging buffers and bookkeeping used while a transaction is running
  @transaction_messages = []
  @transaction_topics = Hash.new { |index, topic| index[topic] = [] }
  @transaction_active = false
  @transaction_level = 0
  @transaction_mutex = Mutex.new
end

def messages_for(topic)

Parameters:
  • topic (String) -- name of the topic whose buffered messages should be returned
# Returns the messages buffered for a single topic.
#
# @param topic [String] topic name used as the key into the per-topic buffer
# @return [Array] the array stored in `@topics` under the given topic
def messages_for(topic)
  # Plain hash lookup; whatever default `@topics` was built with applies to unseen topics
  @topics[topic]
end

def produce(message)

Parameters:
  • message (Hash) -- `WaterDrop::Producer#produce_sync` message hash
# Buffers a message instead of sending it anywhere.
#
# While a transaction is active the message goes to the transactional staging buffers,
# otherwise it goes straight to the main buffers.
#
# @param message [Hash] `WaterDrop::Producer#produce_sync` message hash
# @return [SyncResponse] a new response object
def produce(message)
  # We pre-validate the message payload, so topic is ensured to be present
  topic = message.fetch(:topic)

  per_topic, all_messages =
    if @transaction_active
      [@transaction_topics, @transaction_messages]
    else
      [@topics, @messages]
    end

  per_topic[topic] << message
  all_messages << message

  SyncResponse.new
end

def reset

Used in between specs so messages do not leak out
Clears internal buffer
# Empties the main message buffers; used in between specs so messages do not leak out.
def reset
  @messages.clear
  # Per-topic arrays are emptied in place, so the topic keys themselves stay in the hash
  @topics.each_value { |topic_messages| topic_messages.clear }
end

def transaction

Moves messages to the appropriate buffers only if the transaction is successful
Supports our aborting transaction flow
Yields the code pretending it is in a transaction
# Runs the given block pretending it is inside of a transaction.
#
# Messages staged in the transactional buffers are moved to the main buffers only when the
# outermost block finishes without aborting. The outermost transaction is aborted either by
# `throw(:abort)` or by raising `WaterDrop::Errors::AbortTransaction`; the staged messages
# are then dropped and `nil` is returned. Any other error is re-raised once the staged
# messages have been dropped.
#
# A call made while the current thread already owns the transaction mutex is nested: its
# block runs as part of the outer transaction. A `throw(:abort)` from a nested block unwinds
# to the outermost transaction, while `WaterDrop::Errors::AbortTransaction` raised in a
# nested block only makes that nested call return `nil`.
#
# @yield code that should run as if it was in a transaction
# @return [Object, nil] result of the block or nil when aborted
# @raise [StandardError] any non-abort error raised by the block
def transaction
  nested = @transaction_mutex.owned?

  # The nesting counter is only touched by the thread that owns the mutex. Bumping it before
  # acquiring the lock would let a thread that is merely waiting inflate the counter, and
  # the owner would then never see zero and never release the lock.
  @transaction_mutex.lock unless nested
  @transaction_level += 1

  return yield if nested

  @transaction_active = true

  result = nil
  commit = false

  # `throw(:abort)` lands here and leaves `commit` false
  catch(:abort) do
    result = yield
    commit = true
  end

  raise WaterDrop::Errors::AbortTransaction unless commit

  # Transfer transactional data on success. Arrays are extended in place (like `produce`
  # does with `<<`) so references obtained earlier keep reflecting the buffer content.
  @transaction_topics.each do |topic, messages|
    @topics[topic].concat(messages)
  end

  @messages.concat(@transaction_messages)

  result
rescue WaterDrop::Errors::AbortTransaction
  # Aborting is an expected flow, not a failure
  nil
ensure
  # When acquiring the lock itself failed there is nothing to unwind
  if @transaction_mutex.owned?
    @transaction_level -= 1

    if @transaction_level.zero?
      @transaction_topics.clear
      @transaction_messages.clear
      @transaction_active = false
      @transaction_mutex.unlock
    end
  end
end