class WaterDrop::Clients::Buffered
Client used to buffer messages that we send out in specs and other places.
def initialize(*args)
-
args(Object) -- anything accepted by `Clients::Dummy`
def initialize(*args) super @messages = [] @topics = Hash.new { |k, v| k[v] = [] } @transaction_mutex = Mutex.new @transaction_active = false @transaction_messages = [] @transaction_topics = Hash.new { |k, v| k[v] = [] } @transaction_level = 0 end
def messages_for(topic)
-
topic(String) --
def messages_for(topic) @topics[topic] end
def produce(message)
-
message(Hash) -- `WaterDrop::Producer#produce_sync` message hash
def produce(message) if @transaction_active @transaction_topics[message.fetch(:topic)] << message @transaction_messages << message else # We pre-validate the message payload, so topic is ensured to be present @topics[message.fetch(:topic)] << message @messages << message end SyncResponse.new end
def reset
Clears internal buffer
def reset @messages.clear @topics.each_value(&:clear) end
def transaction
Supports our aborting transaction flow
Yields the code pretending it is in a transaction
def transaction @transaction_level += 1 return yield if @transaction_mutex.owned? @transaction_mutex.lock @transaction_active = true result = nil commit = false catch(:abort) do result = yield commit = true end commit || raise(WaterDrop::Errors::AbortTransaction) # Transfer transactional data on success @transaction_topics.each do |topic, messages| @topics[topic] += messages end @messages += @transaction_messages result rescue StandardError => e return if e.is_a?(WaterDrop::Errors::AbortTransaction) raise ensure @transaction_level -= 1 if @transaction_level.zero? && @transaction_mutex.owned? @transaction_topics.clear @transaction_messages.clear @transaction_active = false @transaction_mutex.unlock end end