# lib/pg_conn.rb



require "pg"
require 'ostruct'

require_relative "pg_conn/version"
require_relative "pg_conn/role_methods"
require_relative "pg_conn/schema_methods"
require_relative "pg_conn/rdbms_methods"
require_relative "pg_conn/session_methods"

module PgConn
  # Base class of the exceptions defined by PgConn
  class Error < StandardError
  end

  # Raise this inside a #transaction block to roll back its changes
  class Rollback < Error
  end

  # Shorthand for Connection.new. The arguments and the optional block are
  # forwarded unchanged
  def self.new(*args, &block)
    Connection.new(*args, &block)
  end

  # Let 'PgConn === object' (and thereby 'case ... when PgConn') match
  # PgConn::Connection objects as if they were instances of the module
  def self.===(element)
    element.is_a?(PgConn::Connection) || super
  end

  # Coerce the arguments into a PgConn::Connection object (aka. a PgConn
  # object). A single argument that already is a connection is returned
  # as-is; any other arguments are passed on to PgConn::Connection.new to
  # create a new connection
  def self.ensure(*args)
    candidate = args.first
    return candidate if args.size == 1 && candidate.is_a?(PgConn::Connection)
    PgConn::Connection.new(*args)
  end

  # Marker class for strings that should be emitted as-is instead of being
  # quoted. This is the case for row and record values
  class Literal < String
  end

  # All results from the database are converted into native Ruby types
  class Connection
    # Answer true to is_a?(PgConn) so that connection objects appear to be
    # instances of the PgConn module. Other classes are checked as usual
    def is_a?(klass)
      klass == PgConn || super
    end

    # The underlying PG::Connection object. It is nil when the connection was
    # initialized with an array argument
    attr_reader :pg_connection

    # The class of column names (Symbol or String). Default is Symbol
    attr_reader :field_name_class

    # Name of the user as reported by the underlying PG connection
    def user = @pg_connection.user
    alias_method :username, :user # Obsolete FIXME Is it?

    # Name of the database as reported by the underlying PG connection
    def name = @pg_connection.db
    alias_method :database, :name # Obsolete FIXME Is it?

    # Database manipulation methods: #exist?, #create, #drop, #list. This is
    # the RdbmsMethods object created in #initialize
    attr_reader :rdbms

    # Role manipulation methods: #exist?, #create, #drop, #list. This is the
    # RoleMethods object created in #initialize
    attr_reader :role

    # Schema manipulation methods: #exist?, #create, #drop, #list, and
    # #exist?/#list for relations/tables/views/columns. This is the
    # SchemaMethods object created in #initialize
    attr_reader :schema

    # Session manipulation methods: #list, #terminate, #disable, #enable. This
    # is the SessionMethods object created in #initialize
    attr_reader :session

    # The transaction timestamp of the most recent SQL statement executed by
    # #exec or #transaction block. The timestamp is without time zone
    attr_reader :timestamp

    # The transaction timestamp of the most recent SQL statement executed by
    # #exec or #transaction block. The timestamp includes the current time zone
    attr_reader :timestamptz

    # PG::Error object of the first failed statement in the transaction;
    # otherwise nil. It is cleared at the beginning of a transaction so be sure
    # to save it before you run any cleanup code that may initiate new
    # transactions
    attr_reader :error

    # True if the transaction is in an error state, ie. if @error is set
    def error?
      !@error.nil?
    end

    # Tuple of error message, lineno, and charno extracted from the message of
    # the #error object. Each element defaults to nil if not found. The tuple
    # is memoized in @err
    #
    # The message is the text after 'ERROR:' on the first line with its first
    # letter upcased. The rest of the message is left untouched so that
    # case-sensitive identifiers and values are reported exactly as Postgres
    # wrote them (String#capitalize would downcase them)
    def err
      @err ||=
        if (match = /.*?ERROR:\s*(.*?)\n(?:.*?(\s*LINE\s+(\d+): ).*?\n(?:(\s+)\^\n)?)?/.match(error&.message))
          # Group 2 is the 'LINE <n>: ' prefix and group 4 the whitespace in
          # front of the '^' marker, so the difference is the zero-based offset
          # of the error within the SQL line
          text, prefix, lineno, indent = match[1], match[2], match[3], match[4]
          [text.sub(/\A[[:lower:]]/, &:upcase), lineno&.to_i, indent && (indent.size - prefix.size + 1)]
        else
          [nil, nil, nil]
        end
    end

    # Last error message, ie. the first element of #err. It is the first line
    # of the PG error message (the full message may contain additional info)
    # and it doesn't contain a terminating newline
    def errmsg
      err.first
    end

    # The one-based line number of the last error, ie. the second element of
    # #err. It is nil if absent in the Postgres error message
    def errline
      err.at(1)
    end

    # The one-based character number of the error in the last PG::Error, ie.
    # the third element of #err. It is nil if absent in the Postgres error
    # message
    def errchar
      err.at(2)
    end

    # :call-seq:
    #     initialize(dbname = nil, user = nil, field_name_class: Symbol)
    #     initialize(connection_hash, field_name_class: Symbol)
    #     initialize(connection_string, field_name_class: Symbol)
    #     initialize(host, port, dbname, user, password, field_name_class: Symbol)
    #     initialize(array, field_name_class: Symbol)
    #     initialize(pg_connection_object)
    #
    # Initialize a connection object and connect to the database
    #
    # The possible keys of the connection hash are :host, :port, :dbname, :user,
    # and :password. The connection string can either be a space-separated list
    # of <key>=<value> pairs with the same keys as the hash, or a URI with the
    # format 'postgres[ql]://[user[:password]@][host][:port][/name]
    #
    # If given an array argument, PgConn will not connect to the database and
    # instead write its commands to the array. In this case, methods extracting
    # values from the database (eg. #value) will return nil or raise an
    # exception
    #
    # The last variant is used to establish a PgConn from an existing
    # connection. It doesn't change the connection settings and is not
    # recommended except in cases where you want to piggyback on an existing
    # connection (eg. a Rails connection)
    #
    # The :field_name_class option controls the Ruby type of column names. It can be
    # Symbol (the default) or String. The :timestamp and :timestamptz options
    # are used internally to set the timestamps for transactions. The options
    # are extracted from a copy of the trailing hash so a hash owned by the
    # caller is never modified
    #
    # Note that the connection hash and the connection string may support more
    # parameters than documented here. Consult
    # https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
    # for the full list
    #
    # Raises PgConn::Error on an illegal argument type or an illegal number of
    # arguments
    #
    # TODO: Change to 'initialize(*args, **opts)'
    def initialize(*args)
      if args.last.is_a?(Hash)
        # Work on a copy: deleting the options from the hash itself would strip
        # them from the caller's hash and break reuse of that hash
        opts = args.pop.dup
        @field_name_class = opts.delete(:field_name_class) || Symbol
        @timestamp = opts.delete(:timestamp)
        @timestamptz = opts.delete(:timestamptz)
        args.push(opts) unless opts.empty? # What remains is a connection hash
      else
        @field_name_class = Symbol
      end

