class Google::Cloud::Bigquery::Table::AsyncInserter


# AsyncInserter

Used to insert multiple rows in batches into a table. See
{Google::Cloud::Bigquery::Table#insert_async}.

@example
  require "google/cloud/bigquery"

  bigquery = Google::Cloud::Bigquery.new
  dataset = bigquery.dataset "my_dataset"
  table = dataset.table "my_table"
  inserter = table.insert_async do |result|
    if result.error?
      log_error result.error
    else
      log_insert "inserted #{result.insert_count} rows " \
                 "with #{result.error_count} errors"
    end
  end

  rows = [
    { "first_name" => "Alice", "age" => 21 },
    { "first_name" => "Bob", "age" => 22 }
  ]
  inserter.insert rows

  inserter.stop.wait!

@attr_reader [Integer] max_bytes The maximum size of rows to be
  collected before the batch is inserted. Default is 10,000,000
  (10MB).
@attr_reader [Integer] max_rows The maximum number of rows to be
  collected before the batch is inserted. Default is 500.
@attr_reader [Numeric] interval The number of seconds to collect rows
  before the batch is inserted. Default is 10.
@attr_reader [Integer] threads The number of threads used to insert
  rows. Default is 4.

def flush

Returns:
  • (AsyncInserter) - returns self so calls can be chained.
def flush
  synchronize do
    push_batch_request!
    @cond.signal
  end
  self
end
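
A hedged usage sketch of flushing: an inserter created with Table#insert_async normally holds rows until max_rows, max_bytes, or interval triggers a batch, and flush pushes whatever is pending right away. The dataset, table, and row values below are placeholders.

  require "google/cloud/bigquery"

  bigquery = Google::Cloud::Bigquery.new
  table = bigquery.dataset("my_dataset").table("my_table")
  inserter = table.insert_async

  inserter.insert [{ "first_name" => "Carol", "age" => 30 }]

  # Don't wait for max_rows/max_bytes/interval; insert the pending batch now.
  inserter.flush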

def initialize table, skip_invalid: nil, ignore_unknown: nil, max_bytes: 10_000_000, max_rows: 500, interval: 10, threads: 4, &block

Other tags:
    Private: -
def initialize table, skip_invalid: nil, ignore_unknown: nil, max_bytes: 10_000_000, max_rows: 500,
               interval: 10, threads: 4, &block
  # init MonitorMixin
  super()
  @table = table
  @skip_invalid = skip_invalid
  @ignore_unknown = ignore_unknown
  @max_bytes = max_bytes
  @max_rows = max_rows
  @interval = interval
  @threads = threads
  @callback = block
  @batch = nil
  @thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @threads
  @cond = new_cond
end
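
AsyncInserter is marked private and is created through Table#insert_async, which passes these options through to this initializer. A hedged sketch of tuning the batching knobs, assuming insert_async forwards the keyword arguments shown above (the values are illustrative only):

  inserter = table.insert_async skip_invalid: true,
                                ignore_unknown: true,
                                max_bytes: 5_000_000,
                                max_rows: 200,
                                interval: 5,
                                threads: 8 do |result|
    warn result.error if result.error?
  end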

def insert rows, insert_ids: nil

Parameters:
  • rows (Hash, Array) -- A hash object or array of hash objects containing the row data.
  • insert_ids (Array, Symbol) -- A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. Optionally, set to :skip to avoid insert ID generation.

Other tags:
    See: https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts -
    See: https://cloud.google.com/bigquery/streaming-data-into-bigquery -
def insert rows, insert_ids: nil
  return nil if rows.nil?
  return nil if rows.is_a?(Array) && rows.empty?
  rows, insert_ids = validate_insert_args rows, insert_ids
  synchronize do
    rows.zip(Array(insert_ids)).each do |row, insert_id|
      if @batch.nil?
        @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
        @batch.insert row, insert_id
      else
        unless @batch.try_insert row, insert_id
          push_batch_request!
          @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
          @batch.insert row, insert_id
        end
      end
      @batch_created_at ||= ::Time.now
      @background_thread ||= Thread.new { run_background }
      push_batch_request! if @batch.ready?
    end
    @cond.signal
  end
  true
end
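
A hedged example of the insert_ids handling above: pass one ID per row so BigQuery can de-duplicate retried requests on a best-effort basis, or pass :skip to turn insert IDs off entirely. The IDs and row values are placeholders.

  rows = [
    { "id" => 1, "name" => "Alice" },
    { "id" => 2, "name" => "Bob" }
  ]

  # One insert ID per row; the counts must match (see validate_insert_args).
  inserter.insert rows, insert_ids: ["row-1", "row-2"]

  # Or skip insert IDs entirely, trading de-duplication for throughput.
  inserter.insert rows, insert_ids: :skip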

def push_batch_request!

def push_batch_request!
  return unless @batch
  orig_rows = @batch.rows
  json_rows = @batch.json_rows
  insert_ids = @batch.insert_ids
  Concurrent::Future.new executor: @thread_pool do
    raise ArgumentError, "No rows provided" if json_rows.empty?
    insert_resp = @table.service.insert_tabledata_json_rows @table.dataset_id,
                                                            @table.table_id,
                                                            json_rows,
                                                            skip_invalid: @skip_invalid,
                                                            ignore_unknown: @ignore_unknown,
                                                            insert_ids: insert_ids,
                                                            project_id: @table.project_id
    result = Result.new InsertResponse.from_gapi(orig_rows, insert_resp)
  rescue StandardError => e
    result = Result.new nil, e
  ensure
    @callback&.call result
  end.execute
  @batch = nil
  @batch_created_at = nil
end
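
The hand-off above uses concurrent-ruby: the batch's rows are captured into locals, a Concurrent::Future is built against the inserter's own thread pool, and the callback always runs via ensure, whether the insert succeeded or raised. A minimal, self-contained sketch of that same Future-on-an-executor pattern (the sleep stands in for the streaming-insert RPC):

  require "concurrent"

  pool = Concurrent::ThreadPoolExecutor.new max_threads: 4

  future = Concurrent::Future.new executor: pool do
    sleep 0.1        # placeholder for the insert_tabledata_json_rows call
    :inserted
  end
  future.execute

  future.value       # blocks until the block has run, then returns :inserted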

def run_background

def run_background
  synchronize do
    until @stopped
      if @batch.nil?
        @cond.wait
        next
      end
      time_since_first_publish = ::Time.now - @batch_created_at
      if time_since_first_publish < @interval
        # still waiting for the interval to insert the batch...
        timeout = @interval - time_since_first_publish
        @cond.wait timeout
      else
        # interval met, insert the batch...
        push_batch_request!
        @cond.wait
      end
    end
  end
end
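
The loop above is the standard MonitorMixin pattern: hold the monitor, wait on a condition variable (with a timeout once a batch exists), and re-check state on every wake-up, since @cond.wait can return early when insert or stop signals it. A stripped-down sketch of the same shape, using hypothetical names:

  require "monitor"

  class TimedFlusher
    include MonitorMixin

    def initialize interval
      super()                    # set up MonitorMixin state
      @interval = interval
      @cond = new_cond
      @items = []
      @first_at = nil
      @stopped = false
    end

    def add item
      synchronize do
        @items << item
        @first_at ||= Time.now
        @cond.signal             # wake the background loop
      end
    end

    def run
      synchronize do
        until @stopped
          if @items.empty?
            @cond.wait           # nothing buffered; sleep until signaled
            next
          end
          remaining = @interval - (Time.now - @first_at)
          if remaining.positive?
            @cond.wait remaining # wake early on signal, or when time is up
          else
            puts "flushing #{@items.size} items"
            @items.clear
            @first_at = nil
          end
        end
      end
    end
  end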

def started?

Returns:
  • (boolean) - `true` when started, `false` otherwise.
def started?
  !stopped?
end

def stop

Returns:
  • (AsyncInserter) - returns self so calls can be chained.
def stop
  synchronize do
    break if @stopped
    @stopped = true
    push_batch_request!
    @cond.signal
  end
  self
end

def stopped?

Returns:
  • (boolean) - `true` when stopped, `false` otherwise.
def stopped?
  synchronize { @stopped }
end

def validate_insert_args rows, insert_ids

def validate_insert_args rows, insert_ids
  rows = [rows] if rows.is_a? Hash
  raise ArgumentError, "No rows provided" if rows.empty?
  insert_ids = Array.new(rows.count) { :skip } if insert_ids == :skip
  insert_ids = Array insert_ids
  if insert_ids.count.positive? && insert_ids.count != rows.count
    raise ArgumentError, "insert_ids must be the same size as rows"
  end
  [rows, insert_ids]
end

def wait! timeout = nil

Returns:
  • (AsyncInserter) - returns self so calls can be chained.
def wait! timeout = nil
  synchronize do
    @thread_pool.shutdown
    @thread_pool.wait_for_termination timeout
  end
  self
end
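
A hedged shutdown sketch tying stop and wait! together: stop flushes any pending batch and signals the background thread to exit, while wait! shuts down the thread pool and blocks, optionally up to a timeout in seconds, until in-flight insert requests complete.

  inserter.insert rows
  inserter.stop

  # Block up to 30 seconds for outstanding batches to finish inserting.
  inserter.wait! 30

  inserter.stopped?  # => true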