# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "google/cloud/errors"
require "google/cloud/bigquery/service"
require "google/cloud/bigquery/job/list"
require "google/cloud/bigquery/convert"
require "json"
module Google
module Cloud
module Bigquery
##
# # Job
#
# Represents a generic Job that may be performed on a {Table}.
#
# The subclasses of Job represent the specific BigQuery job types:
# {CopyJob}, {ExtractJob}, {LoadJob}, and {QueryJob}.
#
# A job instance is created when you call {Project#query_job},
# {Dataset#query_job}, {Table#copy_job}, {Table#extract_job}, or
# {Table#load_job}.
#
# @see https://cloud.google.com/bigquery/docs/managing-jobs Running and
# Managing Jobs
# @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
# reference
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# job = bigquery.query_job "SELECT COUNT(word) as count FROM " \
# "`bigquery-public-data.samples.shakespeare`"
#
# job.wait_until_done!
#
# if job.failed?
# puts job.error
# else
# puts job.data.first
# end
#
class Job
##
# @private The Service object. Used by this job to issue its API calls
# (cancel, delete, insert and get of jobs).
attr_accessor :service
##
# @private The Google API Client object. Holds the job resource that the
# attribute readers of this class delegate to.
attr_accessor :gapi
##
# @private Create an empty Job object.
def initialize
  # A freshly built job is detached: empty resource data and no service.
  @gapi = {}
  @service = nil
end
##
# The ID of the job.
#
# @return [String] The ID must contain only letters (`[A-Za-z]`), numbers
# (`[0-9]`), underscores (`_`), or dashes (`-`). The maximum length is 1,024
# characters.
#
def job_id
  # The ID lives on the job reference of the underlying resource.
  reference = @gapi.job_reference
  reference.job_id
end
##
# The ID of the project containing the job.
#
# @return [String] The project ID.
#
def project_id
  # The project ID lives on the job reference of the underlying resource.
  reference = @gapi.job_reference
  reference.project_id
end
##
# The geographic location where the job runs.
#
# @return [String] A geographic location, such as "US", "EU" or
# "asia-northeast1".
#
# @!group Attributes
def location
  # The location lives on the job reference of the underlying resource.
  reference = @gapi.job_reference
  reference.location
end
##
# The email address of the user who ran the job.
#
# @return [String] The email address.
#
def user_email
  # Read straight from the underlying job resource.
  resource = @gapi
  resource.user_email
end
##
# The current state of the job. A `DONE` state does not mean that the
# job completed successfully. Use {#failed?} to discover if an error
# occurred or if the job was successful.
#
# @return [String] The state code. The possible values are `PENDING`,
# `RUNNING`, and `DONE`.
#
def state
  # A job resource without a status has no state; report nil in that case.
  @gapi.status&.state
end
##
# Checks if the job's state is `RUNNING`.
#
# @return [Boolean] `true` when `RUNNING`, `false` otherwise.
#
def running?
  current = state
  # No state means not running; otherwise compare case-insensitively.
  current.nil? ? false : "running".casecmp(current).zero?
end
##
# Checks if the job's state is `PENDING`.
#
# @return [Boolean] `true` when `PENDING`, `false` otherwise.
#
def pending?
  current = state
  # No state means not pending; otherwise compare case-insensitively.
  current.nil? ? false : "pending".casecmp(current).zero?
end
##
# Checks if the job's state is `DONE`. When `true`, the job has stopped
# running. However, a `DONE` state does not mean that the job completed
# successfully. Use {#failed?} to detect if an error occurred or if the
# job was successful.
#
# @return [Boolean] `true` when `DONE`, `false` otherwise.
#
def done?
  current = state
  # No state means not done; otherwise compare case-insensitively.
  current.nil? ? false : "done".casecmp(current).zero?
end
##
# Checks if an error is present. Use {#error} to access the error
# object.
#
# @return [Boolean] `true` when there is an error, `false` otherwise.
#
def failed?
  # The job counts as failed exactly when an error result is present.
  last_error = error
  !last_error.nil?
end
##
# The time when the job was created.
#
# @return [Time, nil] The creation time from the job statistics.
#
def created_at
  # The statistics carry the creation time as a raw value that
  # Convert.millis_to_time turns into the returned object.
  millis = @gapi.statistics.creation_time
  Convert.millis_to_time millis
