class ElasticAPM::Transport::Connection

rubocop:disable Metrics/ClassLength
@api private

def append(str)

def append(str)
  bytes =
    if @config.http_compression
      @bytes_sent = @wr.tell
    else
      @bytes_sent += str.bytesize
    end
  debug 'Bytes sent during this request: %d', bytes
  @wr.puts(str)
end

def connect_unless_connected

rubocop:disable Metrics/MethodLength
def connect_unless_connected
  @mutex.synchronize do
    return true if @connected
    debug 'Opening new request'
    reset!
    @rd, @wr = ModdedIO.pipe
    enable_compression! if @config.http_compression?
    perform_request_in_thread
    wait_for_connection
    schedule_closing if @config.api_request_time
    append(@metadata)
    true
  end
end

def connected?

def connected?
  @mutex.synchronize { @connected }
end

def enable_compression!

def enable_compression!
  @wr.binmode
  @wr = Zlib::GzipWriter.new(@wr)
end

def flush

def flush
  @mutex.synchronize do
    return unless @connected
    debug 'Closing request'
    @wr.close
    @conn_thread.join 5 if @conn_thread
  end
end

def initialize(config)

def initialize(config)
  @config = config
  @url = config.server_url + '/intake/v2/events'
  headers =
    (@config.http_compression? ? GZIP_HEADERS : HEADERS).dup
  if (token = config.secret_token)
    headers['Authorization'] = "Bearer #{token}"
  end
  @client = HTTP.headers(headers).persistent(@url)
  @metadata = Metadata.build(config)
  @mutex = Mutex.new
end

def perform_request_in_thread

rubocop:disable Metrics/MethodLength
def perform_request_in_thread
  @conn_thread = Thread.new do
    begin
      @connected = true
      resp = @client.post(@url, body: @rd).flush
    rescue Exception => e
      @connection_error = e
    ensure
      @connected = false
    end
    if resp&.status == 202
      debug 'APM Server responded with status 202'
    elsif resp
      error "APM Server reponded with an error:\n%p", resp.body.to_s
    end
    resp
  end
end

def reset!

def reset!
  @bytes_sent = 0
  @connected = false
  @connection_error = nil
  @close_task = nil
end

def schedule_closing

def schedule_closing
  @close_task =
    Concurrent::ScheduledTask.execute(@config.api_request_time) do
      flush
    end
end

def wait_for_connection

def wait_for_connection
  until @connected
    if (exception = @connection_error)
      @wr&.close
      raise FailedToConnectError, exception
    end
    sleep 0.01
  end
end

def write(str)

def write(str)
  connect_unless_connected
  @mutex.synchronize { append(str) }
  return unless @bytes_sent >= @config.api_request_size
  flush
rescue FailedToConnectError => e
  error "Couldn't establish connection to APM Server:\n%p", e
  flush
  nil
end