require "forwardable"
require "net/http"
require "honeybadger/logging"
module Honeybadger
# A concurrent queue to notify the backend.
# @api private
class Worker
extend Forwardable
include Honeybadger::Logging::Helper
# Sub-class thread so we have a named thread (useful for debugging in Thread.list).
class Thread < ::Thread; end
# Used to signal the worker to shutdown.
SHUTDOWN = :__hb_worker_shutdown!
# The base number for the exponential backoff formula when calculating the
# throttle interval. `1.05 ** throttle` will reach an interval of 2 minutes
# after around 100 429 responses from the server.
BASE_THROTTLE = 1.05
def initialize(config)
@config = config
@throttle = 0
@throttle_interval = 0
@mutex = Mutex.new
@marker = ConditionVariable.new
@queue = Queue.new
@shutdown = false
@start_at = nil
@pid = Process.pid
end
def push(msg)
return false unless start
if queue.size >= config.max_queue_size
warn { sprintf("Unable to report error; reached max queue size of %s. id=%s", queue.size, msg.id) }
return false
end
queue.push(msg)
end
def send_now(msg)
handle_response(msg, notify_backend(msg))
end
def shutdown(force = false)
d { "shutting down worker" }
mutex.synchronize do
@shutdown = true
end
return true if force
return true unless thread&.alive?
if throttled?
warn { sprintf("Unable to report %s error(s) to Honeybadger (currently throttled)", queue.size) } unless queue.empty?
return true
end
info { sprintf("Waiting to report %s error(s) to Honeybadger", queue.size) } unless queue.empty?
queue.push(SHUTDOWN)
!!thread.join
ensure
queue.clear
kill!
end
# Blocks until queue is processed up to this point in time.
def flush
mutex.synchronize do
if thread&.alive?
queue.push(marker)
marker.wait(mutex)
end
end
end
def start
return false unless can_start?
mutex.synchronize do
@shutdown = false
@start_at = nil
return true if thread&.alive?
@pid = Process.pid
@thread = Thread.new { run }
end
true
end
private
attr_reader :config, :queue, :pid, :mutex, :marker, :thread, :throttle,
:throttle_interval, :start_at
def_delegator :config, :backend
def shutdown?
mutex.synchronize { @shutdown }
end
def suspended?
mutex.synchronize { start_at && Time.now.to_i < start_at }
end
def can_start?
return false if shutdown?
return false if suspended?
true
end
def throttled?
mutex.synchronize { throttle > 0 }
end
def kill!
d { "killing worker thread" }
if thread
Thread.kill(thread)
thread.join # Allow ensure blocks to execute.
end
true
end
def suspend(interval)
mutex.synchronize do
@start_at = Time.now.to_i + interval
queue.clear
end
# Must be performed last since this may kill the current thread.
kill!
end
def run
begin
d { "worker started" }
loop do
case msg = queue.pop
when SHUTDOWN then break
when ConditionVariable then signal_marker(msg)
else work(msg)
end
end
ensure
d { "stopping worker" }
end
rescue => e
error {
msg = "Error in worker thread (shutting down) class=%s message=%s\n\t%s"
sprintf(msg, e.class, e.message.dump, Array(e.backtrace).join("\n\t"))
}
ensure
release_marker
end
def work(msg)
send_now(msg)
if shutdown? && throttled?
warn { sprintf("Unable to report %s error(s) to Honeybadger (currently throttled)", queue.size) } if queue.size > 1
kill!
return
end
sleep(throttle_interval)
rescue => e
error {
msg = "Error in worker thread class=%s message=%s\n\t%s"
sprintf(msg, e.class, e.message.dump, Array(e.backtrace).join("\n\t"))
}
end
def notify_backend(payload)
d { sprintf("worker notifying backend id=%s", payload.id) }
backend.notify(:notices, payload)
end
def calc_throttle_interval
((BASE_THROTTLE**throttle) - 1).round(3)
end
def inc_throttle
mutex.synchronize do
@throttle += 1
@throttle_interval = calc_throttle_interval
throttle
end
end
def dec_throttle
mutex.synchronize do
return nil if throttle == 0
@throttle -= 1
@throttle_interval = calc_throttle_interval
throttle
end
end
def handle_response(msg, response)
d { sprintf("worker response id=%s code=%s message=%s", msg.id, response.code, response.message.to_s.dump) }
case response.code
when 429, 503
throttle = inc_throttle
warn { sprintf("Error report failed: project is sending too many errors. id=%s code=%s throttle=%s interval=%s", msg.id, response.code, throttle, throttle_interval) }
when 402
warn { sprintf("Error report failed: payment is required. id=%s code=%s", msg.id, response.code) }
suspend(3600)
when 403
warn { sprintf("Error report failed: API key is invalid. id=%s code=%s", msg.id, response.code) }
suspend(3600)
when 413
warn { sprintf("Error report failed: Payload is too large. id=%s code=%s", msg.id, response.code) }
when 201
host = config.get(:"connection.ui_host")
if (throttle = dec_throttle)
info { sprintf("Success ⚡ https://%s/notice/%s id=%s code=%s throttle=%s interval=%s", host, msg.id, msg.id, response.code, throttle, throttle_interval) }
else
info { sprintf("Success ⚡ https://%s/notice/%s id=%s code=%s", host, msg.id, msg.id, response.code) }
end
when :stubbed
info { sprintf("Success ⚡ Development mode is enabled; this error will be reported if it occurs after you deploy your app. id=%s", msg.id) }
when :error
warn { sprintf("Error report failed: an unknown error occurred. code=%s error=%s", response.code, response.message.to_s.dump) }
else
warn { sprintf("Error report failed: unknown response from server. code=%s", response.code) }
end
end
# Release the marker. Important to perform during cleanup when shutting
# down, otherwise it could end up waiting indefinitely.
def release_marker
signal_marker(marker)
end
def signal_marker(marker)
if mutex.owned?
marker.signal
else
mutex.synchronize { marker.signal }
end
end
end
end