end
##
# The time when the job was started.
# This field is present after the job's state changes from `PENDING`
# to either `RUNNING` or `DONE`.
#
# @return [Time, nil] The start time from the job statistics.
#
def started_at
  # The statistics carry the start time as a raw value that
  # Convert.millis_to_time turns into the returned object.
  millis = @gapi.statistics.start_time
  Convert.millis_to_time millis
end
##
# The time when the job ended.
# This field is present when the job's state is `DONE`.
#
# @return [Time, nil] The end time from the job statistics.
#
def ended_at
  # The statistics carry the end time as a raw value that
  # Convert.millis_to_time turns into the returned object.
  millis = @gapi.statistics.end_time
  Convert.millis_to_time millis
end
##
# The number of child jobs executed.
#
# @return [Integer] The number of child jobs executed.
#
def num_child_jobs
  count = @gapi.statistics.num_child_jobs
  # A missing count is reported as zero child jobs.
  return 0 unless count
  count
end
##
# If this is a child job, the id of the parent.
#
# @return [String, nil] The ID of the parent job, or `nil` if not a child job.
#
def parent_job_id
  # Read from the job statistics; nil when no parent is recorded.
  job_stats = @gapi.statistics
  job_stats.parent_job_id
end
##
# An array containing the job resource usage breakdown by reservation, if present. Reservation usage statistics
# are only reported for jobs that are executed within reservations. On-demand jobs do not report this data.
#
# @return [Array<Google::Cloud::Bigquery::Job::ReservationUsage>, nil] The reservation usage, if present.
#
def reservation_usage
  usage = @gapi.statistics.reservation_usage
  # No usage data on the statistics means there is nothing to report.
  return nil unless usage
  Array(usage).map { |entry| ReservationUsage.from_gapi entry }
end
##
# The ID of the session if this job is part of one. See the `create_session` param in {Project#query_job} and
# {Dataset#query_job}.
#
# @return [String, nil] The session ID, or `nil` if not associated with a session.
#
def session_id
  info = @gapi.statistics.session_info
  # Without session info there is no session ID.
  info.nil? ? nil : info.session_id
end
##
# The ID of a multi-statement transaction.
#
# @return [String, nil] The transaction ID, or `nil` if not associated with a transaction.
#
def transaction_id
  info = @gapi.statistics.transaction_info
  # Without transaction info there is no transaction ID.
  info.nil? ? nil : info.transaction_id
end
##
# The statistics including stack frames for a child job of a script.
#
# @return [Google::Cloud::Bigquery::Job::ScriptStatistics, nil] The script statistics, or `nil` if the job is
# not a child job.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# multi_statement_sql = <<~SQL
# -- Declare a variable to hold names as an array.
# DECLARE top_names ARRAY<STRING>;
# -- Build an array of the top 100 names from the year 2017.
# SET top_names = (
# SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100)
# FROM `bigquery-public-data.usa_names.usa_1910_current`
# WHERE year = 2017
# );
# -- Which names appear as words in Shakespeare's plays?
# SELECT
# name AS shakespeare_name
# FROM UNNEST(top_names) AS name
# WHERE name IN (
# SELECT word
# FROM `bigquery-public-data.samples.shakespeare`
# );
# SQL
#
# job = bigquery.query_job multi_statement_sql
#
# job.wait_until_done!
#
# child_jobs = bigquery.jobs parent_job: job
#
# child_jobs.each do |child_job|
# script_statistics = child_job.script_statistics
# puts script_statistics.evaluation_kind
# script_statistics.stack_frames.each do |stack_frame|
# puts stack_frame.text
# end
# end
#
def script_statistics
  raw = @gapi.statistics.script_statistics
  # Only jobs that carry script statistics get a wrapper object.
  return nil unless raw
  ScriptStatistics.from_gapi raw
end
##
# The configuration for the job. Returns a hash.
#
# @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
# reference
def configuration
  # Round-trip through JSON so callers get a plain hash with string keys.
  serialized = @gapi.configuration.to_json
  JSON.parse serialized
