class ActiveRecord::FutureResult
:nodoc:
def self.wrap(result)
def self.wrap(result) case result when self, Complete result else Complete.new(result) end end
def cancel
def cancel @pending = false @error = Canceled self end
def canceled?
def canceled? @session && !@session.active? end
def exec_query(connection, *args, **kwargs)
def exec_query(connection, *args, **kwargs) connection.raw_exec_query(*args, **kwargs) end
def execute!(connection)
def execute!(connection) execute_query(connection) end
def execute_or_skip
def execute_or_skip return unless pending? @session.synchronize do return unless pending? @pool.with_connection do |connection| return unless @mutex.try_lock begin if pending? @event_buffer = EventBuffer.new(self, @instrumenter) ActiveSupport::IsolatedExecutionState[:active_record_instrumenter] = @event_buffer execute_query(connection, async: true) end ensure @mutex.unlock end end end end
def execute_or_wait
def execute_or_wait if pending? start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) @mutex.synchronize do if pending? @pool.with_connection do |connection| execute_query(connection) end else @lock_wait = (Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start) end end else @lock_wait = 0.0 end end
def execute_query(connection, async: false)
def execute_query(connection, async: false) @result = exec_query(connection, *@args, **@kwargs, async: async) rescue => error @error = error ensure @pending = false end
def initialize(pool, *args, **kwargs)
def initialize(pool, *args, **kwargs) @mutex = Mutex.new @session = nil @pool = pool @args = args @kwargs = kwargs @pending = true @error = nil @result = nil @instrumenter = ActiveSupport::Notifications.instrumenter @event_buffer = nil end
def pending?
def pending? @pending && (!@session || @session.active?) end
def result
def result execute_or_wait @event_buffer&.flush if canceled? raise Canceled elsif @error raise @error else @result end end
def schedule!(session)
def schedule!(session) @session = session @pool.schedule_query(self) end
def then(&block)
def then(&block) Promise.new(self, block) end