class FDB::TransactionRead
# Builds the callback that releases a native transaction handle.
#
# The returned proc captures +ptr+ and passes it to
# FDBC.fdb_transaction_destroy when it is called; nothing is destroyed
# until then. It is a plain proc, so any arguments supplied at call time
# are ignored.
#
# @param ptr the transaction handle to hand to FDBC.fdb_transaction_destroy
# @return [Proc] callback that destroys +ptr+
def self.finalize(ptr)
  proc { FDBC.fdb_transaction_destroy(ptr) }
end
# Yields every element of the range, fetching further batches on demand.
#
# The first batch is taken from @future. Whenever the last element of a
# batch has been picked up and more data is available, the next batch is
# requested through @get_range *before* that element is yielded, so the
# follow-up read is already in flight while the caller handles the element.
#
# Iteration stops when a batch reports no more data (+more+ is zero), when
# the batch size equals the remaining limit, or when a batch is empty.
#
# @yield [kv] each element of the fetched batches, in the order received
# @return [Enumerator] when called without a block
# @return [nil] when called with a block
def each
  # Without a block there is nothing to yield to; hand back an enumerator
  # instead of failing with LocalJumpError part-way through the first batch.
  return enum_for(:each) unless block_given?

  bsel = @bsel
  esel = @esel
  limit = @limit

  iteration = 1 # the first read was fired off when the RangeEnum was initialized
  future = @future
  kvs = count = more = index = nil

  done = false
  until done
    if future
      # A new batch is pending: block on it and restart the cursor.
      kvs, count, more = future.wait
      index = 0
      future = nil

      return if count.zero?
    end

    result = kvs[index]
    index += 1

    if index == count
      if more.zero? || limit == count
        done = true
      else
        iteration += 1
        # A limit of zero is left untouched; otherwise track what remains.
        limit -= count if limit.nonzero?
        # Narrow the range so the next batch resumes after the last key seen.
        if @reverse.nonzero?
          esel = KeySelector.first_greater_or_equal(kvs.last.key)
        else
          bsel = KeySelector.first_greater_than(kvs.last.key)
        end
        future = @get_range.call(bsel, esel, limit, @mode, iteration, @reverse)
      end
    end

    yield result
  end
end
# Issues a read of +key+ on this transaction.
#
# The key is converted with FDB.key_to_bytes, then passed together with its
# byte length and the snapshot flag to FDBC.fdb_transaction_get.
#
# @param key the key to read; anything FDB.key_to_bytes accepts
# @return [Value] wrapper around whatever FDBC.fdb_transaction_get returned
def get(key)
  key_bytes = FDB.key_to_bytes(key)
  pending = FDBC.fdb_transaction_get(@tpointer, key_bytes, key_bytes.bytesize, @is_snapshot)
  Value.new(pending)
end
# Resolves a key selector on this transaction.
#
# The selector's key is converted with FDB.key_to_bytes; the bytes, their
# length, the selector's +or_equal+ and +offset+ values and the snapshot
# flag are passed to FDBC.fdb_transaction_get_key.
#
# @param keysel an object responding to +key+, +or_equal+ and +offset+
# @return [Key] wrapper around whatever FDBC.fdb_transaction_get_key returned
def get_key(keysel)
  key_bytes = FDB.key_to_bytes(keysel.key)
  pending = FDBC.fdb_transaction_get_key(
    @tpointer,
    key_bytes, key_bytes.bytesize,
    keysel.or_equal, keysel.offset,
    @is_snapshot
  )
  Key.new(pending)
end
# Reads the key range between two boundaries.
#
# Each boundary is passed through to_selector. Recognised options and their
# defaults:
#   :limit          => 0
#   :reverse        => false
#   :streaming_mode => :iterator
# A Symbol streaming mode is upcased and looked up in @@StreamingMode, and
# the first element of the matching entry is used; any non-Symbol value is
# passed through unchanged.
#
# @param bkeysel begin boundary (see to_selector)
# @param ekeysel end boundary (see to_selector)
# @param options [Hash] overrides for the defaults listed above
# @yield each element of the range, when a block is given
# @return the @@RangeEnum instance when no block is given, otherwise the
#   result of calling +each+ on it with the block
# @raise [ArgumentError] if a Symbol streaming mode has no @@StreamingMode entry
def get_range(bkeysel, ekeysel, options={}, &block)
  opts = { :limit => 0, :reverse => false, :streaming_mode => :iterator }.merge(options)
  first_sel = to_selector(bkeysel)
  last_sel = to_selector(ekeysel)

  requested_mode = opts[:streaming_mode]
  mode_code =
    if requested_mode.kind_of?(Symbol)
      entry = @@StreamingMode[requested_mode.to_s.upcase]
      raise ArgumentError, "#{requested_mode} is not a valid streaming mode" unless entry
      entry[0]
    else
      requested_mode
    end

  # Issues one batch read; the range enumerator calls this once per batch
  # with the selectors, limit and iteration number it wants.
  fetch_batch = lambda do |from, upto, limit, mode, iteration, reverse|
    begin_key = FDB.key_to_bytes(from.key)
    end_key = FDB.key_to_bytes(upto.key)
    FDB::FutureKeyValueArray.new(
      FDBC.fdb_transaction_get_range(
        @tpointer,
        begin_key, begin_key.bytesize, from.or_equal, from.offset,
        end_key, end_key.bytesize, upto.or_equal, upto.offset,
        limit, 0, mode, iteration, @is_snapshot, reverse
      )
    )
  end

  range = @@RangeEnum.new(fetch_batch, first_sel, last_sel,
                          opts[:limit], opts[:reverse] ? 1 : 0, mode_code)
  return range unless block_given?

  range.each(&block)
