class Gladys::Container

def cache(key)

Allows for an in-memory cache of the result of a block, to avoid
repeating work for things like preloading parameters.

Example:

cache(:random_warehouse) do
  query(:warehouse).all
end.sample
# Memoizes the value of the given block under +key+ in the in-memory cache.
#
# The block only runs while nothing (nil) is stored for +key+, so a block that
# returns nil is evaluated again on the next call. A stored `false` is kept and
# returned as-is.
#
# @param key [Object] cache key
# @yieldreturn [Object] value to store for +key+
# @return [Object] the value stored under +key+
def cache(key)
  return @cache[key] unless @cache[key].nil?

  @cache[key] = yield
  @cache[key]
end

def count_rows(table, args = {})

# Counts the rows of +table+ that match +args+.
#
# @param table [Symbol, String] table to query
# @param args [Hash] conditions handed to the dataset's `where`
# @return [Object] whatever the dataset's `count` returns
def count_rows(table, args = {})
  description = "Calling :count_rows with table: #{table} and #{args.size} args"
  log_debug_duration(description) do
    query(table) do
      matching = where(args)
      matching.count
    end
  end
end

def create_extension(name)

# Runs `CREATE EXTENSION IF NOT EXISTS` for the named extension.
#
# +name+ is interpolated between double quotes without further escaping.
#
# @param name [String, Symbol] extension name
# @return [Object] whatever `db.run` returns
def create_extension(name)
  log_debug_duration("Calling :create_extension with name: #{name}") do
    connection = db
    connection.run("CREATE EXTENSION IF NOT EXISTS \"#{name}\"")
  end
end

def create_sequence(name, start: 1, increment: 1)

# Runs `CREATE SEQUENCE IF NOT EXISTS` for the named sequence.
#
# All three values are interpolated into the statement as given.
#
# @param name [String, Symbol] sequence name
# @param start [Integer] value for START WITH
# @param increment [Integer] value for INCREMENT BY
# @return [Object] whatever `db.run` returns
def create_sequence(name, start: 1, increment: 1)
  log_debug_duration("Calling :create_sequence with name: #{name}") do
    connection = db
    connection.run("CREATE SEQUENCE IF NOT EXISTS \"#{name}\" START WITH #{start} INCREMENT BY #{increment}")
  end
end

def drop_sequence(name)

# Runs `DROP SEQUENCE IF EXISTS ... CASCADE` for the named sequence.
#
# @param name [String, Symbol] sequence name, interpolated between double quotes
# @return [Object] whatever `db.run` returns
def drop_sequence(name)
  log_debug_duration("Calling :drop_sequence with name: #{name}") do
    connection = db
    connection.run("DROP SEQUENCE IF EXISTS \"#{name}\" CASCADE")
  end
end

def drop_table(table)

# Runs `DROP TABLE IF EXISTS ... CASCADE` for the named table.
#
# @param table [String, Symbol] table name, interpolated between double quotes
# @return [Object] whatever `db.run` returns
def drop_table(table)
  log_debug_duration("Calling :drop_table with table: #{table}") do
    connection = db
    connection.run("DROP TABLE IF EXISTS \"#{table}\" CASCADE")
  end
end

def drop_tables(tables)

# Drops every table in +tables+, one `drop_table` call per entry, in order.
#
# @param tables [Enumerable] table names
# @return [Object] the result of `tables.each` (the receiver, for an Array)
def drop_tables(tables)
  log_debug_duration("Calling :drop_tables with tables: #{tables}") do
    tables.each { |name| drop_table(name) }
  end
end

def flush_cache

# Discards everything stored in the in-memory cache by replacing it with a
# new, empty Concurrent::Hash.
#
# @return [Concurrent::Hash] the new, empty cache
def flush_cache
  @cache = Concurrent::Hash.new
end

def get_input(name)

# Produces a value for the script input +name+.
#
# When the script marks the input as preloaded, a random element of the
# preloaded collection is returned. Otherwise the script's inputs block for
# +name+ is evaluated in the context of this object.
#
# @param name [Symbol, String] input name
# @return [Object] the sampled or freshly computed input
def get_input(name)
  return @preloaded_inputs[name].sample if @script.preload_input?(name)

  generator = @script.inputs_block(name)
  instance_eval(&generator)
end

def handle_metrics(message = nil)

# Runs the given block while recording latency and error metrics.
#
# Error handling inside the timed section:
# - Sequel connection/disconnect errors are reported, counted as reconnects
#   and swallowed, so the method returns nil for them.
# - PG::InFailedSqlTransaction is reported and swallowed without counting.
# - Any other StandardError is reported, counted as an error and re-raised;
#   in that case the latency is neither logged nor recorded.
#
# @param message [String, nil] when given, logged together with the duration
# @return [Object, nil] the block's value, or nil when an error was swallowed
def handle_metrics(message = nil)
  outcome = nil
  elapsed = Benchmark.realtime do
    outcome = yield
  rescue Sequel::DatabaseConnectionError, Sequel::DatabaseDisconnectError => e
    Gladys.report_script_error(@script, e)
    metrics&.increment_reconnects
  rescue PG::InFailedSqlTransaction => e
    # Deliberately not counted as an error: it was already counted when the
    # transaction block itself failed.
    Gladys.report_script_error(@script, e)
  rescue => e
    Gladys.report_script_error(@script, e)
    metrics&.increment_errors
    raise e
  end

  if message
    log_debug "#{message}. Took: #{elapsed.round(4)} seconds"
  end
  metrics&.increment_latency(elapsed)
  outcome
