class ElasticAPM::Transport::Connection
# rubocop:disable Metrics/ClassLength
# @api private
def append(str)
# Writes one payload to the open request body and updates the running byte
# counter that #write compares against the configured request size.
#
# With HTTP compression enabled the counter is taken from the writer's
# current position, sampled before this payload is written; otherwise the
# payload's byte size is added to the running total.
#
# Uses the `http_compression?` predicate, matching the other compression
# checks in this class.
#
# @param str [String] the serialized payload; written with `puts`
# @return the result of `@wr.puts`
def append(str)
  bytes =
    if @config.http_compression?
      @bytes_sent = @wr.tell
    else
      @bytes_sent += str.bytesize
    end

  debug 'Bytes sent during this request: %d', bytes

  @wr.puts(str)
end
def connect_unless_connected
# Opens a new streaming request unless one is already open.
#
# Under the mutex: resets per-request state, creates the pipe whose read end
# becomes the request body, optionally wraps the write end in gzip, starts the
# request thread and waits for it, optionally schedules the timed close, and
# finally writes the metadata as the first payload.
#
# wait_for_connection may raise FailedToConnectError, which is not rescued here.
#
# @return [true]
def connect_unless_connected
  @mutex.synchronize do
    unless @connected
      debug 'Opening new request'

      reset!

      pipe_reader, pipe_writer = ModdedIO.pipe
      @rd = pipe_reader
      @wr = pipe_writer
      enable_compression! if @config.http_compression?

      perform_request_in_thread
      wait_for_connection

      # Only arm the close timer when a request time is configured.
      schedule_closing if @config.api_request_time

      append(@metadata)
    end

    true
  end
end
def connected?
# Reads the connection flag while holding the mutex.
#
# @return the current value of `@connected`
def connected?
  @mutex.synchronize do
    @connected
  end
end
def enable_compression!
# Switches the current writer to binary mode and replaces it with a gzip
# writer wrapping it, so subsequent writes are compressed.
#
# @return [Zlib::GzipWriter] the new writer (also stored in `@wr`)
def enable_compression!
  raw_writer = @wr
  raw_writer.binmode
  @wr = Zlib::GzipWriter.new(raw_writer)
end
def flush
# Ends the current request, if one is open.
#
# Holding the mutex, closes the write end of the request body and then waits
# up to 5 seconds for the request thread to finish. Does nothing when no
# request is open.
#
# @return [Thread, nil] the result of joining the request thread, or nil
def flush
  @mutex.synchronize do
    if @connected
      debug 'Closing request'
      @wr.close
      @conn_thread&.join(5)
    end
  end
end
def initialize(config, metadata)
# Builds the connection: stores the config, serializes the metadata to JSON,
# derives the intake URL and prepares a persistent HTTP client.
#
# Headers are copied from GZIP_HEADERS or HEADERS depending on whether HTTP
# compression is enabled; an Authorization bearer header is added when a secret
# token is configured. An SSL context with a CA file is only created when SSL
# is in use and a CA cert is configured.
#
# @param config the agent configuration (reads server_url, http_compression?,
#   secret_token, use_ssl? and server_ca_cert)
# @param metadata any object responding to `to_json`
def initialize(config, metadata)
  @config = config
  @metadata = metadata.to_json
  @url = config.server_url + '/intake/v2/events'

  request_headers =
    if @config.http_compression?
      GZIP_HEADERS.dup
    else
      HEADERS.dup
    end

  secret = config.secret_token
  request_headers['Authorization'] = "Bearer #{secret}" if secret

  if config.use_ssl? && config.server_ca_cert
    @ssl_context = OpenSSL::SSL::SSLContext.new
    @ssl_context.ca_file = config.server_ca_cert
  end

  @client = HTTP.headers(request_headers).persistent(@url)

  @mutex = Mutex.new
end
def perform_request_in_thread
# Starts the background thread that performs the streaming POST.
#
# The thread marks the connection as open, posts with the pipe's read end as
# the body and flushes the response. Any exception is stored in
# `@connection_error` (read by wait_for_connection) instead of being raised,
# and the connection is always marked closed once the request ends.
# Afterwards the response status is logged: 202 at debug level, anything else
# as an error including the response body.
#
# @return [Thread] the request thread (also stored in `@conn_thread`); its
#   value is the response, or nil when the request raised
def perform_request_in_thread
  @conn_thread = Thread.new do
    response = nil

    begin
      @connected = true

      pending = @client.post(@url, body: @rd, ssl_context: @ssl_context)
      response = pending.flush
    rescue Exception => err # rubocop:disable Lint/RescueException
      # Deliberately broad: whatever ends the request is recorded here.
      @connection_error = err
    ensure
      @connected = false
    end

    if response
      if response.status == 202
        debug 'APM Server responded with status 202'
      else
        error "APM Server responded with an error:\n%p", response.body.to_s
      end
    end

    response
  end
end
def reset!
# Resets the per-request state before a new request is opened.
#
# A close task left over from the previous request is cancelled before its
# reference is dropped. Without this, a request that was flushed early
# (because the size limit was reached) leaves its timer pending, and that
# timer later calls #flush on whichever newer request is open at that time.
#
# @return [nil]
def reset!
  # No-op when no task was scheduled.
  @close_task&.cancel

  @bytes_sent = 0
  @connected = false
  @connection_error = nil
  @close_task = nil
end
def schedule_closing
# Schedules a task that calls #flush after the configured `api_request_time`
# delay and remembers it in `@close_task`.
#
# @return the scheduled task
def schedule_closing
  delay = @config.api_request_time
  @close_task = Concurrent::ScheduledTask.execute(delay) { flush }
end
def wait_for_connection
# Polls every 10 ms until the request thread has marked the connection open.
#
# If the request thread recorded an error in the meantime, the writer (when
# present) is closed and the error is re-raised wrapped in
# FailedToConnectError.
#
# @return [nil]
# @raise [FailedToConnectError] when `@connection_error` is set before the
#   connection is marked open
def wait_for_connection
  until @connected
    failure = @connection_error

    unless failure
      sleep 0.01
      next
    end

    @wr&.close
    raise FailedToConnectError, failure
  end
end
def write(str)
# Sends one payload to the APM Server over the streaming request.
#
# Does nothing when sending is disabled. Otherwise makes sure a request is
# open, appends the payload under the mutex, and flushes the request once the
# byte counter has reached the configured `api_request_size`.
#
# A failed connection attempt is logged and followed by a flush; it is not
# propagated to the caller.
#
# @param str [String] the serialized payload
# @return the result of #flush when the size limit triggered one, otherwise nil
def write(str)
  return if @config.disable_send

  connect_unless_connected

  @mutex.synchronize do
    append(str)
  end

  flush if @bytes_sent >= @config.api_request_size
rescue FailedToConnectError => failure
  error "Couldn't establish connection to APM Server:\n%p", failure
  flush

  nil
end