class Fluent::Counter::CleanupThread

def initialize(store, mutex_hash, mutex)

# Sets up the cleanup worker's state without starting any thread.
#
# @param store      object later queried via `get(key, raw: true)` in #run_once
# @param mutex_hash collection of key => per-key mutex pairs to be pruned
# @param mutex      lock held around each whole cleanup pass
def initialize(store, mutex_hash, mutex)
  @store, @mutex_hash, @mutex = store, mutex_hash, mutex
  # No worker exists until #start is called.
  @thread = nil
  @running = false
end

def run_once

# Performs one cleanup pass over @mutex_hash while holding @mutex.
#
# A key's mutex is dropped only when all of the following hold:
# * the store returns a value for the key,
# * the value's 'last_modified_at' seconds are not newer than
#   (now - CLEANUP_INTERVAL),
# * the key's mutex can be acquired twice in a row with try_lock.
# Every other key is left in place.
def run_once
  @mutex.synchronize do
    cutoff = (Time.now - CLEANUP_INTERVAL).to_i
    @mutex_hash.each do |(key, lock)|
      entry = @store.get(key, raw: true)
      next unless entry

      modified_sec = entry['last_modified_at'][0] # 'last_modified_at' is [sec, nsec]
      next if cutoff < modified_sec
      next unless lock.try_lock

      # Clear the slot first, then release the lock.
      @mutex_hash[key] = nil
      lock.unlock

      # Probe the lock again: failing to take it means the key is in use
      # somewhere else, so the mutex is put back under its key.
      unless lock.try_lock
        @mutex_hash[key] = lock
        next
      end

      @mutex_hash.delete(key)
      lock.unlock
    end
  end
end

def start

# Marks the worker as running and launches the background thread.
#
# While @running stays truthy the thread repeatedly sleeps for
# CLEANUP_INTERVAL and then calls #run_once.
#
# @return [Thread] the newly created worker thread (also kept in @thread)
def start
  @running = true
  # One iteration of the worker: wait out the interval, then clean up.
  cycle = lambda do
    sleep(CLEANUP_INTERVAL)
    run_once
  end
  @thread = Thread.new { cycle.call while @running }
end

def stop

# Stops the background worker.
#
# Does nothing when the worker is not running. Otherwise clears the running
# flag, waits up to one second for the thread to finish, and kills it if it
# has not finished by then. The short wait avoids blocking for a full
# CLEANUP_INTERVAL.
#
# @return [Thread, nil] the worker thread, or nil when it was not running
def stop
  return unless @running

  @running = false
  # Thread#join(limit) returns nil when the limit expires, so no Timeout
  # block (with its asynchronous interrupt) is needed to bound the wait.
  @thread.join(1) || @thread.kill
end