end
alias config configuration
##
# The statistics for the job. Returns a hash.
#
# @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
# reference
#
# @return [Hash] The job statistics.
#
def statistics
  # Round-trip through JSON so callers get a plain hash with string keys.
  serialized = @gapi.statistics.to_json
  JSON.parse serialized
end
alias stats statistics
##
# The job's status. Returns a hash. The values contained in the hash are
# also exposed by {#state}, {#error}, and {#errors}.
#
# @return [Hash] The job status.
#
def status
  # Round-trip through JSON so callers get a plain hash with string keys.
  serialized = @gapi.status.to_json
  JSON.parse serialized
end
##
# The last error for the job, if any errors have occurred. Returns a
# hash.
#
# @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
# reference
#
# @return [Hash, nil] Returns a hash containing `reason` and `message`
# keys:
#
# {
# "reason"=>"notFound",
# "message"=>"Not found: Table bigquery-public-data:samples.BAD_ID"
# }
#
def error
  # #status can be nil when the job carries no status (the same case #state
  # guards against). Treat that as "no error" instead of raising
  # NoMethodError, so #failed? stays usable on such jobs.
  details = status || {}
  details["errorResult"]
end
##
# The errors for the job, if any errors have occurred. Returns an array
# of hash objects. See {#error}.
#
# @return [Array<Hash>] Returns an array of hashes (empty when the status
#   lists no errors) containing `reason` and `message` keys:
#
# {
# "reason"=>"notFound",
# "message"=>"Not found: Table bigquery-public-data:samples.BAD_ID"
# }
#
def errors
  # #status can be nil when the job carries no status (the same case #state
  # guards against). Report an empty list then instead of raising
  # NoMethodError. Array() also turns a missing "errors" entry into [].
  details = status || {}
  Array details["errors"]
end
##
# A hash of user-provided labels associated with this job. Labels can be
# provided when the job is created, and used to organize and group jobs.
#
# The returned hash is frozen and changes are not allowed. Use
# {CopyJob::Updater#labels=} or {ExtractJob::Updater#labels=} or
# {LoadJob::Updater#labels=} or {QueryJob::Updater#labels=} to replace
# the entire hash.
#
# @return [Hash] The job labels.
#
# @!group Attributes
#
def labels
  source = @gapi.configuration.labels
  # Anything responding to #to_h (including nil, which becomes {}) is
  # normalized first; the caller then receives a frozen copy.
  normalized = source.respond_to?(:to_h) ? source.to_h : source
  normalized.dup.freeze
end
##
# Cancels the job.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# query = "SELECT COUNT(word) as count FROM " \
# "`bigquery-public-data.samples.shakespeare`"
#
# job = bigquery.query_job query
#
# job.cancel
#
def cancel
  ensure_service!
  # Adopt the job resource returned by the cancel call as the current state.
  @gapi = service.cancel_job(job_id, location: location).job
  true
end
##
# Requests that a job is deleted. This call will return when the job is deleted.
#
# @return [Boolean] Returns `true` if the job was deleted.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# job = bigquery.job "my_job"
#
# job.delete
#
# @!group Lifecycle
#
def delete
  ensure_service!
  # The service call's return value is not used; this method returns true
  # whenever the call does not raise.
  target_id = job_id
  service.delete_job(target_id, location: location)
  true
end
##
# Creates a new job with the current configuration.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# query = "SELECT COUNT(word) as count FROM " \
# "`bigquery-public-data.samples.shakespeare`"
#
# job = bigquery.query_job query
#
# job.wait_until_done!
# job.rerun!
#
def rerun!
  ensure_service!
  # Submit this job's configuration again in the same location, then wrap
  # the inserted resource in a new job object.
  inserted = service.insert_job(@gapi.configuration, location: location)
  Job.from_gapi inserted, service
end
##
# Reloads the job with current data from the BigQuery service.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# query = "SELECT COUNT(word) as count FROM " \
# "`bigquery-public-data.samples.shakespeare`"
#
# job = bigquery.query_job query
#
# job.done?
# job.reload!
# job.done? #=> true
#
def reload!
  ensure_service!
  # Replace the cached resource with whatever the service returns.
  @gapi = service.get_job(job_id, location: location)
