class AWS::DynamoDB::BatchWrite

def convert_delete_item item

# Builds a delete request from a string-keyed key description.
#
# @param [Hash] item Key description; read through the 'HashKeyElement'
#   and 'RangeKeyElement' string keys. Each element is passed to
#   #str2sym.
# @return [Hash] `{ :delete_request => { :key => key } }`, where `key`
#   always has `:hash_key_element` and has `:range_key_element` only
#   when `item['RangeKeyElement']` is truthy.
def convert_delete_item(item)
  range_element = item['RangeKeyElement']

  key = { :hash_key_element => str2sym(item['HashKeyElement']) }
  # Range keys are optional; only convert one when it was supplied.
  key[:range_key_element] = str2sym(range_element) if range_element

  { :delete_request => { :key => key } }
end

def convert_put_item item

# Builds a put request from a string-keyed attribute map.
#
# @param [Hash] item Maps attribute names to typed value descriptions;
#   every value is passed through #str2sym, names are kept as given.
# @return [Hash] `{ :put_request => { :item => attributes } }`.
def convert_put_item(item)
  converted = item.each_pair.each_with_object({}) do |(name, value), attributes|
    attributes[name] = str2sym(value)
  end

  { :put_request => { :item => converted } }
end

def convert_unprocessed_items items

# Converts the 'UnprocessedItems' portion of a batch write response back
# into request items that can be submitted again.
#
# @param [Hash, nil] items Maps table names to arrays of single-entry
#   request hashes keyed by 'PutRequest' or 'DeleteRequest'.
# @return [Hash, nil] Request items grouped by table, or `nil` when
#   there is nothing left to retry (`items` is `nil` or empty).
def convert_unprocessed_items(items)
  # A response without an 'UnprocessedItems' entry yields nil here; treat
  # it like an empty map instead of failing on nil.empty?.
  return nil if items.nil? || items.empty?

  items.each_with_object({}) do |(table, requests), request_items|
    converted = (request_items[table] ||= [])
    requests.each do |request|
      type, payload = request.first
      # There is no else branch: a request of any other type appends nil.
      converted <<
        case type
        when 'PutRequest'    then convert_put_item(payload['Item'])
        when 'DeleteRequest' then convert_delete_item(payload['Key'])
        end
    end
  end
end

def delete table, items

Returns:
  • (nil) -

Parameters:
  • items (Array, Array) -- A list of item keys to delete.
  • table (Table, String) -- A {Table} object or table name string.
# Queues delete requests for the given item keys.
#
# @param [Table, String] table A {Table} object or table name string.
# @param [Array] items The item keys to delete; forwarded unchanged to
#   #write under the `:delete` option.
# @return [nil]
def delete(table, items)
  delete_options = { :delete => items }
  write(table, delete_options)
  nil
end

def format_delete keys

# Formats an item key for a delete request.
#
# @param [Object, Array] keys A single hash key value, or an array whose
#   first element is the hash key and whose last element is the range
#   key. A non-array value is treated as a one-element array.
# @return [Hash] Always has `:hash_key_element`; has `:range_key_element`
#   only when more than one key value was given.
def format_delete(keys)
  key_values = keys.is_a?(Array) ? keys : [keys]

  formatted = { :hash_key_element => format_attribute_value(key_values.first) }
  if key_values.count > 1
    formatted[:range_key_element] = format_attribute_value(key_values.last)
  end
  formatted
end

def format_put attributes

# Formats the attributes of one item for a put request.
#
# @param [Hash] attributes Maps attribute names to values.
# @return [Hash] A new hash whose keys are the attribute names converted
#   with `to_s` and whose values are the results of
#   `format_attribute_value(value, "value for attribute <name>")`.
def format_put(attributes)
  attributes.each_with_object({}) do |(key, value), formatted|
    # The context string is handed to format_attribute_value alongside
    # the value so it knows which attribute it is formatting.
    context = "value for attribute #{key}"
    formatted[key.to_s] = format_attribute_value(value, context)
  end
end

def initialize options = {}