end

def initialize(script, db, preloaded_inputs: Concurrent::Hash.new, should_stop: -> { false })

# Builds a container around a script and a database handle.
#
# @param script [Object] the script whose inputs and options are served
# @param db [Object] database handle
# @param preloaded_inputs [Hash-like] preloaded input collections keyed by
#   input name; a fresh Concurrent::Hash per call by default
# @param should_stop [#call, nil] callable consulted by `stopping?`
def initialize(script, db, preloaded_inputs: Concurrent::Hash.new, should_stop: -> { false })
  # Collaborators handed in by the caller.
  @script = script
  @db = db
  @preloaded_inputs = preloaded_inputs
  @should_stop = should_stop

  # State owned by this container.
  @cache = Concurrent::Hash.new
  @locks = Concurrent::Hash.new
  @metrics = Metrics.new
end

def insert(table, *args)

# Inserts into +table+ by forwarding +args+ to the dataset's `insert`.
#
# @param table [Symbol, String] destination table
# @param args [Array] arguments passed straight through to the dataset
# @return [Object] whatever the dataset's `insert` returns
def insert(table, *args)
  description = "Calling :insert with table: #{table} and #{args.size} args"
  log_debug_duration(description) do
    # Inside the query block `insert` resolves against the dataset, not this method.
    query(table) { insert(*args) }
  end
end

def insert_many(table, *args)

Use copy_into for large datasets (better performance)
# Bulk-loads rows into +table+ by sending them as CSV through `copy_into`.
#
# Only the first element of +args+ is used: an Array of Hashes, one per row.
# The keys of the first row define the column list and the order of values in
# every row. Keys may be Symbols or Strings; they are symbolized only for the
# column list, lookups use the keys as given.
#
# @param table [Symbol, String] destination table
# @param args [Array] first element is the Array of row Hashes
# @return [Object, nil] whatever `query` returns, or nil when there are no rows
def insert_many(table, *args)
  log_debug_duration("Calling :insert_many with table: #{table} and #{args.size} args") do
    data = args.first
    # `return` leaves the whole method; nothing is sent for an empty batch.
    return if data.nil? || data.empty?

    keys = data.first.keys
    columns = keys.map(&:to_sym)
    # The payload is built before entering `query`, because the query block is
    # evaluated against the database object rather than this container.
    rows = data.map { |row| keys.map { |key| csv_copy_field(row[key]) }.join(",") }.join("\n")
    query do
      copy_into(table, format: :csv, columns: columns, data: rows)
    end
  end
end