end
alias refresh! reload!
##
# Refreshes the job until the job is `DONE`. The delay between refreshes
# starts at 5 seconds and grows quadratically with each retry, up to a
# maximum of 60 seconds.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
# dataset = bigquery.dataset "my_dataset"
# table = dataset.table "my_table"
#
# extract_job = table.extract_job "gs://my-bucket/file-name.json",
# format: "json"
# extract_job.wait_until_done!
# extract_job.done? #=> true
#
def wait_until_done!
  attempt = 0
  until done?
    # Quadratic backoff: 5s, 6s, 9s, 14s, ... capped at 60 seconds.
    pause = (attempt**2) + 5
    pause = 60 if pause > 60
    sleep pause
    attempt += 1
    reload!
  end
end
##
# @private New Job from a Google API Client object.
def self.from_gapi gapi, conn
  # Instantiate the subclass picked by klass_for, then attach the resource
  # and the service connection.
  job = klass_for(gapi).new
  job.gapi = gapi
  job.service = conn
  job
end
##
# @private New Google::Apis::Error with job failure details
def gapi_error
  return nil unless failed?
  # Fetch the error hash once: each call to #error re-serializes and
  # re-parses the job status. The hash it returns comes fresh from
  # JSON.parse, so adding the "errors" entry below does not touch @gapi.
  error_body = error
  error_status_code = status_code_for_reason error_body["reason"]
  message = error_body["message"]
  error_body["errors"] = errors
  Google::Apis::Error.new message,
                          status_code: error_status_code,
                          body: error_body
end
##
# @private
# Get the subclass for a job type
def self.klass_for gapi
  config = gapi.configuration
  # The first populated job-type section wins; anything else is a plain Job.
  return CopyJob if config.copy
  return ExtractJob if config.extract
  return LoadJob if config.load
  return QueryJob if config.query
  Job
end
##
# Represents Job resource usage breakdown by reservation.
#
# @attr_reader [String] name The reservation name or "unreserved" for on-demand resources usage.
# @attr_reader [Integer] slot_ms The slot-milliseconds the job spent in the given reservation.
#
class ReservationUsage
  attr_reader :name, :slot_ms

  ##
  # @private Creates a new ReservationUsage instance.
  def initialize name, slot_ms
    @name, @slot_ms = name, slot_ms
  end

  class << self
    ##
    # @private New ReservationUsage from a statistics.reservation_usage value.
    def from_gapi gapi
      new gapi.name, gapi.slot_ms
    end
  end
end
##
# Represents statistics for a child job of a script.
#
# @attr_reader [String] evaluation_kind Indicates the type of child job. Possible values include `STATEMENT` and
# `EXPRESSION`.
# @attr_reader [Array<Google::Cloud::Bigquery::Job::ScriptStackFrame>] stack_frames Stack trace where the
# current evaluation happened. Shows line/column/procedure name of each frame on the stack at the point where
# the current evaluation happened. The leaf frame is first, the primary script is last.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# multi_statement_sql = <<~SQL
# -- Declare a variable to hold names as an array.
# DECLARE top_names ARRAY<STRING>;
# -- Build an array of the top 100 names from the year 2017.
# SET top_names = (
# SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100)
# FROM `bigquery-public-data.usa_names.usa_1910_current`
# WHERE year = 2017
# );
# -- Which names appear as words in Shakespeare's plays?
# SELECT
# name AS shakespeare_name
# FROM UNNEST(top_names) AS name
# WHERE name IN (
# SELECT word
# FROM `bigquery-public-data.samples.shakespeare`
# );
# SQL
#
# job = bigquery.query_job multi_statement_sql
#
# job.wait_until_done!
#
# child_jobs = bigquery.jobs parent_job: job
#
# child_jobs.each do |child_job|
# script_statistics = child_job.script_statistics
# puts script_statistics.evaluation_kind
# script_statistics.stack_frames.each do |stack_frame|
# puts stack_frame.text
# end
# end
#
class ScriptStatistics
  attr_reader :evaluation_kind, :stack_frames

  ##
  # @private Creates a new ScriptStatistics instance.
  def initialize evaluation_kind, stack_frames
    @evaluation_kind, @stack_frames = evaluation_kind, stack_frames
  end

  class << self
    ##
    # @private New ScriptStatistics from a statistics.script_statistics value.
    def from_gapi gapi
      # Array() turns a missing stack_frames value into an empty list.
      wrapped = Array(gapi.stack_frames).map { |frame| ScriptStackFrame.from_gapi frame }
      new gapi.evaluation_kind, wrapped
    end
  end
