class FDB::Future
def self.finalize(ptr)
def self.finalize(ptr) proc do #puts "Destroying future #{ptr}" FDBC.fdb_future_destroy(ptr) end end
def self.wait_for_any(*futures)
def self.wait_for_any(*futures) if futures.empty? raise ArgumentError, "wait_for_any requires at least one future" end mx = Mutex.new cv = ConditionVariable.new ready_idx = -1 futures.each_with_index do |f, i| f.on_ready do |f| mx.synchronize { if ready_idx < 0 ready_idx = i cv.signal end } end end mx.synchronize { if ready_idx < 0 cv.wait mx end } ready_idx end
def block_until_ready
def block_until_ready FDBC.check_error FDBC.fdb_future_block_until_ready(@fpointer) end
def callback_wrapper(f, &block)
def callback_wrapper(f, &block) begin yield f rescue Exception begin $stderr.puts "Discarding uncaught exception from user callback:" $stderr.puts "#{$@.first}: #{$!.message} (#{$!.class})", $@.drop(1).map{|s| "\t#{s}"} rescue Exception end end end
def cancel
def cancel FDBC.fdb_future_cancel(@fpointer) end
def initialize(fpointer)
def initialize(fpointer) @fpointer = fpointer ObjectSpace.define_finalizer(self, FDB::Future.finalize(@fpointer)) end
def on_ready(&block)
def on_ready(&block) def callback_wrapper(f, &block) begin yield f rescue Exception begin $stderr.puts "Discarding uncaught exception from user callback:" $stderr.puts "#{$@.first}: #{$!.message} (#{$!.class})", $@.drop(1).map{|s| "\t#{s}"} rescue Exception end end end entry = CallbackEntry.new FDB.cb_mutex.synchronize { pos = FDB.ffi_callbacks.length entry.index = pos FDB.ffi_callbacks << entry } entry.callback = FFI::Function.new(:void, [:pointer, :pointer]) do |ign1, ign2| FDB.cb_mutex.synchronize { FDB.ffi_callbacks[-1].index = entry.index FDB.ffi_callbacks[entry.index] = FDB.ffi_callbacks[-1] FDB.ffi_callbacks.pop } callback_wrapper(self, &block) end FDBC.fdb_future_set_callback(@fpointer, entry.callback, nil) end
def ready?
def ready? return !FDBC.fdb_future_is_ready(@fpointer).zero? end
def release_memory
def release_memory FDBC.fdb_future_release_memory(@fpointer) end