class Fluent::Plugin::ForwardOutput::Node::SocketCache

def clear

def clear
  @mutex.synchronize do
    @inactive_socks.values.each do |s|
      s.sock.close rescue nil
    end
    @inactive_socks.clear
    @active_socks.values.each do |s|
      s.sock.close rescue nil
    end
    @active_socks.clear
  end
end

def dec_ref(key = Thread.current.object_id)

def dec_ref(key = Thread.current.object_id)
  @mutex.synchronize do
    if @active_socks[key]
      @active_socks[key].ref -= 1
    elsif @inactive_socks[key]
      @inactive_socks[key].ref -= 1
    else
      @log.warn("Not found key for dec_ref: #{key}")
    end
  end
end

def dec_ref_by_value(val)

This method is expected to be called in class which doesn't call #inc_ref
def dec_ref_by_value(val)
  @mutex.synchronize do
    sock = @active_socks.detect { |_, v| v.sock == val }
    if sock
      key = sock.first
      @active_socks[key].ref -= 1
      return
    end
    sock = @inactive_socks.detect { |_, v| v.sock == val }
    if sock
      key = sock.first
      @inactive_socks[key].ref -= 1
      return
    else
      @log.warn("Not found key for dec_ref_by_value: #{key}")
    end
  end
end

def expired?(key = Thread.current.object_id)

This method is thread unsafe
def expired?(key = Thread.current.object_id)
   @active_socks[key].timeout ? @active_socks[key].timeout < Time.now : false
end

def fetch_or(key = Thread.current.object_id)

We expect that `yield` returns a unique object in this class
def fetch_or(key = Thread.current.object_id)
  @mutex.synchronize do
    unless @active_socks[key]
      @active_socks[key] = TimedSocket.new(timeout, yield, 1)
      @log.debug("connect new socket #{@active_socks[key]}")
      return @active_socks[key].sock
    end
    if expired?(key)
      # Do not close this socket here in case of it will be used by other place (e.g. wait for receiving ack)
      @inactive_socks[key] = @active_socks.delete(key)
      @log.debug("connection #{@inactive_socks[key]} is expired. reconnecting...")
      @active_socks[key] = TimedSocket.new(timeout, yield, 0)
    end
    @active_socks[key].ref += 1
    @active_socks[key].sock
  end
end

def initialize(timeout, log)

def initialize(timeout, log)
  @log = log
  @timeout = timeout
  @active_socks = {}
  @inactive_socks = {}
  @mutex = Mutex.new
end

def purge_obsolete_socks

def purge_obsolete_socks
  @mutex.synchronize do
    @inactive_socks.keys.each do |k|
      # 0 means sockets stored in this class received all acks
      if @inactive_socks[k].ref <= 0
        s = @inactive_socks.delete(k)
        s.sock.close  rescue nil
        @log.debug("purged obsolete socket #{s.sock}")
      end
    end
    @active_socks.keys.each do |k|
      if expired?(k) && @active_socks[k].ref <= 0
        @inactive_socks[k] = @active_socks.delete(k)
      end
    end
  end
end

def revoke(key = Thread.current.object_id)

def revoke(key = Thread.current.object_id)
  @mutex.synchronize do
    if @active_socks[key]
      @inactive_socks[key] = @active_socks.delete(key)
      @inactive_socks[key].ref = 0
    end
  end
end

def revoke_by_value(val)

This method is expected to be called in class which doesn't call #fetch_or
def revoke_by_value(val)
  @mutex.synchronize do
    sock = @active_socks.detect { |_, v| v.sock == val }
    if sock
      key = sock.first
      @inactive_socks[key] = @active_socks.delete(key)
      @inactive_socks[key].ref = 0
    else
      @log.debug("Not found for revoke_by_value :#{val}")
    end
  end
end

def timeout

def timeout
  @timeout && Time.now + @timeout
end