end
##
# Represents a stack frame showing the line/column/procedure name where the current evaluation happened.
#
# @attr_reader [Integer] start_line One-based start line.
# @attr_reader [Integer] start_column One-based start column.
# @attr_reader [Integer] end_line One-based end line.
# @attr_reader [Integer] end_column One-based end column.
# @attr_reader [String] text Text of the current statement/expression.
#
# @example
# require "google/cloud/bigquery"
#
# bigquery = Google::Cloud::Bigquery.new
#
# multi_statement_sql = <<~SQL
# -- Declare a variable to hold names as an array.
# DECLARE top_names ARRAY<STRING>;
# -- Build an array of the top 100 names from the year 2017.
# SET top_names = (
# SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100)
# FROM `bigquery-public-data.usa_names.usa_1910_current`
# WHERE year = 2017
# );
# -- Which names appear as words in Shakespeare's plays?
# SELECT
# name AS shakespeare_name
# FROM UNNEST(top_names) AS name
# WHERE name IN (
# SELECT word
# FROM `bigquery-public-data.samples.shakespeare`
# );
# SQL
#
# job = bigquery.query_job multi_statement_sql
#
# job.wait_until_done!
#
# child_jobs = bigquery.jobs parent_job: job
#
# child_jobs.each do |child_job|
# script_statistics = child_job.script_statistics
# puts script_statistics.evaluation_kind
# script_statistics.stack_frames.each do |stack_frame|
# puts stack_frame.text
# end
# end
#
class ScriptStackFrame
  attr_reader :start_line, :start_column, :end_line, :end_column, :text

  ##
  # @private Creates a new ScriptStackFrame instance.
  def initialize start_line, start_column, end_line, end_column, text
    @start_line, @start_column = start_line, start_column
    @end_line, @end_column = end_line, end_column
    @text = text
  end

  class << self
    ##
    # @private New ScriptStackFrame from a statistics.script_statistics[].stack_frames element.
    def from_gapi gapi
      new gapi.start_line, gapi.start_column, gapi.end_line, gapi.end_column, gapi.text
    end
  end
end
protected
##
# Raise an error unless an active connection is available.
def ensure_service!
  # Nothing to do when a service is attached; otherwise fail loudly.
  return if service
  raise "Must have active connection"
end
# Fetches a table through the service and wraps it in a Table object.
# Returns nil when Google::Cloud::NotFoundError is raised.
def retrieve_table project_id, dataset_id, table_id, metadata_view: nil
  ensure_service!
  table_gapi = service.get_project_table(project_id, dataset_id, table_id, metadata_view: metadata_view)
  Table.from_gapi(table_gapi, service)
rescue Google::Cloud::NotFoundError
  # A missing table is reported as nil rather than as an error.
  nil
end
# Maps a BigQuery error reason string to the numeric status code used when
# building a Google::Apis::Error (see #gapi_error). Reasons that are not in
# the table, including nil, map to 0.
def status_code_for_reason reason
  codes = {
    "accessDenied"             => 403,
    "backendError"             => 500,
    "billingNotEnabled"        => 403,
    "billingTierLimitExceeded" => 400,
    "blocked"                  => 403,
    "duplicate"                => 409,
    "internalError"            => 500,
    "invalid"                  => 400,
    "invalidQuery"             => 400,
    "notFound"                 => 404,
    "notImplemented"           => 501,
    "quotaExceeded"            => 403,
    "rateLimitExceeded"        => 403,
    "resourceInUse"            => 400,
    "resourcesExceeded"        => 400,
    "responseTooLarge"         => 403,
    "tableUnavailable"         => 400
  }
  codes.fetch reason, 0
end
end
end
end
end
# We need Job to be defined before loading these.
require "google/cloud/bigquery/copy_job"
require "google/cloud/bigquery/extract_job"
require "google/cloud/bigquery/load_job"
require "google/cloud/bigquery/query_job"