class StatelyDB::CoreClient

client for your schema.
This client shouldn’t be used directly in most cases. Instead, use the generated
CoreClient is a low level client for interacting with the Stately Cloud API.

def self.make_endpoint(endpoint: nil, region: nil)

Returns:
  • (String) - the constructed endpoint

Parameters:
  • region (String) -- the region to connect to
  • endpoint (String) -- the endpoint to connect to
def self.make_endpoint(endpoint: nil, region: nil)
  return endpoint unless endpoint.nil?
  return "https://api.stately.cloud" if region.nil?
  region = region.sub("aws-", "") if region.start_with?("aws-")
  "https://#{region}.aws.api.stately.cloud"
end

def begin_list(prefix,

Returns:
  • (Array, StatelyDB::Token) - the list of Items and the token

Parameters:
  • sort_direction (Symbol) -- the direction to sort by (:ascending or :descending)
  • sort_property (String) -- the property to sort by
  • limit (Integer) -- the maximum number of items to return
  • prefix (String) -- the prefix to list
def begin_list(prefix,
               limit: 100,
               sort_property: nil,
               sort_direction: :ascending)
  sort_direction = sort_direction == :ascending ? 0 : 1
  req = Stately::Db::BeginListRequest.new(
    store_id: @store_id,
    key_path_prefix: String(prefix),
    limit:,
    sort_property:,
    sort_direction:,
    allow_stale: @allow_stale,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_list(req)
  process_list_response(resp)
end

def begin_scan(limit: 0,

Returns:
  • (Array, StatelyDB::Token) - the list of Items and the token

Parameters:
  • segment_index (Integer) -- the index of the segment to scan.
  • total_segments (Integer) -- the total number of segments to divide the
  • item_types (Array) -- the item types to filter by. The returned
  • limit (Integer) -- the max number of items to retrieve. If set to 0
def begin_scan(limit: 0,
               item_types: [],
               total_segments: nil,
               segment_index: nil)
  if total_segments.nil? != segment_index.nil?
    raise StatelyDB::Error.new("total_segments and segment_index must both be set or both be nil",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  req = Stately::Db::BeginScanRequest.new(
    store_id: @store_id,
    limit:,
    filter_condition: item_types.map do |item_type|
      Stately::Db::FilterCondition.new(item_type: item_type.respond_to?(:name) ? item_type.name.split("::").last : item_type)
    end,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_scan(req)
  process_list_response(resp)
end

def close

Returns:
  • (void) - nil
def close
  @channel&.close
  @token_provider&.close
end

def continue_list(token)

Returns:
  • (Array, StatelyDB::Token) - the list of Items and the token

Parameters:
  • token (StatelyDB::Token) -- the token to continue from
def continue_list(token)
  req = Stately::Db::ContinueListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_list(req)
  process_list_response(resp)
end

def continue_scan(token)

Returns:
  • (Array, StatelyDB::Token) - the list of Items and the token

Parameters:
  • token (StatelyDB::Token) -- the token to continue from
def continue_scan(token)
  req = Stately::Db::ContinueScanRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_scan(req)
  process_list_response(resp)
end

def delete(*key_paths)

Returns:
  • (void) - nil

Raises:
  • (StatelyDB::Error) - if the item is not found
  • (StatelyDB::Error) - if the parameters are invalid

Parameters:
  • key_paths (String, Array) -- the paths to the items. Max 50 key paths.
def delete(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::DeleteRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    deletes: key_paths.map { |key_path| Stately::Db::DeleteItem.new(key_path: String(key_path)) }
  )
  @stub.delete(req)
  nil
end

def get(key_path)

Raises:
  • (StatelyDB::Error) - if the parameters are invalid or if the item is not found

Returns:
  • (StatelyDB::Item, NilClass) - the Item or nil if not found

Parameters:
  • key_path (String) -- the path to the item
def get(key_path)
  resp = get_batch(key_path)
  # Always return a single Item.
  resp.first
end

def get_batch(*key_paths)

Raises:
  • (StatelyDB::Error) - if the parameters are invalid or if the item is not found

Returns:
  • (Array, NilClass) - the items or nil if not found

Parameters:
  • key_paths (String, Array) -- the paths to the items. Max 100 key paths.
def get_batch(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::GetRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    gets:
      key_paths.map { |key_path| Stately::Db::GetItem.new(key_path: String(key_path)) },
    allow_stale: @allow_stale
  )
  resp = @stub.get(req)
  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end

def initialize(store_id:,

Parameters:
  • no_auth (Boolean) -- Indicates that the client should not attempt to get
  • region (String) -- the region to connect to.
  • endpoint (String) -- the endpoint to connect to.
  • token_provider (StatelyDB::Common::Auth::TokenProvider) -- the token provider to use for authentication.
  • schema (Module) -- the generated Schema module to use for mapping StatelyDB Items.
  • store_id (Integer) -- the StatelyDB to use for all operations with this client.
def initialize(store_id:,
               schema:,
               token_provider: nil,
               endpoint: nil,
               region: nil,
               no_auth: false)
  if store_id.nil?
    raise StatelyDB::Error.new("store_id is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  if schema.nil?
    raise StatelyDB::Error.new("schema is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  endpoint = self.class.make_endpoint(endpoint:, region:)
  @channel = Common::Net.new_channel(endpoint:)
  # Make sure to use the correct endpoint for the default token provider
  @token_provider = token_provider || Common::Auth::AuthTokenProvider.new(endpoint:)
  interceptors = [Common::ErrorInterceptor.new]
  interceptors << Common::Auth::Interceptor.new(token_provider: @token_provider) unless no_auth
  @stub = Stately::Db::DatabaseService::Stub.new(nil, nil,
                                                 channel_override: @channel, interceptors:)
  @store_id = store_id.to_i
  @schema = schema
  @allow_stale = false
end

def process_list_response(resp)

Other tags:
    Api: - private

Returns:
  • (Array(Array, StatelyDB::Token)) - the list of Items and the token

Parameters:
  • resp (::Stately::Db::ListResponse) -- the response to process
def process_list_response(resp)
  items = []
  token = nil
  safe_yield_stream(resp) do |r|
    case r.response
    when :result
      r.result.items.map do |result|
        items << @schema.unmarshal_item(stately_item: result)
      end
    when :finished
      raw_token = r.finished.token
      token = StatelyDB::Token.new(token_data: raw_token.token_data,
                                   can_continue: raw_token.can_continue,
                                   can_sync: raw_token.can_sync,
                                   schema_version_id: raw_token.schema_version_id)
    end
  end
  [items, token]
end

def process_sync_response(resp)

Other tags:
    Api: - private

Returns:
  • (StatelyDB::SyncResult) - the result of the sync operation

Parameters:
  • resp (::Stately::Db::SyncResponse) -- the response to process
def process_sync_response(resp)
  changed_items = []
  deleted_item_paths = []
  updated_outside_list_window_paths = []
  token = nil
  is_reset = false
  safe_yield_stream(resp) do |r|
    case r.response
    when :result
      r.result.changed_items.each do |item|
        changed_items << @schema.unmarshal_item(stately_item: item)
      end
      r.result.deleted_items.each do |item|
        deleted_item_paths << item.key_path
      end
      r.result.updated_item_keys_outside_list_window.each do |item|
        updated_outside_list_window_paths << item.key_path
      end
    when :reset
      is_reset = true
    when :finished
      raw_token = r.finished.token
      token = StatelyDB::Token.new(token_data: raw_token.token_data,
                                   can_continue: raw_token.can_continue,
                                   can_sync: raw_token.can_sync,
                                   schema_version_id: raw_token.schema_version_id)
    end
  end
  SyncResult.new(changed_items:, deleted_item_paths:, updated_outside_list_window_paths:, is_reset:, token:)
end

def put(item,

Other tags:
    Example: client.data.put(my_item, must_not_exist: true) -
    Example: client.data.put(my_item) -

Returns:
  • (StatelyDB::Item) - the item that was stored

Parameters:
  • overwrite_metadata_timestamps (Boolean) -- If set to true, the server will
  • must_not_exist (Boolean) -- A condition that indicates this item must
  • item (StatelyDB::Item) -- a StatelyDB Item
def put(item,
        must_not_exist: false,
        overwrite_metadata_timestamps: false)
  resp = put_batch({ item:, must_not_exist:, overwrite_metadata_timestamps: })
  # Always return a single Item.
  resp.first
end

def put_batch(*items)

Returns:
  • (Array) - the items that were stored

Parameters:
  • items (StatelyDB::Item, Array) -- the items to store.
def put_batch(*items)
  puts = Array(items).flatten.map do |input|
    if input.is_a?(Hash)
      item = input[:item]
      Stately::Db::PutItem.new(
        item: item.send("marshal_stately"),
        overwrite_metadata_timestamps: input[:overwrite_metadata_timestamps],
        must_not_exist: input[:must_not_exist]
      )
    else
      Stately::Db::PutItem.new(
        item: input.send("marshal_stately")
      )
    end
  end
  req = Stately::Db::PutRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    puts:
  )
  resp = @stub.put(req)
  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end

def sync_list(token)

Returns:
  • (StatelyDB::SyncResult) - the result of the sync operation

Parameters:
  • token (StatelyDB::Token) -- the token to sync from
def sync_list(token)
  req = Stately::Db::SyncListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.sync_list(req)
  process_sync_response(resp)
end

def transaction

Raises:
  • (Exception) - if any other exception is raised
  • (StatelyDB::Error) - if the item is not found
  • (StatelyDB::Error) - if the parameters are invalid

Returns:
  • (StatelyDB::Transaction::Transaction::Result) - the result of the transaction
def transaction
  txn = StatelyDB::Transaction::Transaction.new(stub: @stub, store_id: @store_id, schema: @schema)
  txn.begin
  yield txn
  txn.commit
rescue StatelyDB::Error
  raise
# Handle any other exceptions and abort the transaction. We're rescuing Exception here
# because we want to catch all exceptions, including those that don't inherit from StandardError.
rescue Exception => e
  txn.abort
  # All gRPC errors inherit from GRPC::BadStatus. We wrap these in a StatelyDB::Error.
  raise StatelyDB::Error.from(e) if e.is_a? GRPC::BadStatus
  # Calling raise with no parameters re-raises the original exception
  raise
end

def with_allow_stale(allow_stale)

Returns:
  • (self) - a new client with the allow_stale flag set

Parameters:
  • allow_stale (Boolean) -- whether to allow stale results
def with_allow_stale(allow_stale)
  new_client = clone
  new_client.instance_variable_set(:@allow_stale, allow_stale)
  new_client
end