lib/google/cloud/bigquery/table/async_inserter.rb
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


require "google/cloud/bigquery/convert"
require "monitor"
require "concurrent"
require "securerandom"

module Google
  module Cloud
    module Bigquery
      class Table
        ##
        # # AsyncInserter
        #
        # Used to insert multiple rows in batches into a table. See
        # {Google::Cloud::Bigquery::Table#insert_async}.
        #
        # @example
        #   require "google/cloud/bigquery"
        #
        #   bigquery = Google::Cloud::Bigquery.new
        #   dataset = bigquery.dataset "my_dataset"
        #   table = dataset.table "my_table"
        #   inserter = table.insert_async do |result|
        #     if result.error?
        #       log_error result.error
        #     else
        #       log_insert "inserted #{result.insert_count} rows " \
        #         "with #{result.error_count} errors"
        #     end
        #   end
        #
        #   rows = [
        #     { "first_name" => "Alice", "age" => 21 },
        #     { "first_name" => "Bob", "age" => 22 }
        #   ]
        #   inserter.insert rows
        #
        #   inserter.stop.wait!
        #
        # @attr_reader [Integer] max_bytes The maximum size of rows to be
        #   collected before the batch is inserted. Default is 10,000,000
        #   (10MB).
        # @attr_reader [Integer] max_rows The maximum number of rows to be
        #   collected before the batch is inserted. Default is 500.
        # @attr_reader [Numeric] interval The number of seconds to collect rows
        #   before the batch is inserted. Default is 10.
        # @attr_reader [Integer] threads The number of threads used to insert
        #   rows. Default is 4.
        #
        class AsyncInserter
          include MonitorMixin

          attr_reader :max_bytes
          attr_reader :max_rows
          attr_reader :interval
          attr_reader :threads
          ##
          # @private Implementation accessors
          attr_reader :table, :batch

          ##
          # @private
          def initialize table, skip_invalid: nil, ignore_unknown: nil, max_bytes: 10_000_000, max_rows: 500,
                         interval: 10, threads: 4, &block
            # init MonitorMixin
            super()

            @table = table
            @skip_invalid = skip_invalid
            @ignore_unknown = ignore_unknown

            @max_bytes = max_bytes
            @max_rows = max_rows
            @interval = interval
            @threads = threads
            @callback = block

            @batch = nil

            @thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @threads

            @cond = new_cond
          end

          ##
          # Adds rows to the async inserter to be inserted. Rows will be
          # collected in batches and inserted together.
          # See {Google::Cloud::Bigquery::Table#insert_async}.
          #
          # Simple Ruby types are generally accepted per JSON rules, along with the following support for BigQuery's
          # more complex types:
          #
          # | BigQuery     | Ruby                                 | Notes                                              |
          # |--------------|--------------------------------------|----------------------------------------------------|
          # | `NUMERIC`    | `BigDecimal`                         | `BigDecimal` values will be rounded to scale 9.    |
          # | `BIGNUMERIC` | `String`                             | Pass as `String` to avoid rounding to scale 9.     |
          # | `DATETIME`   | `DateTime`                           | `DATETIME` does not support time zone.             |
          # | `DATE`       | `Date`                               |                                                    |
          # | `GEOGRAPHY`  | `String`                             |                                                    |
          # | `JSON`       | `String` (Stringified JSON)          | String, as JSON does not have a schema to verify.  |
          # | `TIMESTAMP`  | `Time`                               |                                                    |
          # | `TIME`       | `Google::Cloud::Bigquery::Time`      |                                                    |
          # | `BYTES`      | `File`, `IO`, `StringIO`, or similar |                                                    |
          # | `ARRAY`      | `Array`                              | Nested arrays and `nil` values are not supported.  |
          # | `STRUCT`     | `Hash`                               | Hash keys may be strings or symbols.               |
          #
          # Because BigQuery's streaming API is designed for high insertion
          # rates, modifications to the underlying table metadata are eventually
          # consistent when interacting with the streaming system. In most cases
          # metadata changes are propagated within minutes, but during this
          # period API responses may reflect the inconsistent state of the
          # table.
          #
          # @see https://cloud.google.com/bigquery/streaming-data-into-bigquery
          #   Streaming Data Into BigQuery
          #
          # @see https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
          #   BigQuery Troubleshooting: Metadata errors for streaming inserts
          #
          # @param [Hash, Array<Hash>] rows A hash object or array of hash objects
          #   containing the data. Required. `BigDecimal` values will be rounded to
          #   scale 9 to conform with the BigQuery `NUMERIC` data type. To avoid
          #   rounding `BIGNUMERIC` type values with scale greater than 9, use `String`
          #   instead of `BigDecimal`.
          # @param [Array<String|Symbol>, Symbol] insert_ids A unique ID for each row. BigQuery uses this property to
          #   detect duplicate insertion requests on a best-effort basis. For more information, see [data
          #   consistency](https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency). Optional. If
          #   not provided, the client library will assign a UUID to each row before the request is sent.
          #
          #   The value `:skip` can be provided to skip the generation of IDs for all rows, or to skip the generation
          #   of an ID for a specific row in the array.
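          #
          # @example Providing insert IDs for best-effort de-duplication
          #   require "google/cloud/bigquery"
          #
          #   bigquery = Google::Cloud::Bigquery.new
          #   dataset = bigquery.dataset "my_dataset"
          #   table = dataset.table "my_table"
          #   inserter = table.insert_async
          #
          #   # Illustrative rows and IDs; any value that is unique per row
          #   # (for example, an order or record ID) will do.
          #   rows = [
          #     { "first_name" => "Alice", "age" => 21 },
          #     { "first_name" => "Bob", "age" => 22 }
          #   ]
          #   inserter.insert rows, insert_ids: ["id-alice", "id-bob"]
          #
          #   inserter.stop.wait!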
          #
          def insert rows, insert_ids: nil
            return nil if rows.nil?
            return nil if rows.is_a?(Array) && rows.empty?
            rows, insert_ids = validate_insert_args rows, insert_ids

            synchronize do
              rows.zip(Array(insert_ids)).each do |row, insert_id|
                if @batch.nil?
                  @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
                  @batch.insert row, insert_id
                else
                  unless @batch.try_insert row, insert_id
                    push_batch_request!

                    @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
                    @batch.insert row, insert_id
                  end
                end

                @batch_created_at ||= ::Time.now
                @background_thread ||= Thread.new { run_background }

                push_batch_request! if @batch.ready?
              end

              @cond.signal
            end

            true
          end

          ##
          # Begins the process of stopping the inserter. Rows already in the
          # queue will be inserted, but no new rows can be added. Use {#wait!}
          # to block until the inserter is fully stopped and all pending rows
          # have been inserted.
          #
          # @return [AsyncInserter] returns self so calls can be chained.
          #
          def stop
            synchronize do
              break if @stopped

              @stopped = true
              push_batch_request!
              @cond.signal
            end

            self
          end

          ##
          # Blocks until the inserter is fully stopped, all pending rows
          # have been inserted, and all callbacks have completed. Does not stop
          # the inserter. To stop the inserter, first call {#stop} and then
          # call {#wait!} to block until the inserter is stopped.
          #
          # @param [Numeric, nil] timeout The maximum number of seconds to wait
          #   for the pending inserts and callbacks to complete. When `nil`,
          #   blocks until the thread pool has fully terminated. Optional.
          #
          # @return [AsyncInserter] returns self so calls can be chained.
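          #
          # @example Stopping with a bounded wait (the 120-second timeout is illustrative)
          #   require "google/cloud/bigquery"
          #
          #   bigquery = Google::Cloud::Bigquery.new
          #   dataset = bigquery.dataset "my_dataset"
          #   table = dataset.table "my_table"
          #   inserter = table.insert_async
          #
          #   inserter.insert [{ "first_name" => "Alice", "age" => 21 }]
          #
          #   inserter.stop
          #   inserter.wait! 120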
          #
          def wait! timeout = nil
            synchronize do
              @thread_pool.shutdown
              @thread_pool.wait_for_termination timeout
            end

            self
          end

          ##
          # Forces all rows in the current batch to be inserted immediately.
          #
          # @return [AsyncInserter] returns self so calls can be chained.
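          #
          # @example Flushing the current batch immediately
          #   require "google/cloud/bigquery"
          #
          #   bigquery = Google::Cloud::Bigquery.new
          #   dataset = bigquery.dataset "my_dataset"
          #   table = dataset.table "my_table"
          #   inserter = table.insert_async
          #
          #   # Illustrative row; flush sends the batch without waiting for the
          #   # interval, max_rows, or max_bytes thresholds to be reached.
          #   inserter.insert [{ "first_name" => "Alice", "age" => 21 }]
          #   inserter.flush
          #
          #   inserter.stop.wait!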
          #
          def flush
            synchronize do
              push_batch_request!
              @cond.signal
            end

            self
          end

          ##
          # Whether the inserter has been started.
          #
          # @return [Boolean] `true` when started, `false` otherwise.
          #
          def started?
            !stopped?
          end

          ##
          # Whether the inserter has been stopped.
          #
          # @return [Boolean] `true` when stopped, `false` otherwise.
          #
          def stopped?
            synchronize { @stopped }
          end

          protected

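          # Normalizes the arguments to {#insert}: wraps a single row Hash in an
          # Array, expands the :skip sentinel into one :skip per row, and checks
          # that the number of insert IDs matches the number of rows.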
          def validate_insert_args rows, insert_ids
            rows = [rows] if rows.is_a? Hash
            raise ArgumentError, "No rows provided" if rows.empty?

            insert_ids = Array.new(rows.count) { :skip } if insert_ids == :skip
            insert_ids = Array insert_ids
            if insert_ids.count.positive? && insert_ids.count != rows.count
              raise ArgumentError, "insert_ids must be the same size as rows"
            end

            [rows, insert_ids]
          end

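          # Background loop that waits on the monitor's condition variable and
          # pushes the current batch once `interval` seconds have elapsed since
          # the batch was created. Runs until the inserter is stopped.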
          def run_background
            synchronize do
              until @stopped
                if @batch.nil?
                  @cond.wait
                  next
                end

                time_since_batch_created = ::Time.now - @batch_created_at
                if time_since_batch_created < @interval
                  # still waiting for the interval to insert the batch...
                  timeout = @interval - time_since_batch_created
                  @cond.wait timeout
                else
                  # interval met, insert the batch...
                  push_batch_request!
                  @cond.wait
                end
              end
            end
          end

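          # Snapshots the current batch and schedules it on the thread pool as a
          # streaming insert request, then clears the batch. The callback, if
          # any, is invoked with a Result when the request completes or raises.
          # Expected to be called while holding the monitor lock.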
          def push_batch_request!
            return unless @batch

            orig_rows = @batch.rows
            json_rows = @batch.json_rows
            insert_ids = @batch.insert_ids
            Concurrent::Future.new executor: @thread_pool do
              raise ArgumentError, "No rows provided" if json_rows.empty?
              insert_resp = @table.service.insert_tabledata_json_rows @table.dataset_id,
                                                                      @table.table_id,
                                                                      json_rows,
                                                                      skip_invalid: @skip_invalid,
                                                                      ignore_unknown: @ignore_unknown,
                                                                      insert_ids: insert_ids,
                                                                      project_id: @table.project_id

              result = Result.new InsertResponse.from_gapi(orig_rows, insert_resp)
            rescue StandardError => e
              result = Result.new nil, e
            ensure
              @callback&.call result
            end.execute

            @batch = nil
            @batch_created_at = nil
          end

          ##
          # @private
          class Batch
            attr_reader :max_bytes
            attr_reader :max_rows
            attr_reader :rows
            attr_reader :json_rows
            attr_reader :insert_ids

            def initialize max_bytes: 10_000_000, max_rows: 500
              @max_bytes = max_bytes
              @max_rows = max_rows
              @rows = []
              @json_rows = []
              @insert_ids = []
              # The default request byte size overhead is 63.
              # "{\"rows\":[],\"ignoreUnknownValues\":false,
              # \"skipInvalidRows\":false}".bytesize #=> 63
              @current_bytes = 63
            end

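            # Adds the row without checking the byte and row limits. Used only
            # for the first row of a newly created batch.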
            def insert row, insert_id
              insert_id ||= SecureRandom.uuid
              json_row = to_json_row row

              insert_rows_bytes row, json_row, insert_id, addl_bytes_for(json_row, insert_id)
            end

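            # Adds the row only if it fits within the byte and row limits.
            # Returns `true` when the row was added, `false` when the batch is
            # full and a new batch should be started.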
            def try_insert row, insert_id
              insert_id ||= SecureRandom.uuid
              json_row = to_json_row row
              addl_bytes = addl_bytes_for json_row, insert_id

              return false if @current_bytes + addl_bytes >= @max_bytes
              return false if @rows.count + 1 >= @max_rows

              insert_rows_bytes row, json_row, insert_id, addl_bytes
              true
            end

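            # Whether the batch has reached the byte or row limit and should be
            # pushed to the thread pool for insertion.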
            def ready?
              @current_bytes >= @max_bytes || rows.count >= @max_rows
            end

            private

            def insert_rows_bytes row, json_row, insert_id, addl_bytes
              @rows << row
              @json_rows << json_row
              @insert_ids << insert_id if insert_id
              @current_bytes += addl_bytes
            end

            def to_json_row row
              Convert.to_json_row row
            end

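            # The number of bytes the row adds to the JSON request body,
            # including the per-row wrapper and, unless skipped, the insertId
            # field.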
            def addl_bytes_for json_row, insert_id
              if insert_id == :skip
                # "{\"json\":},".bytesize #=> 10
                10 + json_row.to_json.bytesize
              else
                # "{\"insertId\":\"\",\"json\":},".bytesize #=> 24
                24 + json_row.to_json.bytesize + insert_id.bytesize
              end
            end
          end

          ##
          # AsyncInserter::Result
          #
          # Represents the result from BigQuery, including any error
          # encountered, when data is asynchronously inserted into a table for
          # near-immediate querying. See {Dataset#insert_async} and
          # {Table#insert_async}.
          #
          # @see https://cloud.google.com/bigquery/streaming-data-into-bigquery
          #   Streaming Data Into BigQuery
          #
          # @attr_reader [Google::Cloud::Bigquery::InsertResponse, nil]
          #   insert_response The response from the insert operation if no
          #   error was encountered, or `nil` if the insert operation
          #   encountered an error.
          # @attr_reader [Error, nil] error The error from the insert operation
          #   if any error was encountered, otherwise `nil`.
          #
          # @example
          #   require "google/cloud/bigquery"
          #
          #   bigquery = Google::Cloud::Bigquery.new
          #   dataset = bigquery.dataset "my_dataset"
          #   table = dataset.table "my_table"
          #   inserter = table.insert_async do |result|
          #     if result.error?
          #       log_error result.error
          #     else
          #       log_insert "inserted #{result.insert_count} rows " \
          #         "with #{result.error_count} errors"
          #     end
          #   end
          #
          #   rows = [
          #     { "first_name" => "Alice", "age" => 21 },
          #     { "first_name" => "Bob", "age" => 22 }
          #   ]
          #   inserter.insert rows
          #
          #   inserter.stop.wait!
          #
          class Result
            # @private
            def initialize insert_response, error = nil
              @insert_response = insert_response
              @error = error
            end

            attr_reader :insert_response
            attr_reader :error

            ##
            # Checks if an error is present, meaning that the insert operation
            # encountered an error. Use {#error} to access the error. For
            # row-level errors, see {#success?} and {#insert_errors}.
            #
            # @return [Boolean] `true` when an error is present, `false`
            #   otherwise.
            #
            def error?
              !error.nil?
            end

            ##
            # Checks if the error count for row-level errors is zero, meaning
            # that all of the rows were inserted. Use {#insert_errors} to access
            # the row-level errors. To check for and access any operation-level
            # error, use {#error?} and {#error}.
            #
            # @return [Boolean, nil] `true` when the error count is zero,
            #   `false` when the error count is positive, or `nil` if the insert
            #   operation encountered an error.
            #
            def success?
              return nil if error?
              insert_response.success?
            end

            ##
            # The count of rows in the response, minus the count of errors for
            # rows that were not inserted.
            #
            # @return [Integer, nil] The number of rows inserted, or `nil` if
            #   the insert operation encountered an error.
            #
            def insert_count
              return nil if error?
              insert_response.insert_count
            end

            ##
            # The count of errors for rows that were not inserted.
            #
            # @return [Integer, nil] The number of errors, or `nil` if the
            #   insert operation encountered an error.
            #
            def error_count
              return nil if error?
              insert_response.error_count
            end

            ##
            # The error objects for rows that were not inserted.
            #
            # @return [Array<InsertError>, nil] An array containing error
            #   objects, or `nil` if the insert operation encountered an error.
            #
            def insert_errors
              return nil if error?
              insert_response.insert_errors
            end

            ##
            # The rows that were not inserted.
            #
            # @return [Array<Hash>, nil] An array of hash objects containing the
            #   row data, or `nil` if the insert operation encountered an error.
            #
            def error_rows
              return nil if error?
              insert_response.error_rows
            end

            ##
            # Returns the error object for a row that was not inserted.
            #
            # @param [Hash] row A hash containing the data for a row.
            #
            # @return [InsertError, nil] An error object, `nil` if no error is
            #   found in the response for the row, or `nil` if the insert
            #   operation encountered an error.
            #
            def insert_error_for row
              return nil if error?
              insert_response.insert_error_for row
            end

            ##
            # Returns the error hashes for a row that was not inserted. Each
            # error hash contains the following keys: `reason`, `location`,
            # `debugInfo`, and `message`.
            #
            # @param [Hash, nil] row A hash containing the data for a row.
            #
            # @return [Array<Hash>, nil] An array of error hashes, `nil` if no
            #   errors are found in the response for the row, or `nil` if the
            #   insert operation encountered an error.
            #
            def errors_for row
              return nil if error?
              insert_response.errors_for row
            end

            ##
            # Returns the index for a row that was not inserted.
            #
            # @param [Hash, nil] row A hash containing the data for a row.
            #
            # @return [Integer, nil] The index of the row, `nil` if no error is
            #   found in the response for the row, or `nil` if the insert
            #   operation encountered an error.
            #
            def index_for row
              return nil if error?
              insert_response.index_for row
            end
          end
        end
      end
    end
  end
end