lib/posthog/send_worker.rb
# frozen_string_literal: true require 'posthog/defaults' require 'posthog/message_batch' require 'posthog/transport' require 'posthog/utils' module PostHog class SendWorker include PostHog::Utils include PostHog::Defaults include PostHog::Logging # public: Creates a new worker # # The worker continuously takes messages off the queue # and makes requests to the posthog.com api # # queue - Queue synchronized between client and worker # api_key - String of the project's API key # options - Hash of worker options # batch_size - Fixnum of how many items to send in a batch # on_error - Proc of what to do on an error # def initialize(queue, api_key, options = {}) symbolize_keys! options @queue = queue @api_key = api_key @on_error = options[:on_error] || proc { |status, error| } batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE @batch = MessageBatch.new(batch_size) @lock = Mutex.new @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] end # public: Continuously runs the loop to check for new events # def run until Thread.current[:should_exit] return if @queue.empty? @lock.synchronize do consume_message_from_queue! until @batch.full? || @queue.empty? end res = @transport.send @api_key, @batch @on_error.call(res.status, res.error) unless res.status == 200 @lock.synchronize { @batch.clear } end ensure @transport.shutdown end # public: Check whether we have outstanding requests. # # TODO: Rename to `requesting?` in future version def is_requesting? # rubocop:disable Naming/PredicateName @lock.synchronize { !@batch.empty? } end private def consume_message_from_queue! @batch << @queue.pop rescue MessageBatch::JSONGenerationError => e @on_error.call(-1, e.to_s) end end end