class Google::Cloud::Bigquery::Table::AsyncInserter

# AsyncInserter

Used to insert multiple rows in batches to a table. See
{Google::Cloud::Bigquery::Table#insert_async}.

@attr_reader [Integer] max_bytes The maximum size of rows to be
  collected before the batch is inserted. Default is 10,000,000
  (10MB).
@attr_reader [Integer] max_rows The maximum number of rows to be
  collected before the batch is inserted. Default is 500.
@attr_reader [Numeric] interval The number of seconds to collect rows
  before the batch is inserted. Default is 10.
@attr_reader [Integer] threads The number of threads used to insert
  rows. Default is 4.

@example
  require "google/cloud/bigquery"

  bigquery = Google::Cloud::Bigquery.new
  dataset = bigquery.dataset "my_dataset"
  table = dataset.table "my_table"
  inserter = table.insert_async do |result|
    if result.error?
      log_error result.error
    else
      log_insert "inserted #{result.insert_count} rows " \
        "with #{result.error_count} errors"
    end
  end

  rows = [
    { "first_name" => "Alice", "age" => 21 },
    { "first_name" => "Bob", "age" => 22 }
  ]
  inserter.insert rows

  inserter.stop.wait!
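The thresholds above can be tuned when the inserter is created. A minimal sketch, assuming `table` from the example above and passing keyword arguments matching these attributes to {Google::Cloud::Bigquery::Table#insert_async}:

  # Collect at most 100 rows or wait at most 5 seconds per batch,
  # whichever limit is reached first.
  inserter = table.insert_async max_rows: 100, interval: 5 do |result|
    log_error result.error if result.error?
  end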
def flush

Returns:
- (AsyncInserter) -- returns self so calls can be chained.

  def flush
    synchronize do
      push_batch_request!
      @cond.signal
    end

    self
  end
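A usage sketch, continuing the `inserter` from the class example: `flush` pushes whatever has been buffered without waiting for `max_rows`, `max_bytes`, or `interval` to trip:

  inserter.insert({ "first_name" => "Carol", "age" => 30 })
  inserter.flush # the pending batch is handed to the thread pool now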
def initialize table, skip_invalid: nil, ignore_unknown: nil, max_bytes: 10_000_000, max_rows: 500, interval: 10, threads: 4, &block

Private.

  def initialize table, skip_invalid: nil, ignore_unknown: nil,
                 max_bytes: 10_000_000, max_rows: 500, interval: 10,
                 threads: 4, &block
    # init MonitorMixin
    super()

    @table = table
    @skip_invalid = skip_invalid
    @ignore_unknown = ignore_unknown

    @max_bytes = max_bytes
    @max_rows = max_rows
    @interval = interval
    @threads = threads
    @callback = block

    @batch = nil

    @thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @threads

    @cond = new_cond
  end
def insert rows, insert_ids: nil

Parameters:
- rows (Hash, Array<Hash>) -- A hash object or array of hash objects
  containing the data.
- insert_ids (Array<String|Symbol>, Symbol) -- A unique ID for each row.
  BigQuery uses this property to detect duplicate insertion requests on
  a best-effort basis.

Other tags:
- See: https://cloud.google.com/bigquery/streaming-data-into-bigquery
- See: https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts

  def insert rows, insert_ids: nil
    return nil if rows.nil?
    return nil if rows.is_a?(Array) && rows.empty?

    rows, insert_ids = validate_insert_args rows, insert_ids

    synchronize do
      rows.zip(Array(insert_ids)).each do |row, insert_id|
        if @batch.nil?
          @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
          @batch.insert row, insert_id
        else
          unless @batch.try_insert row, insert_id
            push_batch_request!

            @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
            @batch.insert row, insert_id
          end
        end

        @batch_created_at ||= ::Time.now
        @background_thread ||= Thread.new { run_background }

        push_batch_request! if @batch.ready?
      end

      @cond.signal
    end

    true
  end
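A usage sketch pairing each row with its own deduplication ID (the ID strings are illustrative):

  rows = [
    { "first_name" => "Alice", "age" => 21 },
    { "first_name" => "Bob", "age" => 22 }
  ]
  # insert_ids must be the same size as rows, matched by position
  inserter.insert rows, insert_ids: ["registration-1", "registration-2"]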
def push_batch_request!

  def push_batch_request!
    return unless @batch

    orig_rows = @batch.rows
    json_rows = @batch.json_rows
    insert_ids = @batch.insert_ids

    Concurrent::Future.new executor: @thread_pool do
      raise ArgumentError, "No rows provided" if json_rows.empty?

      insert_resp = @table.service.insert_tabledata_json_rows(
        @table.dataset_id, @table.table_id, json_rows,
        skip_invalid:   @skip_invalid,
        ignore_unknown: @ignore_unknown,
        insert_ids:     insert_ids,
        project_id:     @table.project_id
      )

      result = Result.new InsertResponse.from_gapi(orig_rows, insert_resp)
    rescue StandardError => e
      result = Result.new nil, e
    ensure
      @callback&.call result
    end.execute

    @batch = nil
    @batch_created_at = nil
  end
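The batch is handed to concurrent-ruby rather than sent on the caller's thread. A self-contained sketch of the same `Concurrent::Future` pattern (the computation is illustrative):

  require "concurrent"

  pool = Concurrent::ThreadPoolExecutor.new max_threads: 4
  future = Concurrent::Future.new executor: pool do
    2 + 2 # runs on a pool thread, not the caller's
  end.execute
  future.value # blocks until the pool thread finishes, => 4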
def run_background

  def run_background
    synchronize do
      until @stopped
        if @batch.nil?
          @cond.wait
          next
        end

        time_since_first_publish = ::Time.now - @batch_created_at
        if time_since_first_publish < @interval
          # still waiting for the interval to insert the batch...
          timeout = @interval - time_since_first_publish
          @cond.wait timeout
        else
          # interval met, insert the batch...
          push_batch_request!
          @cond.wait
        end
      end
    end
  end
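The timed wait is plain MonitorMixin usage: `@cond.wait timeout` sleeps until signaled or until the timeout lapses. A self-contained sketch of the same wait/signal pattern, with a hypothetical `Buffer` class:

  require "monitor"

  class Buffer
    include MonitorMixin

    def initialize
      super() # set up the monitor before using new_cond
      @cond = new_cond
      @items = []
    end

    def push item
      synchronize do
        @items << item
        @cond.signal # wake a waiting consumer
      end
    end

    def pop_or_timeout seconds
      synchronize do
        @cond.wait seconds if @items.empty? # timed wait, as in run_background
        @items.shift
      end
    end
  end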
def started?

Returns:
- (boolean) -- `true` when started, `false` otherwise.

  def started?
    !stopped?
  end
def stop

Returns:
- (AsyncInserter) -- returns self so calls can be chained.

  def stop
    synchronize do
      break if @stopped

      @stopped = true
      push_batch_request!
      @cond.signal
    end

    self
  end
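Since `stop` pushes the final batch before returning `self`, shutdown is usually chained with `wait!`:

  inserter.stop.wait! # flush the last batch, then block until the pool drains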
def stopped?

Returns:
- (boolean) -- `true` when stopped, `false` otherwise.

  def stopped?
    synchronize { @stopped }
  end
def validate_insert_args rows, insert_ids

  def validate_insert_args rows, insert_ids
    rows = [rows] if rows.is_a? Hash
    raise ArgumentError, "No rows provided" if rows.empty?

    insert_ids = Array.new(rows.count) { :skip } if insert_ids == :skip
    insert_ids = Array insert_ids
    if insert_ids.count.positive? && insert_ids.count != rows.count
      raise ArgumentError, "insert_ids must be the same size as rows"
    end

    [rows, insert_ids]
  end
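Passing the symbol `:skip` expands, per the branch above, into one `:skip` marker per row, disabling insert IDs for the whole call:

  # no insert IDs are generated or sent for these rows
  inserter.insert rows, insert_ids: :skip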
def wait! timeout = nil

Returns:
- (AsyncInserter) -- returns self so calls can be chained.

  def wait! timeout = nil
    synchronize do
      @thread_pool.shutdown
      @thread_pool.wait_for_termination timeout
    end

    self
  end
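A bounded-shutdown sketch; the 10-second timeout is illustrative:

  inserter.stop
  inserter.wait! 10 # returns after ~10 seconds even if inserts are still in flight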