end
# Reads the range that starts at +prefix+ and ends at FDB.strinc(prefix).
#
# The prefix is converted with FDB.key_to_bytes and a BINARY-encoded copy is
# used for both bounds, so the converted string itself is not re-tagged.
#
# @param prefix anything FDB.key_to_bytes accepts
# @param options [Hash] forwarded unchanged to get_range
# @yield forwarded unchanged to get_range
# @return whatever get_range returns
def get_range_start_with(prefix, options={}, &block)
  raw_prefix = FDB.key_to_bytes(prefix)
  binary_prefix = raw_prefix.dup.force_encoding("BINARY")
  range_end = FDB.strinc(binary_prefix)
  get_range(binary_prefix, range_end, options, &block)
end
# Requests this transaction's read version.
#
# @return [Version] wrapper around whatever
#   FDBC.fdb_transaction_get_read_version returned for this transaction
def get_read_version
  pending = FDBC.fdb_transaction_get_read_version(@tpointer)
  Version.new(pending)
end
# Stores the transaction handle and registers its cleanup.
#
# The finalizer is obtained from the class-level +finalize+ method, which is
# given only the handle, and is attached to this object through
# ObjectSpace.define_finalizer.
#
# @param tpointer the native transaction handle
# @param db the database object this transaction was created from
#   (stored only; not otherwise used here)
# @param is_snapshot the snapshot flag stored for later reads
def initialize(tpointer, db, is_snapshot)
  @tpointer, @db, @is_snapshot = tpointer, db, is_snapshot

  destroyer = self.class.finalize(@tpointer)
  ObjectSpace.define_finalizer(self, destroyer)
end
# Captures the range parameters and immediately issues the first read.
#
# @param get_range a callable invoked as
#   get_range.call(bsel, esel, limit, streaming_mode, iteration, reverse)
# @param bsel begin selector
# @param esel end selector
# @param limit value passed as the read's limit
# @param reverse value passed as the read's reverse flag
# @param streaming_mode value passed as the read's streaming mode
def initialize(get_range, bsel, esel, limit, reverse, streaming_mode)
  @get_range = get_range
  @bsel, @esel = bsel, esel
  @limit, @reverse = limit, reverse
  @mode = streaming_mode

  # Iteration number 1: the first batch is requested up front.
  @future = get_range.call(bsel, esel, limit, streaming_mode, 1, reverse)
end
# Collects the whole range into an Array.
#
# Works on a +dup+ of the receiver so the receiver's own @mode is left
# alone. If the copy's mode is the ITERATOR entry of @@StreamingMode, it is
# switched to WANT_ALL when @limit is zero and to EXACT otherwise. The
# result is then built by Enumerable#to_a bound to the copy.
#
# @return [Array] the elements produced by the copy's +each+
def to_a
  snapshot = dup
  snapshot.instance_eval do
    if @mode == @@StreamingMode["ITERATOR"][0]
      bulk_mode = @limit.zero? ? "WANT_ALL" : "EXACT"
      @mode = @@StreamingMode[bulk_mode][0]
    end
  end

  # Call Enumerable's implementation explicitly, since this method itself is
  # the receiver's to_a.
  Enumerable.instance_method(:to_a).bind(snapshot).call
end
# Normalises a range boundary to a KeySelector.
#
# @param key_or_selector a KeySelector, or a key
# @return [KeySelector] the argument itself when it already is a
#   KeySelector, otherwise KeySelector.first_greater_or_equal of it
def to_selector(key_or_selector)
  return key_or_selector if key_or_selector.kind_of?(KeySelector)

  KeySelector.first_greater_or_equal(key_or_selector)
end
# Runs the given block with this object as its argument.
#
# @yield [self] the receiver
# @return whatever the block returns
def transact
  yield self
end