# frozen_string_literal: true
require "api/db/service_services_pb"
require "common/auth/auth_token_provider"
require "common/auth/interceptor"
require "common/net/conn"
require "common/error_interceptor"
require "grpc"
require "json"
require "net/http"
require "transaction/transaction"
require "transaction/queue"
require "error"
require "key_path"
require "token"
require "uuid"
module StatelyDB
# CoreClient is a low level client for interacting with the Stately Cloud API.
# This client shouldn't be used directly in most cases. Instead, use the generated
# client for your schema.
class CoreClient
# Initialize a new StatelyDB CoreClient
#
# @param store_id [Integer] the StatelyDB to use for all operations with this client.
# @param schema [Module] the generated Schema module to use for mapping StatelyDB Items.
# @param token_provider [StatelyDB::Common::Auth::TokenProvider] the token provider to use for authentication.
# @param endpoint [String] the endpoint to connect to.
# @param region [String] the region to connect to.
# @param no_auth [Boolean] Indicates that the client should not attempt to get
# an auth token. This is used when talking to the Stately BYOC Data Plane on localhost.
def initialize(store_id:,
schema:,
token_provider: nil,
endpoint: nil,
region: nil,
no_auth: false)
if store_id.nil?
raise StatelyDB::Error.new("store_id is required",
code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
stately_code: "InvalidArgument")
end
if schema.nil?
raise StatelyDB::Error.new("schema is required",
code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
stately_code: "InvalidArgument")
end
endpoint = self.class.make_endpoint(endpoint:, region:)
@channel = Common::Net.new_channel(endpoint:)
# Make sure to use the correct endpoint for the default token provider
@token_provider = token_provider || Common::Auth::AuthTokenProvider.new(endpoint:)
interceptors = [Common::ErrorInterceptor.new]
interceptors << Common::Auth::Interceptor.new(token_provider: @token_provider) unless no_auth
@stub = Stately::Db::DatabaseService::Stub.new(nil, nil,
channel_override: @channel, interceptors:)
@store_id = store_id.to_i
@schema = schema
@allow_stale = false
end
# @return [void] nil
def close
@channel&.close
@token_provider&.close
end
# Set whether to allow stale results for all operations with this client. This produces a new client
# with the allow_stale flag set.
# @param allow_stale [Boolean] whether to allow stale results
# @return [self] a new client with the allow_stale flag set
# @example
# client.with_allow_stale(true).get("/ItemType-identifier")
def with_allow_stale(allow_stale)
new_client = clone
new_client.instance_variable_set(:@allow_stale, allow_stale)
new_client
end
# Fetch a single Item from a StatelyDB Store at the given key_path.
#
# @param key_path [String] the path to the item
# @return [StatelyDB::Item, NilClass] the Item or nil if not found
# @raise [StatelyDB::Error] if the parameters are invalid or if the item is not found
#
# @example
# client.get("/ItemType-identifier")
def get(key_path)
resp = get_batch(key_path)
# Always return a single Item.
resp.first
end
# Fetch a batch of up to 100 Items from a StatelyDB Store at the given key_paths.
#
# @param key_paths [String, Array<String>] the paths to the items. Max 100 key paths.
# @return [Array<StatelyDB::Item>, NilClass] the items or nil if not found
# @raise [StatelyDB::Error] if the parameters are invalid or if the item is not found
#
# @example
# client.data.get_batch("/ItemType-identifier", "/ItemType-identifier2")
def get_batch(*key_paths)
key_paths = Array(key_paths).flatten
req = Stately::Db::GetRequest.new(
store_id: @store_id,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID,
gets:
key_paths.map { |key_path| Stately::Db::GetItem.new(key_path: String(key_path)) },
allow_stale: @allow_stale
)
resp = @stub.get(req)
resp.items.map do |result|
@schema.unmarshal_item(stately_item: result)
end
end
# Begin listing Items from a StatelyDB Store at the given prefix.
#
# @param prefix [String] the prefix to list
# @param limit [Integer] the maximum number of items to return
# @param sort_property [String] the property to sort by
# @param sort_direction [Symbol] the direction to sort by (:ascending or :descending)
# @return [Array<StatelyDB::Item>, StatelyDB::Token] the list of Items and the token
#
# @example
# client.data.begin_list("/ItemType-identifier", limit: 10, sort_direction: :ascending)
def begin_list(prefix,
limit: 100,
sort_property: nil,
sort_direction: :ascending)
sort_direction = sort_direction == :ascending ? 0 : 1
req = Stately::Db::BeginListRequest.new(
store_id: @store_id,
key_path_prefix: String(prefix),
limit:,
sort_property:,
sort_direction:,
allow_stale: @allow_stale,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID
)
resp = @stub.begin_list(req)
process_list_response(resp)
end
# Continue listing Items from a StatelyDB Store using a token.
#
# @param token [StatelyDB::Token] the token to continue from
# @return [Array<StatelyDB::Item>, StatelyDB::Token] the list of Items and the token
#
# @example
# (items, token) = client.data.begin_list("/ItemType-identifier")
# client.data.continue_list(token)
def continue_list(token)
req = Stately::Db::ContinueListRequest.new(
token_data: token.token_data,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID
)
resp = @stub.continue_list(req)
process_list_response(resp)
end
# Initiates a scan request which will scan over the entire store and apply
# the provided filters. This API returns a token that you can pass to
# continue_scan to paginate through the result set. This can fail if the
# caller does not have permission to read Items.
#
# WARNING: THIS API CAN BE EXTREMELY EXPENSIVE FOR STORES WITH A LARGE NUMBER
# OF ITEMS.
#
# @param limit [Integer] the max number of items to retrieve. If set to 0
# then the first page of results will be returned which may empty because it
# does not contain items of your selected item types. Be sure to check
# token.can_continue to see if there are more results to fetch.
# @param item_types [Array<Class, String>] the item types to filter by. The returned
# items will be instances of one of these types.
# @param total_segments [Integer] the total number of segments to divide the
# scan into. Use this when you want to parallelize your operation.
# @param segment_index [Integer] the index of the segment to scan.
# Use this when you want to parallelize your operation.
# @return [Array<StatelyDB::Item>, StatelyDB::Token] the list of Items and the token
#
# @example
# client.data.begin_scan(limit: 10, item_types: [MyItem])
def begin_scan(limit: 0,
item_types: [],
total_segments: nil,
segment_index: nil)
if total_segments.nil? != segment_index.nil?
raise StatelyDB::Error.new("total_segments and segment_index must both be set or both be nil",
code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
stately_code: "InvalidArgument")
end
req = Stately::Db::BeginScanRequest.new(
store_id: @store_id,
limit:,
filter_condition: item_types.map do |item_type|
Stately::Db::FilterCondition.new(item_type: item_type.respond_to?(:name) ? item_type.name.split("::").last : item_type)
end,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID
)
resp = @stub.begin_scan(req)
process_list_response(resp)
end
# continue_scan takes the token from a begin_scan call and returns more results
# based on the original request parameters and pagination options.
#
# WARNING: THIS API CAN BE EXTREMELY EXPENSIVE FOR STORES WITH A LARGE NUMBER OF ITEMS.
#
# @param token [StatelyDB::Token] the token to continue from
# @return [Array<StatelyDB::Item>, StatelyDB::Token] the list of Items and the token
#
# @example
# (items, token) = client.data.begin_scan(limit: 10, item_types: [MyItem])
# client.data.continue_scan(token)
def continue_scan(token)
req = Stately::Db::ContinueScanRequest.new(
token_data: token.token_data,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID
)
resp = @stub.continue_scan(req)
process_list_response(resp)
end
# Sync a list of Items from a StatelyDB Store.
#
# @param token [StatelyDB::Token] the token to sync from
# @return [StatelyDB::SyncResult] the result of the sync operation
#
# @example
# (items, token) = client.data.begin_list("/ItemType-identifier")
# client.data.sync_list(token)
def sync_list(token)
req = Stately::Db::SyncListRequest.new(
token_data: token.token_data,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID
)
resp = @stub.sync_list(req)
process_sync_response(resp)
end
# Put an Item into a StatelyDB Store at the given key_path.
#
# @param item [StatelyDB::Item] a StatelyDB Item
# @param must_not_exist [Boolean] A condition that indicates this item must
# not already exist at any of its key paths. If there is already an item
# at one of those paths, the Put operation will fail with a
# "ConditionalCheckFailed" error. Note that if the item has an
# `initialValue` field in its key, that initial value will automatically
# be chosen not to conflict with existing items, so this condition only
# applies to key paths that do not contain the `initialValue` field.
# @param overwrite_metadata_timestamps [Boolean] If set to true, the server will
# set the `createdAtTime` and/or `lastModifiedAtTime` fields based on the
# current values in this item (assuming you've mapped them to a field using
# `fromMetadata`). Without this, those fields are always ignored and the
# server sets them to the appropriate times. This option can be useful when
# migrating data from another system.
# @return [StatelyDB::Item] the item that was stored
#
# @example client.data.put(my_item)
# @example client.data.put(my_item, must_not_exist: true)
def put(item,
must_not_exist: false,
overwrite_metadata_timestamps: false)
resp = put_batch({ item:, must_not_exist:, overwrite_metadata_timestamps: })
# Always return a single Item.
resp.first
end
# Put a batch of up to 50 Items into a StatelyDB Store.
#
# @param items [StatelyDB::Item, Array<StatelyDB::Item>] the items to store.
# Max 50 items.
# @return [Array<StatelyDB::Item>] the items that were stored
#
# @example
# client.data.put_batch(item1, item2)
# @example
# client.data.put_batch({ item: item1, must_not_exist: true }, item2)
def put_batch(*items)
puts = Array(items).flatten.map do |input|
if input.is_a?(Hash)
item = input[:item]
Stately::Db::PutItem.new(
item: item.send("marshal_stately"),
overwrite_metadata_timestamps: input[:overwrite_metadata_timestamps],
must_not_exist: input[:must_not_exist]
)
else
Stately::Db::PutItem.new(
item: input.send("marshal_stately")
)
end
end
req = Stately::Db::PutRequest.new(
store_id: @store_id,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID,
puts:
)
resp = @stub.put(req)
resp.items.map do |result|
@schema.unmarshal_item(stately_item: result)
end
end
# Delete up to 50 Items from a StatelyDB Store at the given key_paths.
#
# @param key_paths [String, Array<String>] the paths to the items. Max 50 key paths.
# @raise [StatelyDB::Error] if the parameters are invalid
# @raise [StatelyDB::Error] if the item is not found
# @return [void] nil
#
# @example
# client.data.delete("/ItemType-identifier", "/ItemType-identifier2")
def delete(*key_paths)
key_paths = Array(key_paths).flatten
req = Stately::Db::DeleteRequest.new(
store_id: @store_id,
schema_id: @schema::SCHEMA_ID,
schema_version_id: @schema::SCHEMA_VERSION_ID,
deletes: key_paths.map { |key_path| Stately::Db::DeleteItem.new(key_path: String(key_path)) }
)
@stub.delete(req)
nil
end
# Transaction takes a block and executes the block within a transaction.
# If the block raises an exception, the transaction is rolled back.
# If the block completes successfully, the transaction is committed.
#
# @return [StatelyDB::Transaction::Transaction::Result] the result of the transaction
# @raise [StatelyDB::Error] if the parameters are invalid
# @raise [StatelyDB::Error] if the item is not found
# @raise [Exception] if any other exception is raised
#
# @example
# client.data.transaction do |txn|
# txn.put(item: my_item)
# txn.put(item: another_item)
# end
def transaction
txn = StatelyDB::Transaction::Transaction.new(stub: @stub, store_id: @store_id, schema: @schema)
txn.begin
yield txn
txn.commit
rescue StatelyDB::Error
raise
# Handle any other exceptions and abort the transaction. We're rescuing Exception here
# because we want to catch all exceptions, including those that don't inherit from StandardError.
rescue Exception => e
txn.abort
# All gRPC errors inherit from GRPC::BadStatus. We wrap these in a StatelyDB::Error.
raise StatelyDB::Error.from(e) if e.is_a? GRPC::BadStatus
# Calling raise with no parameters re-raises the original exception
raise
end
# Construct the API endpoint from the region and endpoint.
# If the endpoint is provided, it will be returned as-is.
# If the region is provided and the endpoint is not,
# then the region-specific endpoint will be returned.
# If neither the region nor the endpoint is provided,
# then the default endpoint will be returned.
#
# @param endpoint [String] the endpoint to connect to
# @param region [String] the region to connect to
# @return [String] the constructed endpoint
def self.make_endpoint(endpoint: nil, region: nil)
return endpoint unless endpoint.nil?
return "https://api.stately.cloud" if region.nil?
region = region.sub("aws-", "") if region.start_with?("aws-")
"https://#{region}.aws.api.stately.cloud"
end
private
# Process a list response from begin_list or continue_list
#
# @param resp [::Stately::Db::ListResponse] the response to process
# @return [Array(Array<StatelyDB::Item>, StatelyDB::Token)] the list of Items and the token
# @api private
# @!visibility private
def process_list_response(resp)
items = []
token = nil
safe_yield_stream(resp) do |r|
case r.response
when :result
r.result.items.map do |result|
items << @schema.unmarshal_item(stately_item: result)
end
when :finished
raw_token = r.finished.token
token = StatelyDB::Token.new(token_data: raw_token.token_data,
can_continue: raw_token.can_continue,
can_sync: raw_token.can_sync,
schema_version_id: raw_token.schema_version_id)
end
end
[items, token]
end
# Process a sync response from sync_list
#
# @param resp [::Stately::Db::SyncResponse] the response to process
# @return [StatelyDB::SyncResult] the result of the sync operation
# @api private
# @!visibility private
def process_sync_response(resp)
changed_items = []
deleted_item_paths = []
updated_outside_list_window_paths = []
token = nil
is_reset = false
safe_yield_stream(resp) do |r|
case r.response
when :result
r.result.changed_items.each do |item|
changed_items << @schema.unmarshal_item(stately_item: item)
end
r.result.deleted_items.each do |item|
deleted_item_paths << item.key_path
end
r.result.updated_item_keys_outside_list_window.each do |item|
updated_outside_list_window_paths << item.key_path
end
when :reset
is_reset = true
when :finished
raw_token = r.finished.token
token = StatelyDB::Token.new(token_data: raw_token.token_data,
can_continue: raw_token.can_continue,
can_sync: raw_token.can_sync,
schema_version_id: raw_token.schema_version_id)
end
end
SyncResult.new(changed_items:, deleted_item_paths:, updated_outside_list_window_paths:, is_reset:, token:)
end
end
# SyncResult represents the results of a sync operation.
#
# @attr_reader changed_items [Array<StatelyDB::Item>] the items that were changed
# @attr_reader deleted_item_paths [Array<String>] the key paths that were deleted
# @attr_reader updated_outside_list_window_paths [Array<String>] the key paths of
# items that were updated but Stately cannot tell if they were in the sync window.
# Treat these as deleted in most cases.
# @attr_reader is_reset [Boolean] whether the sync operation reset the token
# @attr_reader token [StatelyDB::Token] the token to continue from
class SyncResult
attr_reader :changed_items, :deleted_item_paths, :updated_outside_list_window_paths, :is_reset, :token
# @param changed_items [Array<StatelyDB::Item>] the items that were changed
# @param deleted_item_paths [Array<String>] the key paths that were deleted
# @param updated_outside_list_window_paths [Array<String>] key paths for items that were updated
# but do not currently use the sort property that the list window is based on
# @param is_reset [Boolean] whether the sync operation reset the token
# @param token [StatelyDB::Token] the token to continue from
def initialize(changed_items:, deleted_item_paths:, updated_outside_list_window_paths:, is_reset:, token:)
@changed_items = changed_items
@deleted_item_paths = deleted_item_paths
@updated_outside_list_window_paths = updated_outside_list_window_paths
@is_reset = is_reset
@token = token
end
end
# StatelyDB::Item is a base class for all StatelyDB Items. This class is provided in documentation
# to show the expected interface for a StatelyDB Item, but in practice the SDK will return a subclass
# of this class that is generated from the schema.
class Item < Object
end
end