# Creates a batch with no queued requests.
#
# @param [Hash] options Passed along to the superclass initializer.
def initialize(options = {})
  # Bare `super` forwards `options` exactly as received.
  super
  # Maps table names to the list of requests queued for that table.
  @request_items = {}
end

def process!

Returns:
  • (nil) -
# Sends the queued requests through `client.batch_write_item`, then keeps
# resubmitting whatever #convert_unprocessed_items returns for the
# response's 'UnprocessedItems' until that is nil (or false). The queue is
# reset to an empty hash afterwards.
#
# Does nothing when no requests are queued.
#
# NOTE(review): retries are issued back-to-back with no delay or attempt
# limit -- confirm this is acceptable for callers.
#
# @return [nil]
def process!
  return if @request_items.empty?

  opts = { :request_items => @request_items }
  # The guard above guarantees the first payload is present, so this
  # pre-tested loop always performs at least one request.
  while opts[:request_items]
    response = client.batch_write_item(opts)
    leftover = response.data['UnprocessedItems']
    opts[:request_items] = convert_unprocessed_items(leftover)
  end

  @request_items = {}
  nil
end

def put table, items

Returns:
  • (nil) -

Parameters:
  • items (Array) -- A list of item attributes to put.
  • table (Table, String) -- A {Table} object or table name string.
# Queues put requests for the given items.
#
# @param [Table, String] table A {Table} object or table name string.
# @param [Array] items A list of item attributes to put. Nested arrays
#   are flattened before being handed to #write under the `:put` option.
# @return [nil]
def put(table, items)
  flattened = items.flatten
  write(table, :put => flattened)
  nil
end

def str2sym key_desc

# Converts a string-typed value description such as `{ "S" => "abc" }`
# into its symbol-typed form, `{ :s => "abc" }`.
#
# Only the first key of `key_desc` is considered.
#
# @param [Hash] key_desc A hash whose first key is one of the type
#   strings "S", "N", "SS" or "NS".
# @return [Hash] A single-entry hash from the matching type symbol to the
#   original value.
# @raise [RuntimeError] If the first key is not one of the handled types.
def str2sym(key_desc)
  type = key_desc.keys.first
  type_symbol = { "S" => :s, "N" => :n, "SS" => :ss, "NS" => :ns }[type]

  raise "unhandled key type: #{type.inspect}" unless type_symbol

  { type_symbol => key_desc[type] }
end

def table_items table

# Returns the list of queued requests for a table, creating and storing
# an empty list the first time the table is seen.
#
# @param [Table, String] table Resolved to a name via #table_name.
# @return [Array] The request list stored in `@request_items` for that
#   table name (the same array object on every call).
def table_items(table)
  name = table_name(table)
  @request_items[name] = [] unless @request_items[name]
  @request_items[name]
end

def table_name table

# Resolves a table reference to a table name.
#
# @param [Table, Object] table A {Table} object or anything that
#   responds to `to_s`.
# @return The table's `name` when given a {Table}; otherwise
#   `table.to_s`.
def table_name(table)
  return table.name if table.is_a?(Table)

  table.to_s
end

def write table, options = {}

Options Hash: (**options)
  • :delete (Array, Array) -- A list of item keys to delete.
  • :put (Array) -- An array of items to put. Each item is a hash of attribute names to values.

Parameters:
  • options (Hash) --
  • table (Table, String) -- A {Table} object or table name string.
# Queues put and/or delete requests for a table. Puts are queued before
# deletes; nothing is sent until the batch is processed.
#
# @param [Table, String] table A {Table} object or table name string.
# @param [Hash] options
# @option options [Array] :put Items to put; each one is formatted with
#   #format_put.
# @option options [Array] :delete Item keys to delete; each one is
#   formatted with #format_delete.
def write(table, options = {})
  queue = table_items(table)

  items_to_put = options[:put]
  if items_to_put
    items_to_put.each do |attributes|
      queue << { :put_request => { :item => format_put(attributes) } }
    end
  end

  keys_to_delete = options[:delete]
  if keys_to_delete
    keys_to_delete.each do |keys|
      queue << { :delete_request => { :key => format_delete(keys) } }
    end
  end
end