class WaterDrop::Instrumentation::LoggerListener

as well as we can use it standalone
@note It is a module as we can use it then as a part of the Karafka framework listener
It can be removed/replaced or anything without any harm to the Waterdrop flow
Default listener that hooks up to our instrumentation and uses its events for logging

def debug(event, log_message)

Parameters:
  • log_message (String) -- message we want to publish
  • event (Dry::Events::Event) -- event that happened with the details
def debug(event, log_message)
  @logger.debug("[#{event[:producer_id]}] #{log_message}")
end

def error(event, log_message)

Parameters:
  • log_message (String) -- message we want to publish
  • event (Dry::Events::Event) -- event that happened with the details
def error(event, log_message)
  @logger.error("[#{event[:producer_id]}] #{log_message}")
end

def info(event, log_message)

Parameters:
  • log_message (String) -- message we want to publish
  • event (Dry::Events::Event) -- event that happened with the details
def info(event, log_message)
  @logger.info("[#{event[:producer_id]}] #{log_message} took #{event[:time]} ms")
end

def initialize(logger, log_messages: true)

Parameters:
  • log_messages (Boolean) -- Should we report the messages content (payload and metadata)
  • logger (Object) -- logger we want to use
def initialize(logger, log_messages: true)
  @logger = logger
  @log_messages = log_messages
end

def log_messages?

Returns:
  • (Boolean) - should we report the messages details in the debug mode.
def log_messages?
  @log_messages
end

def on_buffer_flushed_async(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_buffer_flushed_async(event)
  messages = event[:messages]
  info(event, "Async flushing of #{messages.size} messages from the buffer")
  return unless log_messages?
  debug(event, messages)
end

def on_buffer_flushed_sync(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_buffer_flushed_sync(event)
  messages = event[:messages]
  info(event, "Sync flushing of #{messages.size} messages from the buffer")
  return unless log_messages?
  debug(event, messages)
end

def on_buffer_purged(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_buffer_purged(event)
  info(event, 'Successfully purging buffer')
end

def on_error_occurred(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the error details
def on_error_occurred(event)
  error = event[:error]
  type = event[:type]
  error(event, "Error occurred: #{error} - #{type}")
end

def on_message_buffered(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_message_buffered(event)
  message = event[:message]
  info(event, "Buffering of a message to '#{message[:topic]}' topic")
  return unless log_messages?
  debug(event, [message])
end

def on_message_produced_async(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_message_produced_async(event)
  message = event[:message]
  info(event, "Async producing of a message to '#{message[:topic]}' topic")
  return unless log_messages?
  debug(event, message)
end

def on_message_produced_sync(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_message_produced_sync(event)
  message = event[:message]
  info(event, "Sync producing of a message to '#{message[:topic]}' topic")
  return unless log_messages?
  debug(event, message)
end

def on_messages_buffered(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_messages_buffered(event)
  messages = event[:messages]
  info(event, "Buffering of #{messages.size} messages")
  return unless log_messages?
  debug(event, [messages, messages.size])
end

def on_messages_produced_async(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_messages_produced_async(event)
  messages = event[:messages]
  topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count
  info(event, "Async producing of #{messages.size} messages to #{topics_count} topics")
  return unless log_messages?
  debug(event, messages)
end

def on_messages_produced_sync(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_messages_produced_sync(event)
  messages = event[:messages]
  topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count
  info(event, "Sync producing of #{messages.size} messages to #{topics_count} topics")
  return unless log_messages?
  debug(event, messages)
end

def on_producer_closed(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_producer_closed(event)
  info(event, 'Closing producer')
end

def on_transaction_aborted(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_transaction_aborted(event)
  info(event, 'Aborting transaction')
end

def on_transaction_committed(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_transaction_committed(event)
  info(event, 'Committing transaction')
end

def on_transaction_finished(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_transaction_finished(event)
  info(event, 'Processing transaction')
end

def on_transaction_started(event)

Parameters:
  • event (Dry::Events::Event) -- event that happened with the details
def on_transaction_started(event)
  info(event, 'Starting transaction')
end