# Encodes a single value as one CSV field.
#
# nil becomes an unquoted empty field and an empty string becomes `""`, so the
# two stay distinguishable (PostgreSQL's CSV COPY reads the former as NULL).
# Values containing a comma, double quote or line break are quoted, with
# embedded double quotes doubled.
private def csv_copy_field(value)
  return "" if value.nil?

  text = value.to_s
  return "\"\"" if text.empty?
  return text unless text.match?(/[",\r\n]/)

  "\"#{text.gsub('"', '""')}\""
end

def load_db(table = nil)

# Returns the database handle, or the dataset for +table+ when one is given.
#
# @param table [Symbol, String, nil] table name; converted with `to_sym`
# @return [Object] `db` itself, or `db[table.to_sym]`
def load_db(table = nil)
  return db if table.nil?

  db[table.to_sym]
end

def lock(key)

Locks are used to prevent multiple threads from executing the same code at the
same time. This is useful for ensuring that the database is not corrupted by
multiple threads executing the same code at the same time.

```ruby
lock(:single_threaded_operation)
# Perform operation that should only be executed by a single thread.
unlock(:single_threaded_operation)
```

Passing a block to the `lock` method will automatically unlock the lock after the
block is executed. This is useful for ensuring that the lock is always unlocked
even if an error is raised.

```ruby
lock(:single_threaded_operation) do
  # Perform operation that should only be executed by a single thread.
end
```

You can use the `locked?` method to check if a lock is currently held.

```ruby
locked?(:single_threaded_operation)
```
#
# Acquires the lock named +key+, creating its single-permit semaphore on
# first use.
#
# Without a block the lock stays held until `unlock(key)` is called. With a
# block the lock is released once the block finishes, even if it raises.
#
# @param key [Object] lock name
# @return [Object, nil] the block's value, or nil when no block is given
def lock(key)
  Gladys.log_debug "Locking #{key}"
  @locks[key] ||= Concurrent::Semaphore.new(1)
  @locks[key].acquire
  Gladys.log_debug "Locked #{key}"

  if block_given?
    begin
      yield
    ensure
      unlock(key)
    end
  end
end

def locked?(key)

# Reports whether the lock named +key+ is currently held.
#
# The check briefly tries to take the permit without waiting. The permit is
# handed back only when that attempt succeeded; releasing after a failed
# attempt would add a permit nobody acquired and let a second holder in.
#
# @param key [Object] lock name
# @return [Boolean] true when the permit could not be acquired; false when it
#   could, or when no lock was ever created for +key+
def locked?(key)
  semaphore = @locks[key]
  return false unless semaphore

  acquired = semaphore.try_acquire(1, nil)
  semaphore.release if acquired
  !acquired
end

def log(message)

# Forwards +message+ to `Gladys.log`.
#
# @param message [String] text to log
# @return [Object] whatever `Gladys.log` returns
def log(message)
  Gladys.log(message)
end

def log_debug(message)

# Forwards +message+ to `Gladys.log_debug`.
#
# @param message [String] text to log at debug level
# @return [Object] whatever `Gladys.log_debug` returns
def log_debug(message)
  Gladys.log_debug(message)
end

def log_debug_duration(message)

# Logs a "Starting" and a "Finished" debug line around the given block and
# returns the block's value.
#
# The "Finished" line is always written. It carries the elapsed time when the
# block ran to completion, and "<unknown>" when the block was left early
# (an exception, or a `return` from the caller's block).
#
# @param message [String] description used in both log lines
# @return [Object] the block's value
def log_debug_duration(message)
  log_debug "Starting: #{message}"
  outcome = nil
  elapsed = Benchmark.realtime { outcome = yield }
  outcome
ensure
  if elapsed.nil?
    log_debug "Finished: #{message}. Took: <unknown>"
  else
    log_debug "Finished: #{message}. Took: #{elapsed.round(4)} seconds"
  end
end

def options

# Returns the script's `defined_options`.
#
# @return [Object] whatever `@script.defined_options` returns
def options
  @script.defined_options
end

def query(table = nil, &block)

# Runs a query against the database (or against +table+'s dataset) with
# metrics recorded around it.
#
# Without a block the database/dataset itself is returned. With a block, the
# block is evaluated in the context of the database/dataset, so bare method
# calls inside it resolve against that object.
#
# @param table [Symbol, String, nil] table to scope the query to
# @return [Object] the dataset, or the block's value
def query(table = nil, &block)
  message = table.nil? ? "Calling :query" : "Calling :query with table: #{table}"

  # Do not remove the handle_metrics call. It is VERY important.
  handle_metrics(message) do
    target = load_db(table)
    results = block ? target.instance_eval(&block) : target
    metrics&.increment_qps
    results
  end
end

def stopping?

# Reports whether the container has been asked to stop.
#
# @return [Boolean] true only when the `should_stop` callable returns a value
#   equal to true; false otherwise, including when no callable is set
def stopping?
  verdict = @should_stop&.call
  verdict == true
end

def transaction(&block)

# Runs +block+ inside a database transaction and counts it in the
# transactions-per-second metric once it completes.
#
# A Sequel::Rollback that reaches this method is logged and swallowed.
#
# @return [Object] the transaction's value; on rollback, the value of the
#   last `log_debug` call
def transaction(&block)
  log_debug_duration("Calling :transaction") do
    # Inside the query block `transaction` resolves against the database object.
    outcome = query { transaction(&block) }
    metrics&.increment_tps
    outcome
  end
rescue Sequel::Rollback => e
  # Do nothing. Just log the rollback.
  log_debug "Rollback: #{e.message}"
  log_debug "Backtrace: #{e.backtrace.join("\n")}"
end

def unlock(key)

# Releases the lock named +key+ by returning a permit to its semaphore.
#
# Raises NoMethodError when no lock was ever created for +key+.
#
# @param key [Object] lock name
# @return [Object] whatever the final `Gladys.log_debug` call returns
def unlock(key)
  Gladys.log_debug "Unlocking #{key}"
  semaphore = @locks[key]
  semaphore.release
  Gladys.log_debug "Unlocked #{key}"
end

def upsert(table, params)

# Inserts +params+ into +table+ through the dataset's `insert_conflict`.
#
# NOTE(review): with no arguments, `insert_conflict` presumably makes
# conflicting rows be skipped rather than updated; confirm against Sequel.
#
# @param table [Symbol, String] destination table
# @param params [Hash] column values for the row
# @return [Object] whatever the dataset's `insert` returns
def upsert(table, params)
  log_debug_duration("Calling :upsert with table: #{table}") do
    query(table) do
      conflict_aware = insert_conflict
      conflict_aware.insert(params)
    end
  end
end

def upsert_many(table, *args)

# Inserts many rows into +table+ through the dataset's `insert_conflict`,
# forwarding +args+ to `multi_insert`.
#
# @param table [Symbol, String] destination table
# @param args [Array] arguments passed straight through to `multi_insert`
# @return [Object] whatever `multi_insert` returns
def upsert_many(table, *args)
  description = "Calling :upsert_many with table: #{table} and #{args.size} args"
  log_debug_duration(description) do
    query(table) do
      conflict_aware = insert_conflict
      conflict_aware.multi_insert(*args)
    end
  end
end