class Gladys::Container
# Allows for an in-memory cache of the result of a block to avoid
# repeating for things like preloading parameters.
#
# Example:
#
#   cache(:random_warehouse) do
#     query(:warehouse).all
#   end
#
# Note: a nil result is never cached, so the block runs again on the
# next call for that key.
def cache(key)
  cached = @cache[key]
  return cached unless cached.nil?

  @cache[key] = yield
end
# Counts the rows of +table+ matching the +args+ filter hash,
# logging the call duration.
def count_rows(table, args = {})
  message = "Calling :count_rows with table: #{table} and #{args.size} args"
  log_debug_duration(message) do
    query(table) { where(args).count }
  end
end
# Installs the Postgres extension +name+ if it is not already present.
def create_extension(name)
  message = "Calling :create_extension with name: #{name}"
  log_debug_duration(message) do
    db.run(%(CREATE EXTENSION IF NOT EXISTS "#{name}"))
  end
end
# Creates the sequence +name+ (if missing) starting at +start+ and
# stepping by +increment+.
def create_sequence(name, start: 1, increment: 1)
  message = "Calling :create_sequence with name: #{name}"
  log_debug_duration(message) do
    db.run(%(CREATE SEQUENCE IF NOT EXISTS "#{name}" START WITH #{start} INCREMENT BY #{increment}))
  end
end
# Drops the sequence +name+ (and its dependents, via CASCADE) if it exists.
def drop_sequence(name)
  message = "Calling :drop_sequence with name: #{name}"
  log_debug_duration(message) do
    db.run(%(DROP SEQUENCE IF EXISTS "#{name}" CASCADE))
  end
end
# Drops +table+ (and its dependents, via CASCADE) if it exists.
def drop_table(table)
  message = "Calling :drop_table with table: #{table}"
  log_debug_duration(message) do
    db.run(%(DROP TABLE IF EXISTS "#{table}" CASCADE))
  end
end
# Drops every table in +tables+, one by one, via #drop_table.
def drop_tables(tables)
  message = "Calling :drop_tables with tables: #{tables}"
  log_debug_duration(message) do
    tables.each { |table| drop_table(table) }
  end
end
# Discards every entry stored by #cache by swapping in a fresh
# thread-safe hash.
def flush_cache
  @cache = Concurrent::Hash.new
end
# Resolves the input +name+: a random sample from the preloaded pool
# when the script preloads it, otherwise the script's input block is
# evaluated in this container's context.
def get_input(name)
  return @preloaded_inputs[name].sample if @script.preload_input?(name)

  instance_eval(&@script.inputs_block(name))
end
# Runs the given block, recording its wall-clock latency into metrics and
# translating database failures into metric counters.
#
# Behavior by error class:
# - Sequel connection/disconnect errors: reported and counted as a
#   reconnect; swallowed, so the method returns nil in that case.
# - PG::InFailedSqlTransaction: reported but deliberately NOT counted
#   (see inline comment); swallowed, returning nil.
# - any other StandardError: reported, counted as an error, re-raised.
#
# When +message+ is given, the duration is also logged at debug level.
# Returns the block's value (or nil when a swallowed error occurred).
def handle_metrics(message = nil)
  return_value = nil
  # Benchmark.realtime still yields the elapsed time even when a rescue
  # clause below fires inside the block.
  lat = Benchmark.realtime do
    return_value = yield
  rescue Sequel::DatabaseConnectionError, Sequel::DatabaseDisconnectError => e
    Gladys.report_script_error(@script, e)
    metrics&.increment_reconnects
  rescue PG::InFailedSqlTransaction => e
    Gladys.report_script_error(@script, e)
    # Don't increment the error count here since we already counted it when
    # the transaction block failed.
  rescue => e
    Gladys.report_script_error(@script, e)
    metrics&.increment_errors
    raise e
  end
  log_debug "#{message}. Took: #{lat.round(4)} seconds" if message
  metrics&.increment_latency(lat)
  return_value
end
# Builds a container for +script+ bound to the Sequel database +db+.
#
# preloaded_inputs - pools sampled by #get_input for preloaded inputs.
# should_stop      - callable polled by #stopping? (defaults to never stop).
def initialize(script, db, preloaded_inputs: Concurrent::Hash.new, should_stop: -> { false })
  # Collaborators handed in by the caller.
  @script = script
  @db = db
  @preloaded_inputs = preloaded_inputs
  @should_stop = should_stop

  # Internal thread-safe state.
  @locks = Concurrent::Hash.new
  @cache = Concurrent::Hash.new
  @metrics = Metrics.new
end
# Inserts a row into +table+, forwarding +args+ to Sequel's dataset
# insert and logging the call duration.
def insert(table, *args)
  message = "Calling :insert with table: #{table} and #{args.size} args"
  log_debug_duration(message) do
    query(table) { insert(*args) }
  end
end
# Bulk-inserts the rows in args.first into +table+ using Postgres COPY
# with CSV data. Returns nil (without touching the db) when there is
# nothing to insert.
#
# Columns are taken from the first row's keys (symbolized); every row is
# then read by symbol key, falling back to the string key so that
# string-keyed row hashes are not silently written as empty cells.
# Values are escaped with the stdlib CSV encoder so embedded commas,
# quotes, and newlines cannot corrupt the COPY payload (the previous
# plain join(",") offered no escaping at all).
def insert_many(table, *args)
  log_debug_duration("Calling :insert_many with table: #{table} and #{args.size} args") do
    data = args.first
    return if data.nil? || data.empty?

    require "csv" # stdlib; lazily loaded, only needed on this path
    columns = data.first.keys.map(&:to_sym)
    rows = data.map do |row|
      values = columns.map { |col| row.key?(col) ? row[col] : row[col.to_s] }
      CSV.generate_line(values).chomp
    end.join("\n")
    query do
      copy_into(table, format: :csv, columns: columns, data: rows)
    end
  end
end
# Returns the raw database handle, or the dataset for +table+ when one
# is given.
def load_db(table = nil)
  return db if table.nil?

  db[table.to_sym]
end
# Locks are used to prevent multiple threads from executing the same code at the
# same time. This is useful for ensuring that the database is not corrupted by
# multiple threads executing the same code at the same time.
#
# ```ruby
# lock(:single_threaded_operation)
# # Perform operation that should only be executed by a single thread.
# unlock(:single_threaded_operation)
# ```
#
# Passing a block to the `lock` method will automatically unlock the lock after the
# block is executed. This is useful for ensuring that the lock is always unlocked
# even if an error is raised.
#
# ```ruby
# lock(:single_threaded_operation) do
#   # Perform operation that should only be executed by a single thread.
# end
# ```
#
# You can use the `locked?` method to check if a lock is currently held.
#
# ```ruby
# locked?(:single_threaded_operation)
# ```
def lock(key)
  Gladys.log_debug "Locking #{key}"
  # NOTE(review): `||=` on the shared hash is read-then-write, not atomic —
  # two threads could race here and end up with distinct semaphores for the
  # same key. Confirm whether Concurrent::Hash gives a stronger guarantee.
  semaphore = (@locks[key] ||= Concurrent::Semaphore.new(1))
  semaphore.acquire
  Gladys.log_debug "Locked #{key}"
  return unless block_given?

  begin
    yield
  ensure
    unlock(key)
  end
end
# Returns true when the lock for +key+ is currently held by some thread,
# false when it is free or was never created.
#
# Bug fix: the previous implementation released the semaphore in an
# `ensure`, even when `try_acquire` had FAILED. Releasing a permit that
# was never acquired pushes the semaphore's permit count back up while
# another thread still holds the lock — effectively unlocking it out
# from under the holder. We now release only after a successful probe.
def locked?(key)
  semaphore = @locks[key]
  return false unless semaphore

  if semaphore.try_acquire(1, nil)
    # We won the permit, so nobody held the lock; hand it straight back.
    semaphore.release
    false
  else
    true
  end
end
# Forwards +message+ to the global Gladys logger.
def log(message)
  Gladys.log(message)
end
# Forwards +message+ to the global Gladys debug logger.
def log_debug(message)
  Gladys.log_debug(message)
end
# Logs "Starting: ..." before yielding and "Finished: ..." (with the
# measured duration) afterwards — the finish line is emitted even when
# the block raises, in which case the duration reads "<unknown>".
# Returns the block's value.
def log_debug_duration(message)
  log_debug "Starting: #{message}"
  result = nil
  lat = Benchmark.realtime { result = yield }
  result
ensure
  if lat
    log_debug "Finished: #{message}. Took: #{lat.round(4)} seconds"
  else
    log_debug "Finished: #{message}. Took: <unknown>"
  end
end
# The options the script declared for itself.
def options
  @script.defined_options
end
# Runs +block+ against the dataset for +table+ (or the raw db handle
# when no table is given), wrapped in metrics/error accounting. With no
# block, simply returns the dataset. Counts one query per call.
def query(table = nil, &block)
  message = table.nil? ? "Calling :query" : "Calling :query with table: #{table}"
  # Do not remove the handle_metrics call. It is VERY important.
  handle_metrics(message) do
    dataset = load_db(table)
    results = block.nil? ? dataset : dataset.instance_eval(&block)
    metrics&.increment_qps
    results
  end
end
# True only when the should_stop callable exists and returns exactly
# true (any other truthy value does not count as stopping).
def stopping?
  return false if @should_stop.nil?

  @should_stop.call == true
end
# Runs +block+ inside a database transaction, counting one transaction
# in metrics and logging the call duration.
#
# The inner `transaction(&block)` is NOT recursion: #query instance_evals
# its block against the db handle, so this resolves to Sequel's
# Database#transaction, not this method.
#
# A Sequel::Rollback is logged (message + backtrace) and swallowed — the
# method then returns nil, mirroring a rolled-back transaction.
def transaction(&block)
  log_debug_duration("Calling :transaction") do
    query { transaction(&block) }.tap do
      metrics&.increment_tps
    end
  end
rescue Sequel::Rollback => e
  log_debug "Rollback: #{e.message}"
  log_debug "Backtrace: #{e.backtrace.join("\n")}"
  # Do nothing. Just log the rollback.
end
# Releases the lock for +key+. The semaphore must already exist (i.e.
# #lock was called for this key at some point).
def unlock(key)
  Gladys.log_debug "Unlocking #{key}"
  semaphore = @locks[key]
  semaphore.release
  Gladys.log_debug "Unlocked #{key}"
end
# Inserts +params+ into +table+, ignoring conflicts (Sequel's
# insert_conflict), and logs the call duration.
def upsert(table, params)
  log_debug_duration("Calling :upsert with table: #{table}") do
    query(table) { insert_conflict.insert(params) }
  end
end
# Multi-row variant of #upsert: forwards +args+ to Sequel's
# multi_insert with conflicts ignored, logging the call duration.
def upsert_many(table, *args)
  message = "Calling :upsert_many with table: #{table} and #{args.size} args"
  log_debug_duration(message) do
    query(table) { insert_conflict.multi_insert(*args) }
  end
end