module Karafka::Instrumentation::Listener
def on_backends_inline_process(event)
- event (Dry::Events::Event) -- event details including payload
# Logs how many messages were processed inline for a topic and how long it took.
# @param event [Dry::Events::Event] event details including payload
def on_backends_inline_process(event)
  # params_batch is reached through #send -- presumably it is not public on the caller; confirm
  count = event[:caller].send(:params_batch).to_a.size
  topic = event[:caller].topic.name
  time = event[:time]
  info "Inline processing of topic #{topic} with #{count} messages took #{time} ms"
end
def on_connection_client_fetch_loop_error(event)
- event (Dry::Events::Event) -- event details including payload
Other tags:
- Note: - Karafka will attempt to reconnect, so this is reported as an error rather than a fatal
# Reports an error that occurred inside the client fetch loop.
# @param event [Dry::Events::Event] event details including payload
# @note Karafka will attempt to reconnect, so this is reported via `error`, not `fatal`
def on_connection_client_fetch_loop_error(event)
  failure = event[:error]
  error("Client fetch loop error: #{failure}")
end
def on_connection_delegator_call(event)
- event (Dry::Events::Event) -- event details including payload
# Logs how many Kafka messages from a topic were delegated to which consumer class.
# @param event [Dry::Events::Event] event details including payload
def on_connection_delegator_call(event)
  consumer = event[:consumer]
  topic = consumer.topic.name
  kafka_messages = event[:kafka_messages]
  info "#{kafka_messages.count} messages on #{topic} topic delegated to #{consumer.class}"
end
def on_connection_listener_fetch_loop_error(event)
- Note: - This is reported as an error rather than a fatal because we can recover from it
Parameters:
- event (Dry::Events::Event) -- event details including payload
# Reports an error that occurred inside the listener fetch loop.
# @param event [Dry::Events::Event] event details including payload
# @note Reported via `error` rather than `fatal` because it can be recovered from
def on_connection_listener_fetch_loop_error(event)
  failure = event[:error]
  error("Listener fetch loop error: #{failure}")
end
def on_consumers_responders_respond_with(event)
- event (Dry::Events::Event) -- event details including payload
# Logs which responder a caller class used to respond and the data it responded with.
# @param event [Dry::Events::Event] event details including payload
def on_consumers_responders_respond_with(event)
  # The topic (and its responder) is looked up on the caller's class, not on the instance
  calling = event[:caller].class
  responder = calling.topic.responder
  data = event[:data]
  info "Responded from #{calling} using #{responder} with following data #{data}"
end
def on_fetcher_call_error(event)
- event (Dry::Events::Event) -- event details including payload
Other tags:
- Note: - If this happens, Karafka will shut down, as it indicates a critical error
# Reports a fetcher crash together with the error that caused it.
# @param event [Dry::Events::Event] event details including payload
# @note If this happens, Karafka will shut down, as it indicates a critical error
def on_fetcher_call_error(event)
  failure = event[:error]
  fatal("Fetcher crash due to an error: #{failure}")
end
def on_params_params_parse(event)
- event (Dry::Events::Event) -- event details including payload
# Logs a successful params parse for a topic together with the time it took.
# @param event [Dry::Events::Event] event details including payload
def on_params_params_parse(event)
  # Keep in mind, that a caller here is a param object not a controller,
  # so it returns a topic as a string, not a routing topic
  debug "Params parsing for #{event[:caller].topic} topic successful in #{event[:time]} ms"
end
def on_params_params_parse_error(event)
- event (Dry::Events::Event) -- event details including payload
# Reports a params parsing failure for a topic together with the error raised.
# @param event [Dry::Events::Event] event details including payload
def on_params_params_parse_error(event)
  topic = event[:caller].topic
  failure = event[:error]
  error("Params parsing error for #{topic} topic: #{failure}")
end
def on_process_notice_signal(event)
- event (Dry::Events::Event) -- event details including payload
# Logs which system signal the process received.
# @param event [Dry::Events::Event] event details including payload
def on_process_notice_signal(event)
  signal = event[:signal]
  info("Received #{signal} system signal")
end
def on_server_stop(_event)
- _event (Dry::Events::Event) -- event details including payload
# Logs that the Karafka server process is stopping, including its pid.
# @param _event [Dry::Events::Event] event details including payload (unused)
# @return [Thread] the thread in which the message is logged
def on_server_stop(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { info "Stopping Karafka server #{::Process.pid}" }
end
def on_server_stop_error(_event)
- _event (Dry::Events::Event) -- event details including payload
# Reports that the Karafka server process is being stopped forcefully, including its pid.
# @param _event [Dry::Events::Event] event details including payload (unused)
# @return [Thread] the thread in which the message is logged
def on_server_stop_error(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { error "Forceful Karafka server #{::Process.pid} stop" }
end