class Gladys::Context
# Records one preloaded value for the named input.
#
# The first value stored under a given name creates that name's
# Concurrent::Array; later values are appended to the same array.
#
# @param name [Object] key identifying the input in @preloaded_inputs
# @param value [Object] the preloaded value to append
# @return [Concurrent::Array] the array now holding every value stored for +name+
def add_preloaded_input(name, value)
  bucket = (@preloaded_inputs[name] ||= Concurrent::Array.new)
  bucket << value
end
# Builds a context around a script and a database handle.
#
# @param script [Object] the script definition; the other methods of this
#   class query it for options, inputs and blocks
# @param db [Object] database handle handed to every Container this context creates
def initialize(script, db)
  @script = script
  @db = db
  # Starts empty; add_preloaded_input fills it, keyed by input name.
  @preloaded_inputs = Concurrent::Hash.new
end
# Convenience accessor for the options declared by the script.
#
# @return [Object] whatever @script.defined_options returns
def options
  @script.defined_options
end
# Evaluates, ahead of the benchmark, every input the script marks for preloading.
#
# Does nothing when @script.preload_inputs? is falsy. Otherwise one promise is
# started per preloadable input; each promise evaluates that input's block
# `preload_scale` times inside a shared Container and stores every result via
# add_preloaded_input. The method blocks until all promises have finished and
# logs the elapsed wall-clock time.
#
# @return [Object, nil] nil when preloading is disabled, otherwise the result
#   of the final Gladys.log call
# @raise [Exception] the rejection reason of the combined promise when any
#   preload promise fails
def preload_inputs
  return unless @script.preload_inputs?

  Gladys.log("Preloading inputs. This may take a while but will not affect the benchmark...")

  sandbox = Container.new(@script, @db)
  sandbox.instance_eval(&@script.helpers_block)
  pending = Concurrent::Array.new

  elapsed = Benchmark.realtime do
    @script.defined_inputs.each do |input|
      if @script.preload_input?(input)
        pending << Concurrent::Promise.execute do
          @script.defined_options.preload_scale.times do
            add_preloaded_input(input, sandbox.instance_eval(&@script.inputs_block(input)))
          end
        end
      end
    end

    combined = Concurrent::Promise.zip(*pending)
    combined.wait

    # A failed preload leaves the benchmark without its inputs, so surface
    # the failure instead of continuing.
    raise combined.reason if combined.rejected?
  end

  Gladys.log("Done preloading inputs in #{elapsed.round(2)} seconds.")
end
# Runs the action until the should_stop block returns true.
# Only runs once by default (the default should_stop returns true).
#
# The script's helpers block and the action's before block are evaluated once
# on a shared Container. Then +threads+ promises each evaluate the action
# block in a loop until container.stopping? is truthy. Errors raised by the
# action block are logged and reported, and the loop continues. After all
# workers have finished, the action's after block is evaluated and the
# container's metrics are returned.
#
# @param action [Object] identifier passed to the script's before_block,
#   action_block and after_block lookups
# @param threads [Integer, nil] number of concurrent workers; nil falls back to 1
# @param should_stop [#call] handed to the Container; presumably what
#   Container#stopping? consults — confirm against Container
# @return [Object] container.metrics
# @raise [Errors::DatabaseSizeRequired] when options.database_size is nil
# @raise [Exception] the rejection reason when a worker promise is rejected,
#   or any error raised by the after block
def run_action(action, threads:, should_stop: -> { true })
  raise Errors::DatabaseSizeRequired, "database_size is required" if options.database_size.nil?

  threads ||= 1 # Ensure we have at least one thread.
  Gladys.log "Running #{threads} threads"

  promises = Concurrent::Array.new
  container = Container.new(@script, @db, preloaded_inputs: @preloaded_inputs, should_stop: should_stop)
  container.instance_eval(&@script.helpers_block)
  container.instance_eval(&@script.before_block(action))

  threads.times do
    promises << Concurrent::Promise.execute do
      Gladys.log "Running thread #{Thread.current.object_id}"

      loop do
        begin
          container.instance_eval(&@script.action_block(action))
        rescue => e
          # Best effort: a failing iteration is reported but does not end the worker.
          Gladys.log "Error in thread #{Thread.current.object_id} on action #{action}: #{e.message}"
          Gladys.report_error(e)
        end

        if container.stopping?
          Gladys.log "Thread #{Thread.current.object_id} finished"
          break
        end
      end
    end
  end

  zipped = Concurrent::Promise.zip(*promises)
  zipped.wait
  raise zipped.reason if zipped.rejected?

  # Every worker has finished successfully at this point, so run the after
  # block directly. Chaining it with `zipped.then` would hand it to a child
  # promise that nobody waits on: metrics could be returned before the after
  # block ran, and any error it raised would be silently dropped.
  container.instance_eval(&@script.after_block(action))

  container.metrics
end