#     else # We assume that the current user is a postgres superuser
#       @db = PgConn.new("template0")

      using_existing_connection = false
      @pg_connection =
          if args.size == 0
            make_connection
          elsif args.size == 1
            case arg = args.first
              when PG::Connection
                using_existing_connection = true
                arg
              when String
                if arg =~ /=/ # Space-separated list of <key>=<value> pairs
                  make_connection arg
                elsif arg =~ /\// # URI
                  make_connection arg
                else # Plain database name
                  make_connection dbname: arg
                end
              when Hash
                make_connection(**arg)
              when Array
                # Don't connect, commands are collected in the array instead
                @pg_commands = arg
                nil
              else
                raise Error, "Illegal argument type: #{arg.class}"
            end
          elsif args.size == 2
            make_connection dbname: args.first, user: args.last
          elsif args.size == 5
            # The two nils fill the positional slots between port and dbname
            make_connection args[0], args[1], nil, nil, args[2], args[3], args[4]
          else
            raise Error, "Illegal number of parameters: #{args.size}"
          end

      if @pg_connection && !using_existing_connection
        # Set a dummy notice processor to avoid warnings on stderr
        @pg_connection.set_notice_processor { |message| ; } # Intentionally a nop

        # Auto-convert to ruby types
        type_map = PG::BasicTypeMapForResults.new(@pg_connection)

        # Use String as default type. Kills 'Warning: no type cast defined for
        # type "uuid" with oid 2950..' warnings
        type_map.default_type_map = PG::TypeMapAllStrings.new

        # Timestamp decoder
        type_map.add_coder PG::TextDecoder::Timestamp.new( # Timestamp without time zone
            oid: 1114,
            flags: PG::Coder::TIMESTAMP_DB_UTC | PG::Coder::TIMESTAMP_APP_UTC)

        # Decode anonymous records but note that this is only useful to convert the
        # outermost structure into an array, the elements are not decoded and are
        # returned as strings. It is best to avoid anonymous records if possible
        type_map.add_coder PG::TextDecoder::Record.new(
            oid: 2249
        )

        @pg_connection.type_map_for_results = type_map
        @pg_connection.field_name_type = @field_name_class.to_s.downcase.to_sym # Use symbol field names
        @pg_connection.exec "set client_min_messages to warning;" # Silence warnings
      end

      @schema = SchemaMethods.new(self)
      @role = RoleMethods.new(self)
      @rdbms = RdbmsMethods.new(self)
      @session = SessionMethods.new(self)
      @savepoints = nil # Stack of savepoint names. Nil if no transaction in progress
    end

    # Reset the connection and then silence warnings again so the noise level
    # is kept (TODO: How about the other per-session settings in #initialize?
    # Are they cleared by #reset too?)
    def reset
      connection = @pg_connection
      connection.reset
      connection.exec("set client_min_messages to warning;") # Silence warnings
    end

    # Close the database connection. It does nothing if there is no connection
    # or if the connection is already finished. TODO: Rename 'close'
    def terminate
      return unless @pg_connection
      return if @pg_connection.finished?
      @pg_connection.close
    end

    # Create a connection. Without a block this is the regular constructor.
    # With a block, the new connection is yielded to the block and terminated
    # when the block exits (also if it raises); the value of the block is
    # returned
    def self.new(*args, **opts, &block)
      return super(*args, **opts) unless block_given?

      begin
        # Allocate and initialize in two steps so the object is known to the
        # ensure clause even if #initialize fails halfway
        object = Connection.allocate
        object.__send__(:initialize, *args, **opts)
        yield(object) # if object.pg_connection
      ensure
        object.terminate if object.pg_connection
      end
    end

    # Wrap the argument in a Literal. #quote_value returns Literal objects
    # unchanged instead of quoting them
    def literal(arg)
      Literal.new(arg)
    end

    # Quote argument as an identifier. The argument should be a non-nil string
    # or a symbol. Dots are treated as separators so each part of a qualified
    # name is quoted on its own, and the first quoted '*' is emitted unquoted
    def quote_identifier(s)
      ident = s.is_a?(Symbol) ? s.to_s : s
      quoted = @pg_connection.escape_identifier(ident)
      quoted.gsub(".", '"."').sub('"*"', "*")
    end

    # Quote each identifier using #quote_identifier and concatenate the result
    # using ', ' as separator
    def quote_identifiers(idents)
      quoted = idents.map { |name| quote_identifier(name) }
      quoted.join(", ")
    end

    # Quote the value as a string. Emit 'null' if value is nil
    #
    # The value can be of any type but is converted to a string using #to_s
    # before quoting. This works by default for the regular types Integer,
    # true/false, Time/Date/DateTime, and arrays. Other types may require
    # special handling. Literal objects are returned as-is
    #
    # Floats are emitted as bare numbers except NaN and +/-Infinity that have
    # no bare numeric syntax in SQL; they are emitted as quoted strings cast to
    # float8
    #
    # Note that a tuple value (an array) must be quoted using #quote_tuple
    # because #quote_value would quote the tuple as an array instead of a list
    # of values
    #
    # The :elem_type option can be a postgres type name (String or Symbol) or
    # an array of type names. It is used as the required explicit element
    # type when the argument is an empty array. It is not needed if the array
    # is guaranteed to be non-empty. Nested arrays are not supported
    #
    # Raises PgConn::Error if given an empty array without an :elem_type
    #
    def quote_value(value, elem_type: nil)
      case value
        when Literal; value # Must come before String because Literal is a String
        when String; @pg_connection.escape_literal(value)
        when Integer; value.to_s
        when Float
          # A bare NaN or Infinity would be parsed as an identifier
          value.finite? ? value.to_s : "'#{value}'::float8"
        when true, false; value.to_s
        when nil; 'null'
        when Date, DateTime; "'#{value}'"
        when Time; "'#{value.strftime("%FT%T%:z")}'"
        when Array
          if value.empty?
            # Postgres can't infer the element type of an empty array
            elem_type or raise Error, "Empty array without elem_type"
            "array[]::#{elem_type}[]"
          else
            "array[#{value.map { |elem| quote_value(elem) }.join(', ')}]"
          end
        else
          @pg_connection.escape_literal(value.to_s)
      end
    end

    # Quote each value using #quote_value and concatenate the result using
    # ', ' as separator. The :elem_type option is passed on for every value
    def quote_values(values, elem_type: nil)
      quoted = values.map { |item| quote_value(item, elem_type: elem_type) }
      quoted.join(", ")
    end

   # Quote an array of values as a tuple. The element types should be in the
   # same order as the array values of the tuple: each array value (empty or
   # not) consumes the next element type. #quote_tuple is the same as
   # #quote_values except the values may have different types (this makes no
   # difference except in the case when the tuple may contain empty array(s))
   #
   # The :elem_types argument is not modified so the same list can be used
   # for any number of tuples
   def quote_tuple(tuple, elem_types: nil)
     # Array() returns an Array argument itself and #shift is destructive, so
     # work on a copy to keep the caller's list intact
     remaining = Array(elem_types).dup
     tuple.map { |value|
       elem_type = value.is_a?(Array) ? remaining.shift : nil
       quote_value(value, elem_type: elem_type)
     }.join(", ")
   end

    # Quote an array of tuples. Each tuple is quoted using #quote_tuple and
    # enclosed in parentheses, and the tuples are concatenated using ', ' as
    # separator
    def quote_tuples(tuples, elem_types: nil)
      rows = tuples.map do |row|
        "(#{quote_tuple(row, elem_types: elem_types)})"
      end
      rows.join(", ")
    end

    # Quote a record and cast it into the given type, the type can also be a
    # table or view. 'data' is an array, hash, or struct representation of the
    # record
    #
    # Note that the fields are retrieved from the database so this method is
    # not as fast as the other quote-methods. It is however very convenient
    # when you're testing and need a composite type because record-quoting can
    # easily become unwieldy
    def quote_record(data, schema_name = nil, type, elem_types: nil)
      quote_record_impl(
        data, schema_name, type,
        elem_types: elem_types,
        array: false
      )
    end

    # Quote an array of records. The type is the type of the records, not the
    # type of the enclosing array
    def quote_records(data, schema_name = nil, type, elem_types: nil)
      quote_record_impl(
        data, schema_name, type,
        elem_types: elem_types,
        array: true
      )
    end

    # :call-seq:
    #   exist?(query)
    #   exist?(table, id)
    #   exist?(table, where_clause)
    #
    # Return true if the query returns one or more records. It is the
    # negation of #empty?
    def exist?(*query)
      empty?(*query) ? false : true
    end

    # :call-seq:
    #   empty?(query)
    #   empty?(table, where_clause = nil)
    #
    # Return true if the table or the result of the query is empty. The query
    # is limited to a single record so that at most one record is counted
    def empty?(*query)
      subquery = parse_query(*query)
      sql = "select count(*) from (#{subquery} limit 1) as \"inner_query\""
      value(sql) == 0
    end

    # :call-seq:
    #   count(query)
    #   count(table_name, where_clause = nil)
    #
    # The number of records in the table or in the result of the query
    def count(*query)
      subquery = parse_query(*query)
      value "select count(*) from (#{subquery}) as inner_query"
    end

    # TODO (but breaks a lot of code)
    #   value     1
    #   value?    0 or 1
    #
    #   values    1 or more
    #   values?   0 or more

    # Return a single value: the first column of the first record. The result
    # is checked by #check_1c and #check_1r because it is an error if the
    # query doesn't return a single record with a single column
    #
    # TODO If :transaction is true, the query will be executed in a
    # transaction and also be committed if :commit is true (this is the
    # default). It can also be used to execute 'insert' statements with a
    # 'returning' clause
    def value(*query) #, transaction: false, commit: true)
      result = pg_exec(parse_query(*query))
      check_1c(result)
      check_1r(result)
      first_row = result.values[0]
      first_row[0]
    end

    # Like #value but returns nil if no record was found. It is still an error
    # if the query returns more than one column
    def value?(*query) #, transaction: false, commit: true)
      result = pg_exec(parse_query(*query))
      check_1c(result)
      if result.ntuples == 0
        nil
      else
        check_1r(result)
        result.values[0][0]
      end
    end

    # Return an array of the values of the first column. The result is checked
    # by #check_1c because it is an error if the query returns records with
    # more than one column
    #
    # TODO If :transaction is true, the query will be executed in a
    # transaction and be committed if :commit is true (the default). This can
    # be used in 'insert ... returning ...' statements
    def values(*query)
      result = pg_exec(parse_query(*query))
      check_1c(result)
      result.column_values(0)
    end

    # Return the column values of the first record as an array. The result is
    # checked by #check_1r because it is an error if the query returns more
    # than one record
    #
    # TODO If :transaction is true, the query will be executed
    # in a transaction and be committed if :commit is true (the default). This
    # can be used in 'insert ... returning ...' statements
    def tuple(*query)
      result = pg_exec(parse_query(*query))
      check_1r(result)
      rows = result.values
      rows[0]
    end

    # Like #tuple but returns nil if no record was found
    def tuple?(*query)
      result = pg_exec(parse_query(*query))
      if result.ntuples == 0
        nil
      else
        check_1r(result)
        result.values[0]
      end
    end

    # Return an array of tuples, one array of column values for each record
    #
    # TODO If :transaction is true, the query will be executed in a
    # transaction and be committed if :commit is true (the default). This can
    # be used in 'insert ... returning ...' statements
    def tuples(*query)
      result = pg_exec(parse_query(*query))
      result.values
    end

    # Return a single-element hash from column name to value. The result is
    # checked by #check_1c and #check_1r because it is an error if the query
    # returns more than one record or more than one column. Note that you will
    # probably prefer to use #value instead when you query a single field
    def field(*query)
      result = pg_exec(parse_query(*query))
      check_1c(result)
      check_1r(result)
      first = result.tuple(0)
      first.to_h
    end

    # Like #field but returns nil if no record was found
    def field?(*query)
      result = pg_exec(parse_query(*query))
      check_1c(result)
      if result.ntuples == 0
        nil
      else
        check_1r(result)
        result.tuple(0).to_h
      end
    end

    # Return an array of single-element hashes from column name to value. The
    # result is checked by #check_1c because it is an error if the query
    # returns records with more than one column. Note that you will probably
    # prefer to use #values instead when you expect only single-column records
    def fields(*query)
      result = pg_exec(parse_query(*query))
      check_1c(result)
      result.each.map { |row| row.to_h }
    end

    # Return a hash from column name (a Symbol) to field value. The result is
    # checked by #check_1r because it is an error if the query returns more
    # than one record. It blows up if a column name is not a valid ruby symbol
    def record(*query)
      result = pg_exec(parse_query(*query))
      check_1r(result)
      first = result.tuple(0)
      first.to_h
    end

    # Like #record but returns nil if no record was found
    def record?(*query)
      result = pg_exec(parse_query(*query))
      if result.ntuples == 0
        nil
      else
        check_1r(result)
        result.tuple(0).to_h
      end
    end

    # Return an array of hashes from column name to field value, one hash for
    # each record
    def records(*query)
      result = pg_exec(parse_query(*query))
      result.each.map { |row| row.to_h }
    end

    # Return a record as an OpenStruct object built from the hash returned by
    # #record. It is an error if the query returns more than one record. It
    # blows up if a column name is not a valid ruby symbol
    def struct(*query)
      attributes = record(parse_query(*query))
      OpenStruct.new(**attributes)
    end

    # Like #struct but returns nil if no record was found
    def struct?(*query)
      attributes = record?(parse_query(*query))
      attributes.nil? ? nil : OpenStruct.new(**attributes)
    end

    # Return an array of OpenStruct objects, one for each hash returned by
    # #records
    def structs(*query)
      rows = records(parse_query(*query))
      rows.map { |attributes| OpenStruct.new(**attributes) }
    end

    # Return a hash from the record id column to record (hash from column name
    # to field value). If the :key_column option is defined it will be used
    # instead of id as the key. The key column is converted to
    # #field_name_class before it is used to look up the key of each record
    #
    # Raises a RuntimeError if :key_column is neither a String nor a Symbol
    # and a PgConn::Error if the key column doesn't exist or if its values are
    # not unique
    def table(query, key_column: :id)
      raise "Illegal key_column" unless [String, Symbol].include?(key_column.class)
      key_name = (field_name_class == Symbol ? key_column.to_sym : key_column.to_s)
      result = pg_exec(query)
      begin
        result.fnumber(key_name.to_s) # Only called to check that the column exists
      rescue ArgumentError
        raise Error, "Can't find column #{key_name}"
      end
      rows = {}
      result.each do |record|
        key = record[key_name]
        raise Error, "Duplicate key: #{key.inspect}" if rows.key?(key)
        rows[key] = record.to_h
      end
      rows
    end

    # TODO: #group - same as table but partitions a table on the given keys
    #       returning a map from key to array of records

    # TODO: An #array method that returns a map from id to tuple. Hmm... almost
    #       the same as #map

    # Return a hash from the record id column to an OpenStruct representation
    # of the record. If the :key_column option is defined it will be used
    # instead of id as the key
    #
    # Raises a PgConn::Error if the key column doesn't exist or if its values
    # are not unique
    def set(query, key_column: :id)
      key_column = key_column.to_sym
      result = pg_exec(query)
      begin
        result.fnumber(key_column.to_s) # Check that key column exists
      rescue ArgumentError
        raise Error, "Can't find column #{key_column}"
      end
      entries = {}
      result.ntuples.times do |index|
        entry = OpenStruct.new(**result[index])
        key = entry.send(key_column)
        raise Error, "Duplicate key: #{key.inspect}" if entries.key?(key)
        entries[key] = entry
      end
      entries
    end

    # Returns a hash from the first field to a tuple of the remaining fields.
    # If there is only one remaining field then that value is used instead of a
    # tuple. The optional +key+ argument sets the mapping field and the
    # +symbol+ option converts the keys to Symbol objects when true
    #
    # Raises a PgConn::Error if the key column doesn't exist or if its values
    # are not unique
    def map(query, key = nil, symbol: false) # TODO Swap arguments
      result = pg_exec(query)
      begin
        key = (key || result.fname(0)).to_s
        key_index = result.fnumber(key)
        single_value = (result.nfields == 2) # Key plus exactly one value field
      rescue ArgumentError
        raise Error, "Can't find column #{key}"
      end
      mapping = {}
      result.each_row do |row|
        key_value = row.delete_at(key_index) # Leaves the value fields in row
        key_value = key_value.to_sym if symbol
        raise Error, "Duplicate key: #{key_value}" if mapping.key?(key_value)
        mapping[key_value] = single_value ? row.first : row
      end
      mapping
    end

    # Like #map but values of duplicate keys are concatenated. It acts as a
    # group-by on the key and array_agg on the remaining values. The value is
    # an array of tuples if the query has more than one value field and an
    # array of values if there is only one value field
    #
    # Raises a PgConn::Error if the key column doesn't exist
    def multimap(query, key = nil, symbol: false)
      result = pg_exec(query)
      begin
        key = (key || result.fname(0)).to_s
        key_index = result.fnumber(key)
        single_value = (result.nfields == 2) # Key plus exactly one value field
      rescue ArgumentError
        raise Error, "Can't find column #{key}"
      end
      groups = {}
      result.each_row do |row|
        key_value = row.delete_at(key_index) # Leaves the value fields in row
        key_value = key_value.to_sym if symbol
        bucket = (groups[key_value] ||= [])
        bucket << (single_value ? row.first : row)
      end
      groups
    end

    # Return the value of calling the given postgres function. It dynamically
    # detects the structure of the result and returns a value or an array of
    # values if the result contained only one column (like #value or #values),
    # a tuple if the record has multiple columns (like #tuple), and an array
    # of tuples if the result contained more than one record with multiple
    # columns (like #tuples). It raises a PgConn::Error if the function
    # returns no records
    #
    # The name argument can be a String or a Symbol that may contain the schema
    # of the function. If the :proc option is true the "function" is assumed
    # to be a procedure; it is then invoked using 'call' and nil is returned
    #
    def call(name, *args, elem_type: nil, proc: false) # :proc may interfere with hashes
      args_seq = quote_values(args, elem_type: elem_type)
      if proc
        pg_exec "call #{name}(#{args_seq})"
        return nil
      end

      result = pg_exec "select * from #{name}(#{args_seq})"
      raise Error, "No records returned" if result.ntuples == 0

      single_record = (result.ntuples == 1)
      single_column = (result.nfields == 1)
      if single_record
        single_column ? result.values[0][0] : result.values[0]
      elsif single_column
        result.column_values(0)
      else
        result.values
      end
    end

    # :call-seq:
    #   insert(table, record|records)
    #   insert(table, fields, record|records|tuples)
    #   insert(schema, table, record|records)
    #   insert(schema, table, fields, record|records|tuples)
    #
    # Insert record(s) in table and return id(s). A single record (a hash)
    # returns a single id while an array of records or tuples returns an
    # array of ids. An empty array inserts nothing and returns an empty array
    #
    # The :upsert option adds an on-conflict clause: true emits 'on conflict
    # do nothing' and a String is emitted after 'on conflict'
    #
    # There is no variant that takes a single tuple because it would then be
    # impossible to have array or hash field values
    #
    # Raises ArgumentError on an illegal number or type of arguments or an
    # illegal :upsert value
    def insert(*args, upsert: nil, **opts)
      # Add options to args except the special :upsert option. This makes
      # 'insert(table, field: value)' the same as 'insert(table, {field: value})'
      args << opts if !opts.empty?

      # Add schema (=nil) if absent
      args.unshift nil if args.size == 2 || (args.size == 3 && args[1].is_a?(Array))

      # Add fields (=nil) if absent
      args.insert(-2, nil) if !args[-2].is_a?(Array)

      # Check number of arguments
      args.size == 4 or raise ArgumentError, "Illegal number of arguments"

      # Extract parameters
      schema, table, fields, data = args

      # Normalize table
      table = schema ? "#{schema}.#{table}" : table

      # Find method and normalize data
      if data.is_a?(Array) # Array of tuples
        method = :values # The pg_conn method when multiple records are inserted
        if data.empty?
          return []
        elsif data.first.is_a?(Array) # Tuple (array) element. Requires the 'fields' argument
          fields or raise ArgumentError
          tuples = data
        elsif data.first.is_a?(Hash) # Hash element
          fields ||= data.first.keys
          tuples = data.map { |record| fields.map { |field| record[field] } }
        else
          raise ArgumentError
        end
      elsif data.is_a?(Hash)
        method = :value # The pg_conn method when only one record is inserted
        fields ||= data.keys
        tuples = [fields.map { |field| data[field] }]
      else
        raise ArgumentError, "Illegal argument '#{data.inspect}'"
      end

      # On-conflict clause
      upsert_sql =
          case upsert
            when true; "on conflict do nothing"
            when String; "on conflict #{upsert}"
            when false, nil; ""
          else
            raise ArgumentError, "Illegal value for :upsert option: #{upsert.inspect}"
          end

      # Execute SQL statement using either :value or :values depending on data arity
      self.send method, %(
        insert into #{table} (#{quote_identifiers(fields)})
          values #{quote_tuples(tuples)}
          #{upsert_sql}
          returning id
      )
    end

    # Like #insert but with the :upsert option set to true. Currently only
    # 'on conflict do nothing' is supported
    def upsert(*args) = insert(*args, upsert: true)

    # Update record(s) in the table using #exec. 'expr' selects the records:
    # a String is used as the where-clause, an Integer is matched against the
    # id column, and an Array is a list of ids. 'hash' is a map from column
    # name to new value. Raises ArgumentError on any other 'expr' type
    def update(schema = nil, table, expr, hash)
      target = [schema, table].compact.join(".")
      assignments = hash.map { |column, value| "#{column} = #{quote_value(value)}" }.join(", ")
      condition =
        case expr
        when String then expr
        when Integer then "id = #{quote_value(expr)}"
        when Array then "id in (#{quote_values(expr)})"
        else raise ArgumentError
        end
      exec %(update #{target} set #{assignments} where #{condition})
    end

    # Delete record(s) from the table using #exec. 'expr' selects the records:
    # a String is used as the where-clause, an Integer is matched against the
    # id column, and an Array is a list of ids. Raises ArgumentError on any
    # other 'expr' type
    def delete(schema = nil, table, expr)
      target = [schema, table].compact.join(".")
      condition =
        case expr
        when String then expr
        when Integer then "id = #{quote_value(expr)}"
        when Array then "id in (#{quote_values(expr)})"
        else raise ArgumentError
        end
      exec %(delete from #{target} where #{condition})
    end

    # Execute SQL statement(s) in a transaction and return the number of
    # affected records (if any). It is implemented as #execute wrapped in a
    # #transaction block. Also sets #timestamp unless a transaction is
    # already in progress. The +sql+ argument can be a command (String) or an
    # arbitrarily nested array of commands. Note that you can't have commands
    # that span multiple lines. The empty array is a NOP but the empty string
    # is not.
    #
    # #exec passes Postgres exceptions to the caller unless :fail is false in
    # which case it returns nil.
    #
    # Note that postgres crashes the whole transaction stack if any error is
    # met so if you're inside a transaction, the transaction will be in an
    # error state and if you're also using subtransactions the whole
    # transaction stack has collapsed
    #
    # TODO: Make sure the transaction stack is emptied on postgres errors
    def exec(sql, commit: true, fail: true, silent: false)
      transaction(commit: commit) do
        execute(sql, fail: fail, silent: silent)
      end
    end

    # Like #exec but returns true/false depending on if the command succeeded,
    # ie. false if #exec raised a PG::Error and true otherwise. There is not a
    # corresponding #execute? method because any failure rolls back the whole
    # transaction stack. TODO: Check which exceptions that should be captured
    def exec?(sql, commit: true, silent: true)
      exec(sql, commit: commit, fail: true, silent: silent)
      true
    rescue PG::Error
      false
    end

    # Execute SQL statement(s) without a transaction block and return the
    # number of affected records (if any). This is used to call procedures
    # that may manipulate transactions. The +sql+ argument can be a SQL
    # command or an arbitrarily nested array of commands. The empty array is a
    # NOP but the empty string is not. #execute passes Postgres exceptions to
    # the caller unless :fail is false in which case it returns nil. The
    # transaction is cancelled using #cancel_transaction in both cases
    #
    # Without a PG connection, the result of #pg_exec is returned as-is
    #
    # TODO: Handle postgres exceptions wrt transaction state and stack
    def execute(sql, fail: true, silent: false)
      return pg_exec(sql, silent: silent) unless @pg_connection

      begin
        pg_exec(sql, silent: silent)&.cmd_tuples
      rescue PG::Error
        cancel_transaction
        raise if fail
        nil
      end
    end

    # Switch user to the given user and execute the block before switching
    # back to the original user. Returns the value of the block. The original
    # user is restored even if the block raises
    #
    # Raises a PgConn::Error if called without a block
    #
    # FIXME: Wrapping the statements in a 'transaction(commit: false)' block
    #        makes postspec fail for some reason
    # TODO: Rename 'sudo' because it acts just like it.
    def su(username, &block)
      raise Error, "Missing block in call to PgConn::Connection#su" if !block_given?
      realuser = self.value "select current_user"
      execute "set session authorization #{username}"
      begin
        yield
      ensure
        # Always switch back, otherwise an exception in the block would leave
        # the session running as the other user
        execute "set session authorization #{realuser}"
      end
    end

    # TODO: Move to TransactionMethods

    # Commit the innermost transaction level using #pop_transaction if a
    # PgConn transaction is in progress; otherwise send a plain 'commit'
    def commit
      return pop_transaction(fail: false) if transaction?
      pg_exec("commit")
    end

    # Raise a Rollback exception. A #transaction block rescues it and rolls
    # back the transaction
    def rollback
      raise Rollback
    end

    # True if a transaction is in progress, ie. if the savepoint stack exists
    #
    # Note that this requires all transactions to be started using PgConn's
    # transaction methods; transactions started using raw SQL are not
    # registered
    def transaction?
      !@savepoints.nil?
    end

    # True if a database transaction is in progress. This is detected by
    # asking the database if the transaction timestamp differs from the
    # statement timestamp
    #
    # Returns false if the query yields no result object (NOTE(review):
    # presumably the case when #pg_exec fails with fail: false or when there
    # is no connection - confirm against #pg_exec)
    def database_transaction?
      result = pg_exec("select transaction_timestamp() != statement_timestamp()", fail: false)

      # The result object itself is always truthy so the boolean has to be
      # extracted from its single cell
      return false unless result.respond_to?(:values)
      cell = result.values.dig(0, 0)

      # The cell is true when booleans are decoded by the type map set up in
      # #initialize and 't' on an existing connection without that type map
      cell == true || cell == "t"
    end

    # Returns number of transaction or savepoint levels: 0 if no transaction
    # is in progress, otherwise one plus the number of savepoints
    def transactions
      return 0 unless @savepoints
      @savepoints.size + 1
    end

    # Enter a new transaction level. If a transaction is already in progress,
    # a savepoint named 'savepoint_<n>' is created and pushed on the savepoint
    # stack. Otherwise a new transaction is started: the savepoint stack is
    # initialized, the error state is cleared, and the transaction timestamps
    # are recorded. @timestamp is set to the timestamp without time zone and
    # @timestamptz to the timestamp with time zone
    def push_transaction
      if transaction?
        savepoint = "savepoint_#{@savepoints.size + 1}"
        @savepoints.push savepoint
        pg_exec("savepoint #{savepoint}")
      else
        @savepoints = []
        pg_exec("begin")
        @error = @err = nil
        # FIXME This special-cases the situation where commands are logged to a
        # file instead of being executed. Maybe remove logging (or execute always
        # and log as a side-effect)
        if @pg_connection
            # The column order has to match the order of the assignment
            @timestamp, @timestamptz = @pg_connection.exec(
                'select current_timestamp::timestamp without time zone, current_timestamp'
            ).tuple_values(0)
        end
      end
    end

    # Leave the innermost transaction level. If the level is a savepoint, the
    # savepoint is released, after a rollback to it if :commit is false.
    # Otherwise the transaction itself is committed or rolled back depending
    # on :commit and the savepoint stack is removed
    #
    # If no transaction is in progress, a PgConn::Error is raised unless :fail
    # is false. The :exception option is currently not used
    def pop_transaction(commit: true, fail: true, exception: true)
      unless transaction?
        raise Error, "No transaction in progress" if fail
        return fail
      end

      savepoint = @savepoints.pop
      if savepoint
        pg_exec("rollback to savepoint #{savepoint}") unless commit
        pg_exec("release savepoint #{savepoint}")
      else
        # The stack is empty so this is the outermost level
        @savepoints = nil
        pg_exec(commit ? "commit" : "rollback")
      end
    end

    # Does a rollback and empties the stack. This should be called in response
    # to PG::Error exceptions because the whole transaction stack is
    # invalid and the server is in an invalid state
    #
    # It is not an error to call #cancel_transaction when no transaction is in
    # progress. A PG::Error from the rollback is ignored and the method
    # returns true
    def cancel_transaction
      begin
        pg_exec "rollback"
      rescue PG::Error
        nil # A failing rollback is deliberately ignored
      end
      @savepoints = nil
      true
    end

    # Start a transaction. If called with a block, the block is executed within
    # a transaction that is committed afterwards if the commit option is true
    # (the default) and rolled back otherwise. #transaction returns the result
    # of the block or nil if no block was given
    #
    # The transaction can be rolled back inside the block by raising a
    # PgConn::Rollback exception in which case #transaction returns nil. Note
    # that the transaction timestamp is set to the start of the first
    # transaction even if transactions are nested
    #
    # Exceptions other than PgConn::Rollback are re-raised after cleaning up:
    # a PG::Error cancels the whole transaction stack, any other StandardError
    # rolls back the level opened by this call so that neither the database
    # transaction nor the savepoint stack is left dangling
    def transaction(commit: true, &block)
      unless block_given?
        push_transaction
        return nil
      end

      result = nil
      begin
        push_transaction
        result = yield
      rescue PgConn::Rollback
        pop_transaction(commit: false, fail: false)
        return nil
      rescue PG::Error
        # cancel_transaction also clears the savepoint stack
        cancel_transaction
        raise
      rescue StandardError => ex
        begin
          pop_transaction(commit: false, fail: false)
        rescue PG::Error
          # The rollback itself failed; fall back to discarding everything
          cancel_transaction
        end
        raise ex
      end
      pop_transaction(commit: commit, fail: false)
      result
    end

  private
    # Wrapper around PG::Connection.new that switches to the postgres user
    # before connecting if the current user is the root user
    #
    # The effective group is changed before the effective user because the
    # group can't be changed once root privileges have been dropped, and the
    # identities are restored in the opposite order when the connection attempt
    # is over (whether it succeeded or not)
    #
    # Raises Error if the 'postgres' user or group can't be found. All
    # arguments are passed on to PG::Connection.new
    def make_connection(*args, **opts)
      return PG::Connection.new(*args, **opts) unless Process.euid == 0

      begin
        postgres_uid = Process::UID.from_name "postgres"
      rescue ArgumentError
        raise Error, "Can't find 'postgres' user"
      end
      begin
        postgres_gid = Process::GID.from_name "postgres"
      rescue ArgumentError
        raise Error, "Can't find 'postgres' group"
      end

      saved_egid = Process.egid
      begin
        Process::Sys.setegid postgres_gid
        Process::Sys.seteuid postgres_uid
        PG::Connection.new(*args, **opts)
      ensure
        # Regain root first; it is needed to restore the effective group
        Process::Sys.seteuid 0
        Process::Sys.setegid saved_egid
      end
    end

    # Common implementation for #quote_record and #quote_records that avoids
    # querying the database multiple times without duplicating the code. The
    # :array flag controls the mode: when it is set, +datas+ is a list of
    # records and the result is an array literal, otherwise +datas+ is a single
    # record and the result is a single record literal
    #
    # The field names of the type are looked up in pg_attribute. Each record
    # can be a Hash (looked up by field-name symbols), an OpenStruct, or an
    # Array of values that is used as-is. Anything else raises Error
    def quote_record_impl(datas, schema_name = nil, type, elem_types: nil, array: nil)
      qualified_type = [schema_name, type].compact.join('.')
      attribute_names = self.values(%(
        select attname
        from pg_attribute
        where
          attrelid = '#{qualified_type}'::regclass
          and attnum > 0
          and not attisdropped
        order by attnum
      ))
      column_keys = attribute_names.map(&:to_sym)

      records = array ? datas : [datas]

      record_literals = records.map do |record|
        tuple =
          case record
          when Hash
            column_keys.map { |key| record[key] }
          when OpenStruct
            column_keys.map { |key| record.send(key) }
          when Array
            record
          else
            raise Error, "Illegal value #{record.inspect}"
          end
        "(#{quote_tuple(tuple, elem_types: elem_types)})::#{qualified_type}"
      end

      return record_literals.first unless array

      "array[#{record_literals.join(', ')}]::#{qualified_type}[]"
    end

    # :call-seq:
    #   parse_query(query)
    #   parse_query(table, id, fields...)
    #   parse_query(table, where_clause, fields...)
    #   parse_query(table, hash, fields...)
    #   parse_query(table, fields...)
    #
    # Parse a query. Used in query-functions (#value etc.). A single argument
    # that contains a space is taken to be a complete query and returned as-is.
    # Otherwise the first argument is the table and the next argument selects
    # the rows: an Integer is matched against the id column, a String is used
    # verbatim as the where-clause, and a Hash is turned into a conjunction of
    # quoted "key = value" tests. The fields argument can be a list of fields
    # or arrays of fields; all fields are selected when none are given
    #
    # Raises ArgumentError if called without arguments or if the argument after
    # the table is of an unsupported type
    def parse_query(*args)
      raise ArgumentError if args.empty?
      return args.first if args.size == 1 && args.first =~ / /

      table = args.shift
      where_clause = "true"
      fields = []
      case args.first
        when Integer; where_clause = "id = #{args.shift}"
        when String; where_clause = args.shift
        when Hash
          where_clause = args.shift.map { |k,v|
            "#{self.quote_identifier(k)} = #{self.quote_value(v)}"
          }.join(" and ")
        when Symbol; fields << args.shift
        # Copy the array so that appending the remaining fields below doesn't
        # modify the caller's object
        when Array; fields = args.shift.dup
        when nil; # No selector and no fields: keep the defaults
      else
        raise ArgumentError
      end
      fields.concat args.flatten
      field_list = fields.empty? ? "*" : self.quote_identifiers(fields)

      "select #{field_list} from #{table} where #{where_clause}"
    end

    # :call-seq:
    #   pg_exec(string)
    #   pg_exec(array)
    #
    # Execute statement(s) on the server. If the argument is an array of
    # commands, the array is flattened, nil entries are removed, and the
    # commands are joined with ";\n" before being sent to the server in one
    # call. #pg_exec returns a PG::Result object or nil if +arg+ was empty
    #
    # When there is no live connection the statements are appended to the list
    # of collected commands instead (arrays are flattened and compacted the
    # same way as for a live connection) and nil is returned
    #
    # Postgres errors are passed through. The exception is stored in @error
    # (and @err is cleared) unless an earlier error is already recorded, and
    # the argument and the error message are written to $stderr unless +silent+
    # is true
    #
    # FIXME: The error output prints all of +arg+, it doesn't tell which of
    # several statements failed
    #
    # TODO: Consider executing statements one-by-one so we're able to
    # pin-point Postgres errors without a line number. This may be expensive,
    # though
    #
    # TODO: Fix silent by not handling exceptions
    def pg_exec(arg, silent: false)
      if @pg_connection
        begin
          if arg.is_a?(String)
            return nil if arg == ""
            @pg_connection.exec(arg)
          else
            stmts = arg.flatten.compact
            return nil if stmts.empty?
#           stmts.unshift("set on_error_exit stop")
            @pg_connection.exec(stmts.join(";\n"))
          end
        rescue PG::Error => ex
          if @error.nil?
            @error = ex
            @err = nil
          end
          if !silent # FIXME Why do we handle this?
            $stderr.puts arg
            $stderr.puts
            $stderr.puts ex.message
            $stderr.flush
          end
          raise
        end

      # For dump of SQL statements
      else # @pg_commands is defined
        if arg.is_a?(String)
          @pg_commands << arg if arg != ""
        else
          # Same normalization as for a live connection so the dump never
          # contains nested arrays or nil entries
          @pg_commands.concat(arg.flatten.compact)
        end
        nil
      end
    end

    # Check that the result +r+ has exactly one column. Raises Error with a
    # message telling whether there were no columns or too many
    def check_1c(r)
      column_count = r.nfields
      return nil if column_count == 1

      message = column_count == 0 ? "No columns returned" : "More than one column returned"
      raise Error, message
    end

    # Check that the result +r+ has exactly one record. Raises Error with a
    # message telling whether there were no records or too many
    def check_1r(r)
      record_count = r.ntuples
      raise Error, "No records returned" if record_count == 0
      raise Error, "More than one record returned" if record_count > 1

      nil
    end
  end

  # Render +values+ as a comma-separated list of single-quoted SQL strings
  #
  # NOTE(review): single quotes embedded in the values are not escaped
  def self.sql_values(values)
    joined = values.join("', '")
    "'#{joined}'"
  end
  # Render +values+ as a comma-separated list of double-quoted SQL identifiers
  #
  # NOTE(review): double quotes embedded in the values are not escaped
  def self.sql_idents(values)
    joined = values.join('", "')
    %("#{joined}")
  end
end