require "pg"
require 'ostruct'
require_relative "pg_conn/version"
require_relative "pg_conn/role_methods"
require_relative "pg_conn/schema_methods"
require_relative "pg_conn/rdbms_methods"
require_relative "pg_conn/session_methods"
module PgConn
class Error < StandardError; end
# Can be raised in #transaction blocks to rollback changes
class Rollback < Error; end
# Return a PgConn::Connection object. TODO: A block argument
def self.new(*args, &block) Connection.new(*args, &block) end
# Make the PgConn module pretend it has PgConn instances
def self.===(element) element.is_a?(PgConn::Connection) or super end
# Returns a PgConn::Connection object (aka. a PgConn object). It's arguments
# can be an existing connection that will just be returned or a set of
# PgConn::Connection#initialize arguments that will be used to create a new
# PgConn::Connection object
def self.ensure(*args)
if args.size == 1 && args.first.is_a?(PgConn::Connection)
args.first
else
PgConn::Connection.new(*args)
end
end
# Used to mark strings as literals that should not be quoted. This is the
# case for row and record values
class Literal < String; end
# All results from the database are converted into native Ruby types
class Connection
# Make PgConn::Connection pretend to be an instance of the PgConn module
def is_a?(klass) klass == PgConn or super end
# The PG::Connection object
attr_reader :pg_connection
# The class of column names (Symbol or String). Default is Symbol
attr_reader :field_name_class
# Name of user
def user() @pg_connection.user end
alias_method :username, :user # Obsolete FIXME Is it?
# Name of database
def name() @pg_connection.db end
alias_method :database, :name # Obsolete FIXME Is it?
# Database manipulation methods: #exist?, #create, #drop, #list
attr_reader :rdbms
# Role manipulation methods: #exist?, #create, #drop, #list
attr_reader :role
# Schema manipulation methods: #exist?, #create, #drop, #list, and
# #exist?/#list for relations/tables/views/columns
attr_reader :schema
# Session manipulation methods: #list, #terminate, #disable, #enable
attr_reader :session
# The transaction timestamp of the most recent SQL statement executed by
# #exec or #transaction block. The timestamp is without time zone
attr_reader :timestamp
# The transaction timestamp of the most recent SQL statement executed by
# #exec or #transaction block. The timestamp includes the current time zone
attr_reader :timestamptz
# PG::Error object of the first failed statement in the transaction;
# otherwise nil. It is cleared at the beginning of a transaction so be sure
# to save it before you run any cleanup code that may initiate new
# transactions
attr_reader :error
# True if the transaction is in a error state
def error?() !@error.nil? end
# Tuple of error message, lineno, and charno of the error object where each
# element defaults to nil if not found
def err
@err ||=
if error&.message =~ /.*?ERROR:\s*(.*?)\n(?:.*?(\s*LINE\s+(\d+): ).*?\n(?:(\s+)\^\n)?)?/
[$1.capitalize, $3&.to_i, $4 && ($4.size - $2.size + 1)]
else
[nil, nil, nil]
end
end
# Last error message. The error message is the first line of the PG error
# message that may contain additional info. It doesn't contain a
# terminating newline
def errmsg = err[0]
# The one-based line number of the last error or nil if absent in the
# Postgres error message
def errline = err[1]
# The one-based character number of the error in the last PG::Error or nil
# if absent in the Postgres error message
def errchar = err[2]
# :call-seq:
# initialize(dbname = nil, user = nil, field_name_class: Symbol)
# initialize(connection_hash, field_name_class: Symbol)
# initialize(connection_string, field_name_class: Symbol)
# initialize(host, port, dbname, user, password, field_name_class: Symbol)
# initialize(array, field_name_class: Symbol)
# initialize(pg_connection_object)
#
# Initialize a connection object and connect to the database
#
# The possible keys of the connection hash are :host, :port, :dbname, :user,
# and :password. The connection string can either be a space-separated list
# of <key>=<value> pairs with the same keys as the hash, or a URI with the
# format 'postgres[ql]://[user[:password]@][host][:port][/name]
#
# If given an array argument, PgConn will not connect to the database and
# instead write its commands to the array. In this case, methods extracting
# values from the database (eg. #value) will return nil or raise an
# exception
#
# The last variant is used to establish a PgConn from an existing
# connection. It doesn't change the connection settings and is not
# recommended except in cases where you want to piggyback on an existing
# connection (eg. a Rails connection)
#
# The :field_name_class option controls the Ruby type of column names. It can be
# Symbol (the default) or String. The :timestamp option is used
# internally to set the timestamp for transactions
#
# Note that the connection hash and the connection string may support more
# parameters than documented here. Consult
# https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
# for the full list
#
# TODO: Change to 'initialize(*args, **opts)'
def initialize(*args)
if args.last.is_a?(Hash)
@field_name_class = args.last.delete(:field_name_class) || Symbol
@timestamp = args.last.delete(:timestamp)
@timestamptz = args.last.delete(:timestamptz)
args.pop if args.last.empty?
else
@field_name_class = Symbol
end
# else # We assume that the current user is a postgres superuser
# @db = PgConn.new("template0")
using_existing_connection = false
@pg_connection =
if args.size == 0
make_connection
elsif args.size == 1
case arg = args.first
when PG::Connection
using_existing_connection = true
arg
when String
if arg =~ /=/
make_connection arg
elsif arg =~ /\//
make_connection arg
else
make_connection dbname: arg
end
when Hash
make_connection **arg
when Array
@pg_commands = arg
nil
else
raise Error, "Illegal argument type: #{arg.class}"
end
elsif args.size == 2
make_connection dbname: args.first, user: args.last
elsif args.size == 5
make_connection args[0], args[1], nil, nil, args[2], args[3], args[4]
else
raise Error, "Illegal number of parameters: #{args.size}"
end
if @pg_connection && !using_existing_connection
# Set a dummy notice processor to avoid warnings on stderr
@pg_connection.set_notice_processor { |message| ; } # Intentionally a nop
# Auto-convert to ruby types
type_map = PG::BasicTypeMapForResults.new(@pg_connection)
# Use String as default type. Kills 'Warning: no type cast defined for
# type "uuid" with oid 2950..' warnings
type_map.default_type_map = PG::TypeMapAllStrings.new
# Timestamp decoder
type_map.add_coder PG::TextDecoder::Timestamp.new( # Timestamp without time zone
oid: 1114,
flags: PG::Coder::TIMESTAMP_DB_UTC | PG::Coder::TIMESTAMP_APP_UTC)
# Decode anonymous records but note that this is only useful to convert the
# outermost structure into an array, the elements are not decoded and are
# returned as strings. It is best to avoid anonymous records if possible
type_map.add_coder PG::TextDecoder::Record.new(
oid: 2249
)
@pg_connection.type_map_for_results = type_map
@pg_connection.field_name_type = @field_name_class.to_s.downcase.to_sym # Use symbol field names
@pg_connection.exec "set client_min_messages to warning;" # Silence warnings
end
@schema = SchemaMethods.new(self)
@role = RoleMethods.new(self)
@rdbms = RdbmsMethods.new(self)
@session = SessionMethods.new(self)
@savepoints = nil # Stack of savepoint names. Nil if no transaction in progress
end
# Reset connection but keep noise level (TODO: How about the other
# per-session settings in #initialize? Are they cleared by #reset too?)
def reset
@pg_connection.reset
@pg_connection.exec "set client_min_messages to warning;" # Silence warnings
end
# Close the database connection. TODO: Rename 'close'
def terminate()
@pg_connection.close if @pg_connection && !@pg_connection.finished?
end
def self.new(*args, **opts, &block)
if block_given?
begin
object = Connection.allocate
object.send(:initialize, *args, **opts)
yield(object) # if object.pg_connection
ensure
object.terminate if object.pg_connection
end
else
super(*args, **opts)
end
end
def literal(arg) Literal.new(arg) end
# Quote argument as an identifier. The argument should be a non-nil string
# or a symbol
def quote_identifier(s)
s = s.to_s if s.is_a?(Symbol)
@pg_connection.escape_identifier(s).gsub(/\./, '"."').sub(/"\*"/, "*")
end
# Quote identifiers and concatenate them using ',' as separator
def quote_identifiers(idents) = idents.map { |ident| quote_identifier(ident) }.join(", ")
# Quote the value as a string. Emit 'null' if value is nil
#
# The value can be of any type but is converted to a string using #to_s
# before quoting. This works by default for the regular types Integer,
# true/false, Time/Date/DateTime, and arrays. Other types may require
# special handling
#
# Note that a tuple value (an array) must be quoted using #quote_tuple
# because #quote_value would quote the tuple as an array instead of a list
# of values
#
# The :elem_type option can be a postgres type name (String or Symbol) or
# an array of type names. It is used as the required explicit element
# type when the argument is an empty array. It is not needed if the array
# is guaranteed to be non-empty. Nested arrays are not supported
#
def quote_value(value, elem_type: nil)
case value
when Literal; value
when String; @pg_connection.escape_literal(value)
when Integer, Float; value.to_s
when true, false; value.to_s
when nil; 'null'
when Date, DateTime; "'#{value}'"
when Time; "'#{value.strftime("%FT%T%:z")}'"
when Array
if value.empty?
elem_type or raise Error, "Empty array without elem_type"
"array[]::#{elem_type}[]"
else
"array[#{value.map { |elem| quote_value(elem) }.join(', ')}]"
end
else
@pg_connection.escape_literal(value.to_s)
end
end
# Quote values and concatenate them using ',' as separator
def quote_values(values, elem_type: nil)
values.map { |value| quote_value(value, elem_type: elem_type) }.join(", ")
end
# Quote an array of values as a tuple. The element types should be in the
# same order as the array arguments. #quote_tuples is same as #quote_values
# except the values may have different types (this makes no difference
# except in the case when the tuple may contain empty array(s))
def quote_tuple(tuple, elem_types: nil)
elem_types = Array(elem_types)
tuple.map { |value|
elem_type = value.is_a?(Array) ? elem_types&.shift : nil
quote_value(value, elem_type: elem_type)
}.join(", ")
end
# Quote an array of tuples
def quote_tuples(tuples, elem_types: nil)
tuples.map { |tuple| "(#{quote_tuple(tuple, elem_types: elem_types)})" }.join(", ")
end
# Quote a record and cast it into the given type, the type can also be a
# table or view. 'data' is an array, hash, or struct representation of the
# record
#
# Note that the fields are retrived from the database so this method is not
# as fast as the other quote-methods. It is however very convenient when
# you're testing and need a composite type because record-quoting can
# easily become unwieldly
def quote_record(data, schema_name = nil, type, elem_types: nil)
quote_record_impl(data, schema_name, type, elem_types: elem_types, array: false)
end
# Quote an array of records. The type is the record type, not the type of
# the enclosing array
def quote_records(data, schema_name = nil, type, elem_types: nil)
quote_record_impl(data, schema_name, type, elem_types: elem_types, array: true)
end
# :call-seq:
# exist?(query)
# exist?(table, id)
# eists?(table, where_clause)
#
# Return true iff the query returns exactly one record. Use '!empty?' to
# check if the query returns one or more records
def exist?(*query)
!empty?(*query)
end
# :call-seq:
# count(query)
# count(table, where_clause = nil)
#
# Return true if the table or the result of the query is empty
def empty?(*query)
inner_query = parse_query *query
self.value("select count(*) from (#{inner_query} limit 1) as \"inner_query\"") == 0
end
# :call-seq:
# count(query)
# count(table_name, where_clause = nil)
#
# The number of records in the table or in the query
def count(*query)
inner_query = parse_query *query
value("select count(*) from (#{inner_query}) as inner_query")
end
# TODO (but breaks a lot of code)
# value 1
# value? 0 or 1
#
# values 1 or more
# values? 0 or more
# Return a single value. It is an error if the query doesn't return a
# single record with a single column.
#
# TODO If :transaction is true, the query will be executed in a
# transaction and also be committed if :commit is true (this is the
# default). It can also be used to execute 'insert' statements with a
# 'returning' clause
def value(*query) #, transaction: false, commit: true)
r = pg_exec(parse_query *query)
check_1c(r)
check_1r(r)
r.values[0][0]
end
# Like #value but returns nil if no record was found. It is still an error
# if the query returns more than one column
def value?(*query) #, transaction: false, commit: true)
r = pg_exec(parse_query *query)
check_1c(r)
return nil if r.ntuples == 0
check_1r(r)
r.values[0][0]
end
# Return an array of values. It is an error if the query returns records
# with more than one column.
#
# TODO If :transaction is true, the query will be executed in a
# transaction and be committed it :commit is true (the default). This can
# be used in 'insert ... returning ...' statements
def values(*query)
r = pg_exec(parse_query *query)
check_1c(r)
r.column_values(0)
end
# Return an array of column values. It is an error if the query returns
# more than one record.
#
# TODO If :transaction is true, the query will be executed
# in a transaction and be committed it :commit is true (the default). This
# can be used in 'insert ... returning ...' statements
def tuple(*query)
r = pg_exec(parse_query *query)
check_1r(r)
r.values[0]
end
# Like #tuple but returns nil if no record was found
def tuple?(*query)
r = pg_exec(parse_query *query)
return nil if r.ntuples == 0
check_1r(r)
r.values[0]
end
# Return an array of tuples. If :transaction is true, the query will be
# executed in a transaction and be committed it :commit is true (the
# default). This can be used in 'insert ... returning ...' statements
def tuples(*query)
pg_exec(parse_query *query).values
end
# Return a single-element hash from column name to value. It is an error
# if the query returns more than one record or more than one column. Note
# that you will probably prefer to use #value instead when you query a
# single field
def field(*query)
r = pg_exec(parse_query *query)
check_1c(r)
check_1r(r)
r.tuple(0).to_h
end
# Like #field but returns nil if no record was found
def field?(*query)
r = pg_exec(parse_query *query)
check_1c(r)
return nil if r.ntuples == 0
check_1r(r)
r.tuple(0).to_h
end
# Return an array of single-element hashes from column name to value. It
# is an error if the query returns records with more than one column. Note
# that you will probably prefer to use #values instead when you expect only
# single-column records
def fields(*query)
r = pg_exec(parse_query *query)
check_1c(r)
r.each.to_a.map(&:to_h)
end
# Return a hash from column name (a Symbol) to field value. It is an error if
# the query returns more than one record. It blows up if a column name is
# not a valid ruby symbol
def record(*query)
r = pg_exec(parse_query *query)
check_1r(r)
r.tuple(0).to_h
end
# Like #record but returns nil if no record was found
def record?(*query)
r = pg_exec(parse_query *query)
return nil if r.ntuples == 0
check_1r(r)
r.tuple(0).to_h
end
# Return an array of hashes from column name to field value
def records(*query)
r = pg_exec(parse_query *query)
r.each.to_a.map(&:to_h)
end
# Return a record as a OpenStruct object. It is an error if the query
# returns more than one record. It blows up if a column name is not a valid
# ruby symbol
def struct(*query)
OpenStruct.new(**record(parse_query *query))
end
# Like #struct but returns nil if no record was found
def struct?(*query)
args = record?(parse_query *query)
return nil if args.nil?
OpenStruct.new(**args)
end
# Return an array of OpenStruct objects
def structs(*query)
records(parse_query *query).map { |record| OpenStruct.new(**record) }
end
# Return a hash from the record id column to record (hash from column name
# to field value) If the :key_column option is defined it will be used
# instead of id as the key It is an error if the id field value is not
# unique
def table(query, key_column: :id)
[String, Symbol].include?(key_column.class) or raise "Illegal key_column"
key_column = (field_name_class == Symbol ? key_column.to_sym : key_column.to_s)
r = pg_exec(query)
begin
r.fnumber(key_column.to_s) # FIXME: What is this?
rescue ArgumentError
raise Error, "Can't find column #{key_column}"
end
h = {}
r.each { |record|
key = record[key_column]
!h.key?(key) or raise Error, "Duplicate key: #{key.inspect}"
h[record[key_column]] = record.to_h
}
h
end
# TODO: #group - same as table but partitions a table on the given keys
# returning a map from key to array of records
# TODO: An #array method that returns a map from id to tuple. Hmm... almost
# the same as #map
# Return a hash from the record id column to an OpenStruct representation
# of the record. If the :key_column option is defined it will be used
# instead of id as the key. It is an error if the id field value is not
# unique
def set(query, key_column: :id)
key_column = key_column.to_sym
keys = {}
r = pg_exec(query)
begin
r.fnumber(key_column.to_s) # Check that key column exists
rescue ArgumentError
raise Error, "Can't find column #{key_column}"
end
h = {}
for i in 0...r.ntuples
struct = OpenStruct.new(**r[i])
key = struct.send(key_column)
!h.key?(key) or raise Error, "Duplicate key: #{key.inspect}"
h[key] = struct
end
h
end
# Returns a hash from the first field to a tuple of the remaining fields.
# If there is only one remaining field then that value is used instead of a
# tuple. The optional +key+ argument sets the mapping field and the
# +symbol+ option convert key to Symbol objects when true
def map(query, key = nil, symbol: false) # TODO Swap arguments
r = pg_exec(query)
begin
key = (key || r.fname(0)).to_s
key_index = r.fnumber(key.to_s)
one = (r.nfields == 2)
rescue ArgumentError
raise Error, "Can't find column #{key}"
end
h = {}
r.each_row { |row|
key_value = row.delete_at(key_index)
key_value = key_value.to_sym if symbol
!h.key?(key_value) or raise Error, "Duplicate key: #{key_value}"
h[key_value] = (one ? row.first : row)
}
h
end
# Like #map but values of duplicate keys are concatenated. It acts as a
# group-by on the key and array_agg on the remaining values. The value is
# an array of tuples if the query has more than one value field and an
# array of values if there is only one value field
def multimap(query, key = nil, symbol: false)
r = pg_exec(query)
begin
key = (key || r.fname(0)).to_s
key_index = r.fnumber(key.to_s)
one = (r.nfields == 2)
rescue ArgumentError
raise Error, "Can't find column #{key}"
end
h = {}
r.each_row { |row|
key_value = row.delete_at(key_index)
key_value = key_value.to_sym if symbol
(h[key_value] ||= []) << (one ? row.first : row)
}
h
end
# Return the value of calling the given postgres function. It dynamically
# detects the structure of the result and return a value or an array of
# values if the result contained only one column (like #value or #values),
# a tuple if the record has multiple columns (like #tuple), and an array of
# of tuples if the result contained more than one record with multiple
# columns (like #tuples).
#
# The name argument can be a String or a Symbol that may contain the schema
# of the function. If the :proc option is true the "function" is assumed
# to be a procedure
#
def call(name, *args, elem_type: nil, proc: false) # :proc may interfere with hashes
args_seq = quote_values(args, elem_type: elem_type)
if proc
pg_exec "call #{name}(#{args_seq})"
return nil
else
r = pg_exec "select * from #{name}(#{args_seq})"
if r.ntuples == 0
raise Error, "No records returned"
elsif r.ntuples == 1
if r.nfields == 1
r.values[0][0]
else
r.values[0]
end
elsif r.nfields == 1
r.column_values(0)
else
r&.values
end
end
end
# :call-seq:
# insert(table, record|records)
# insert(table, fields, record|records|tuples)
# insert(schema, table, record|records)
# insert(schema, table, fields, record|records|tuples)
#
# Insert record(s) in table and return id(s)
#
# There is no variant that takes a single tuple because it would then be
# impossible to have array or hash field values
def insert(*args, upsert: nil, **opts)
# Add options to args except the special :upsert option
args << opts if !opts.empty?
# Add schema (=nil) if absent
args.unshift nil if args.size == 2 || (args.size == 3 && args[1].is_a?(Array))
# Add fields (=nil) if absent
args.insert(-2, nil) if !args[-2].is_a?(Array)
# Check number of arguments
args.size == 4 or raise ArgumentError, "Illegal number of arguments"
# Extract parameters
schema, table, fields, data = args
# Normalize table
table = schema ? "#{schema}.#{table}" : table
# Find method and normalize data
if data.is_a?(Array) # Array of tuples
method = :values # The pg_conn method when multiple records are inserted
if data.empty?
return []
elsif data.first.is_a?(Array) # Tuple (array) element. Requires the 'fields' argument
fields or raise ArgumentError
tuples = data
elsif data.first.is_a?(Hash) # Hash element
fields ||= data.first.keys
tuples = data.map { |record| fields.map { |field| record[field] } }
else
raise ArgumentError
end
elsif data.is_a?(Hash)
method = :value # The pg_conn method when only one record is inserted
fields ||= data.keys
tuples = [fields.map { |field| data[field] }]
else
raise ArgumentError, "Illegal argument '#{data.inspect}'"
end
# On-conflict clause
upsert_sql =
case upsert
when true; "on conflict do nothing"
when String; "on conlict #{upsert}"
when false, nil; ""
else
raise ArgumentError, "Illegal value for :upsert option: #{upsert.inspect}"
end
# Execute SQL statement using either :value or :values depending on data arity
self.send method, %(
insert into #{table} (#{quote_identifiers(fields)})
values #{quote_tuples(tuples)}
#{upsert_sql}
returning id
)
end
# Use upsert. Currently on 'on conflict do nothing' is supported
def upsert(*args)
insert(*args, upsert: true)
end
# Update record(s)
def update(schema = nil, table, expr, hash)
table = [schema, table].compact.join(".")
assignments = hash.map { |k,v| "#{k} = #{quote_value(v)}" }.join(", ")
constraint =
case expr
when String; expr
when Integer; "id = #{quote_value(expr)}"
when Array; "id in (#{quote_values(expr)})"
else
raise ArgumentError
end
exec %(update #{table} set #{assignments} where #{constraint})
end
# Delete record(s)
def delete(schema = nil, table, expr)
table = [schema, table].compact.join(".")
constraint =
case expr
when String; expr
when Integer; "id = #{quote_value(expr)}"
when Array; "id in (#{quote_values(expr)})"
else
raise ArgumentError
end
exec %(delete from #{table} where #{constraint})
end
# Execute SQL statement(s) in a transaction and return the number of
# affected records (if any). Also sets #timestamp unless a transaction is
# already in progress. The +sql+ argument can be a command (String) or an
# arbitrarily nested array of commands. Note that you can't have commands
# that span multiple lines. The empty array is a NOP but the empty string
# is not.
#
# #exec pass Postgres exceptions to the caller unless :fail is false in which case
# it returns nil.
#
# Note that postgres crashes the whole transaction stack if any error is
# met so if you're inside a transaction, the transaction will be in an
# error state and if you're also using subtransactions the whole
# transaction stack has collapsed
#
# TODO: Make sure the transaction stack is emptied on postgres errors
def exec(sql, commit: true, fail: true, silent: false)
transaction(commit: commit) { execute(sql, fail: fail, silent: silent) }
end
# Like #exec but returns true/false depending on if the command succeeded.
# There is not a corresponding #execute? method because any failure rolls
# back the whole transaction stack. TODO: Check which exceptions that
# should be captured
def exec?(sql, commit: true, silent: true)
begin
exec(sql, commit: commit, fail: true, silent: silent)
rescue PG::Error
return false
end
return true
end
# Execute SQL statement(s) without a transaction block and return the
# number of affected records (if any). This used to call procedures that
# may manipulate transactions. The +sql+ argument can be a SQL command or
# an arbitrarily nested array of commands. The empty array is a NOP but the
# empty string is not. #execute pass Postgres exceptions to the caller
# unless :fail is false in which case it returns nil
#
# TODO: Handle postgres exceptions wrt transaction state and stack
def execute(sql, fail: true, silent: false)
if @pg_connection
begin
pg_exec(sql, silent: silent)&.cmd_tuples
rescue PG::Error
cancel_transaction
raise if fail
return nil
end
else
pg_exec(sql, silent: silent)
end
end
# Switch user to the given user and execute the statement before swithcing
# back to the original user
#
# FIXME: The out-commented transaction block makes postspec fail for some reason
# TODO: Rename 'sudo' because it acts just like it.
def su(username, &block)
raise Error, "Missing block in call to PgConn::Connection#su" if !block_given?
realuser = self.value "select current_user"
result = nil
# transaction(commit: false) {
execute "set session authorization #{username}"
result = yield
execute "set session authorization #{realuser}"
# }
result
end
# TODO: Move to TransactionMethods
def commit()
if transaction?
pop_transaction(fail: false)
else
pg_exec("commit")
end
end
def rollback() raise Rollback end
# True if a transaction is in progress
#
# Note that this requires all transactions to be started using PgConn's
# transaction methods; transactions started using raw SQL are not
# registered
def transaction?() !@savepoints.nil? end
# True if a database transaction is in progress
def database_transaction?
pg_exec("select transaction_timestamp() != statement_timestamp()", fail: false)
end
# Returns number of transaction or savepoint levels
def transactions() @savepoints ? 1 + @savepoints.size : 0 end
def push_transaction
if transaction?
savepoint = "savepoint_#{@savepoints.size + 1}"
@savepoints.push savepoint
pg_exec("savepoint #{savepoint}")
else
@savepoints = []
pg_exec("begin")
@error = @err = nil
# FIXME This special-cases the situation where commands are logged to a
# file instead of being executed. Maybe remove logging (or execute always
# and log as a side-effect)
if @pg_connection
@timestamp, @timestamptz = @pg_connection.exec(
'select current_timestamp, current_timestamp::timestamp without time zone'
).tuple_values(0)
end
end
end
def pop_transaction(commit: true, fail: true, exception: true)
if transaction?
if savepoint = @savepoints.pop
if !commit
pg_exec("rollback to savepoint #{savepoint}")
pg_exec("release savepoint #{savepoint}")
else
pg_exec("release savepoint #{savepoint}")
end
else
@savepoints = nil
pg_exec(commit ? "commit" : "rollback")
end
else
fail and raise Error, "No transaction in progress"
end
end
# Does a rollback and empties the stack. This should be called in response
# to PG::Error exceptions because the whole transaction stack is
# invalid and the server is in an invalid state
#
# It is not an error to call #cancel_transaction when no transaction is in
# progress, the method always succeeds
def cancel_transaction
begin
pg_exec("rollback")
rescue PG::Error
;
end
@savepoints = nil
true
end
# Start a transaction. If called with a block, the block is executed within
# a transaction that is auto-committed if the commit option is true (the
# default). #transaction returns the result of the block or nil if no block
# was given
#
# The transaction can be rolled back inside the block by raising a
# PgConn::Rollback exception in which case #transaction returns nil. Note
# that the transaction timestamp is set to the start of the first
# transaction even if transactions are nested
def transaction(commit: true, &block)
if block_given?
result = nil
begin
push_transaction
result = yield
rescue PgConn::Rollback
pop_transaction(commit: false, fail: false)
return nil
rescue PG::Error
cancel_transaction
@savepoints = nil
raise
end
pop_transaction(commit: commit, fail: false)
result
else
push_transaction
nil
end
end
private
# Wrapper around PG::Connection.new that switches to the postgres user
# before connecting if the current user is the root user
def make_connection(*args, **opts)
if Process.euid == 0
begin
postgres_uid = Process::UID.from_name "postgres"
rescue ArgumentError
raise Error, "Can't find 'postgres' user"
end
begin
postgres_gid = Process::GID.from_name "postgres"
rescue ArgumentError
raise Error, "Can't find 'postgres' group"
end
begin
Process::Sys.seteuid postgres_uid
Process::Sys.setegid postgres_gid
PG::Connection.new *args, **opts
ensure
Process::Sys.seteuid 0
Process::Sys.setguid 0
end
else
PG::Connection.new *args, **opts
end
end
# Common implementation for #quote_record and #quote_records that avoids
# query the database multiple times while not duplication the code. the
# :array flag controls the mode
def quote_record_impl(datas, schema_name = nil, type, elem_types: nil, array: nil)
pg_type = [schema_name, type].compact.join('.')
fields = self.values(%(
select attname
from pg_attribute
where
attrelid = '#{pg_type}'::regclass
and attnum > 0
and not attisdropped
order by attnum
)).map(&:to_sym)
datas = [datas] if !array
literals = datas.map { |data|
values =
case data
when Hash; fields.map { |f| data[f] }
when OpenStruct; fields.map { |f| data.send(f) }
when Array; data
else
raise Error, "Illegal value #{data.inspect}"
end
"(#{quote_tuple(values, elem_types: elem_types)})::#{pg_type}"
}
if array
"array[#{literals.join(', ')}]::#{pg_type}[]"
else
literals.first
end
end
# :call-seq
# parse_query(query)
# parse_query(table, id, fields...)
# parse_query(table, where_clause, fields...)
# parse_query(table, hash, fields...)
# parse_query(table, fields...)
#
# Parse a query. Used in query-functions (#value etc.). The fields argument
# can be a list of fields or arrays of fields
#
def parse_query(*args)
args.size > 0 or raise ArgumentError
return args.first if args.size == 1 && args.first =~ / /
table = args.shift
where_clause = "true"
fields = []
case args.first
when Integer; where_clause = "id = #{args.shift}"
when String; where_clause = args.shift
when Hash
where_clause = args.shift.map { |k,v|
"#{self.quote_identifier(k)} = #{self.quote_value(v) }"
}.join(" and ")
when Symbol; fields << args.shift
when Array; fields = args.shift
when nil; where_clause = "true"
else
raise ArgumentError
end
fields.concat args.flatten
field_list = fields.empty? ? "*" : self.quote_identifiers(fields)
"select #{field_list} from #{table} where #{where_clause}"
end
# :call-seq:
# pg_exec(string)
# pg_exec(array)
#
# Execute statement(s) on the server. If the argument is an array of
# commands, the commands are concatenated with ';' before being sent to the
# server. #pg_exec returns a PG::Result object or nil if +arg+ was empty
#
# Postgres errors are passed through and #error and #err set to the last
# statement's SQL errors or nil if it succeeded
#
# FIXME: Error message prints the last statement but what if another
# statement failed?
#
# TODO: Connsider executing statements one-by-one so we're able to
# pin-point Postgres errors without a line number. This may be expensive,
# though
#
# TODO: Fix silent by not handling exceptions
def pg_exec(arg, silent: false)
if @pg_connection
begin
last_stmt = nil # To make the current SQL statement visible to the rescue clause. FIXME Not used?
if arg.is_a?(String)
return nil if arg == ""
last_stmt = arg
@pg_connection.exec(last_stmt)
else
stmts = arg.flatten.compact
return nil if stmts.empty?
# stmts.unshift("set on_error_exit stop")
last_stmt = stmts.last
@pg_connection.exec(stmts.join(";\n"))
end
rescue PG::Error => ex
if @error.nil?
@error = ex
@err = nil
end
if !silent # FIXME Why do we handle this?
$stderr.puts arg
$stderr.puts
$stderr.puts ex.message
$stderr.flush
end
raise
end
# For dump of SQL statements
else # @pg_commands is defined
if arg.is_a?(String)
@pg_commands << arg if arg != ""
else
@pg_commands.concat(arg)
end
nil
end
end
def check_1c(r)
case r.nfields
when 0; raise Error, "No columns returned"
when 1;
else
raise Error, "More than one column returned"
end
end
def check_1r(r)
if r.ntuples == 0
raise Error, "No records returned"
elsif r.ntuples > 1
raise Error, "More than one record returned"
end
end
end
def self.sql_values(values) "'" + values.join("', '") + "'" end
def self.sql_idents(values) '"' + values.join('", "') + '"' end
end