class PG::Connection

The PostgreSQL connection class. The interface for this class is based on
libpq, the C
application programmer's interface to PostgreSQL. Some familiarity with libpq
is recommended, but not necessary.

For example, to send a query to the database on the localhost:

   require 'pg'
   conn = PG::Connection.open(:dbname => 'test')
   res = conn.exec_params('SELECT $1 AS a, $2 AS b, $3 AS c', [1, 2, nil])
   # Equivalent to:
   # res = conn.exec('SELECT 1 AS a, 2 AS b, NULL AS c')

See the PG::Result class for information on working with the results of a query.

Many methods of this class come in three variants:
1. #exec - the base method, which is an alias for #async_exec.
   This is the method that should be used in general.
2. #async_exec - the async-aware version of the method, implemented by libpq's async API.
3. #sync_exec - the version of the method that is implemented by blocking function(s) of libpq.

The sync and async versions of a method can be switched by Connection.async_api=, however it is not recommended to change the default.

def self.conndefaults_hash

## Return the Postgres connection defaults structure as a Hash keyed by option
## keyword (as a Symbol).
##
## See also #conndefaults
def self.conndefaults_hash
	return self.conndefaults.each_with_object({}) do |info, hash|
		hash[ info[:keyword].to_sym ] = info[:val]
	end
end
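
The conversion above can be tried without a database. The following is a minimal sketch, assuming the Array-of-Hashes shape implied by the method body (each entry has at least +:keyword+ and +:val+); the sample entries are made up for illustration.

```ruby
# Hypothetical sample shaped like the Array of Hashes that the method body
# iterates over; a real connection reports many more keywords.
defaults = [
  { keyword: "host", val: nil },
  { keyword: "port", val: "5432" },
  { keyword: "sslmode", val: "prefer" },
]

# Same transformation as the method body: key by keyword Symbol, keep the value.
defaults_hash = defaults.each_with_object({}) do |info, hash|
  hash[info[:keyword].to_sym] = info[:val]
end

p defaults_hash
```

Entries whose value is +nil+ are kept, so callers can still see which keywords exist.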

def self.connect_hash_to_string( hash )

Convert a Hash of options to a connection String.

Values are properly quoted and escaped.
def self.connect_hash_to_string( hash )
	hash.map { |k,v| "#{k}=#{quote_connstr(v)}" }.join( ' ' )
end

def self.parse_connect_args( *args )

Parse the connection +args+ into a connection-parameter string.
See PG::Connection.new for valid arguments.

It accepts:
* an option String such as "host=name port=5432"
* an option Hash such as {host: "name", port: 5432}
* URI string
* URI object
* positional arguments

The method adds the option "fallback_application_name" if it isn't already set.
It returns a connection string with "key=value" pairs.
def self.parse_connect_args( *args )
	hash_arg = args.last.is_a?( Hash ) ? args.pop.transform_keys(&:to_sym) : {}
	iopts = {}
	if args.length == 1
		case args.first
		when URI, /=/, /:\/\//
			# Option or URL string style
			conn_string = args.first.to_s
			iopts = PG::Connection.conninfo_parse(conn_string).each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] }
		else
			# Positional parameters (only host given)
			iopts[CONNECT_ARGUMENT_ORDER.first.to_sym] = args.first
		end
	else
		# Positional parameters with host and more
		max = CONNECT_ARGUMENT_ORDER.length
		raise ArgumentError,
				"Extra positional parameter %d: %p" % [ max + 1, args[max] ] if args.length > max
		CONNECT_ARGUMENT_ORDER.zip( args ) do |(k,v)|
			iopts[ k.to_sym ] = v if v
		end
		iopts.delete(:tty) # ignore obsolete tty parameter
	end
	iopts.merge!( hash_arg )
	if !iopts[:fallback_application_name]
		iopts[:fallback_application_name] = $0.sub( /^(.{30}).{4,}(.{30})$/ ){ $1+"..."+$2 }
	end
	return connect_hash_to_string(iopts)
end
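
The default for "fallback_application_name" is derived from the program name: names of 64 characters or more keep their first and last 30 characters with "..." in between. A minimal sketch of that rule in isolation follows; +shorten_program_name+ is a hypothetical helper name used only for this illustration.

```ruby
# Standalone copy of the shortening rule; shorten_program_name is a
# hypothetical helper name, not part of PG::Connection.
def shorten_program_name(name)
  name.sub(/^(.{30}).{4,}(.{30})$/) { $1 + "..." + $2 }
end

long_name  = "a" * 30 + "b" * 10 + "c" * 30   # 70 characters
short_name = "bin/rails"

puts shorten_program_name(long_name).length   # 63
puts shorten_program_name(short_name)         # printed unchanged
```

Because the middle group needs at least four characters, the replacement never makes a name longer than it was.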

def self.quote_connstr( value )

## Quote a single +value+ for use in a connection-parameter string.
def self.quote_connstr( value )
	return "'" + value.to_s.gsub( /[\\']/ ) {|m| '\\' + m } + "'"
end
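
To see what the quoting produces, the two helpers can be copied into a standalone script. This is a sketch for experimentation only; the host name and password below are made-up values.

```ruby
# Standalone copies of the two helpers so the snippet runs without the pg gem.
def quote_connstr(value)
  "'" + value.to_s.gsub(/[\\']/) { |m| '\\' + m } + "'"
end

def connect_hash_to_string(hash)
  hash.map { |k, v| "#{k}=#{quote_connstr(v)}" }.join(' ')
end

# Single quotes and backslashes are each prefixed with a backslash.
puts quote_connstr("it's")
puts connect_hash_to_string({ host: "db.example.com", port: 5432, password: 'p\w' })
```

Every value is wrapped in single quotes, including numbers, so values containing spaces need no special handling.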

def async_api=(enable)


Switch between sync and async libpq API.

   PG::Connection.async_api = true
this is the default.
It sets an alias from #exec to #async_exec, #reset to #async_reset and so on.

   PG::Connection.async_api = false
sets an alias from #exec to #sync_exec, #reset to #sync_reset and so on.

pg-1.1.0+ defaults to libpq's async API for query related blocking methods.
pg-1.3.0+ defaults to libpq's async API for all possibly blocking methods.

_PLEASE_ _NOTE_: This method is not part of the public API and is for debug and development use only.
Do not use this method in production code.
Any issues with the default setting of async_api=true should be reported to the maintainers instead.
def async_api=(enable)
	self.async_send_api = enable
	REDIRECT_METHODS.each do |ali, (async, sync)|
		remove_method(ali) if method_defined?(ali)
		alias_method( ali, enable ? async : sync )
	end
	REDIRECT_CLASS_METHODS.each do |ali, (async, sync)|
		singleton_class.remove_method(ali) if method_defined?(ali)
		singleton_class.alias_method(ali, enable ? async : sync )
	end
end
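
The aliasing mechanism can be illustrated on a toy class. This is a sketch of the technique only: the +Worker+ class, its methods and its one-entry table are hypothetical and merely mimic the shape of REDIRECT_METHODS as used above.

```ruby
class Worker
  # alias name => [async implementation, sync implementation]
  REDIRECT_METHODS = { run: [:async_run, :sync_run] }.freeze

  def async_run; :async; end
  def sync_run;  :sync;  end

  # Re-point each alias at the async or the sync implementation.
  def self.async_api=(enable)
    REDIRECT_METHODS.each do |ali, (async, sync)|
      remove_method(ali) if method_defined?(ali)
      alias_method(ali, enable ? async : sync)
    end
  end

  self.async_api = true   # default: aliases point at the async versions
end

worker = Worker.new
first  = worker.run       # dispatches to async_run
Worker.async_api = false
second = worker.run       # the same object now dispatches to sync_run
p [first, second]
```

The switch is class-wide: existing instances pick up the new alias immediately, which is one reason the setting is unsuitable for production use.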

def async_connect_or_reset(poll_meth)

def async_connect_or_reset(poll_meth)
	# Track the progress of the connection, waiting for the socket to become readable/writable before polling it

	if (timeo = conninfo_hash[:connect_timeout].to_i) && timeo > 0
		# Lowest timeout is 2 seconds - like in libpq
		timeo = [timeo, 2].max
		stop_time = timeo + Process.clock_gettime(Process::CLOCK_MONOTONIC)
	end

	poll_status = PG::PGRES_POLLING_WRITING
	until poll_status == PG::PGRES_POLLING_OK ||
			poll_status == PG::PGRES_POLLING_FAILED

		timeout = stop_time&.-(Process.clock_gettime(Process::CLOCK_MONOTONIC))
		event = if !timeout || timeout >= 0
			# If the socket needs to read, wait 'til it becomes readable to poll again
			case poll_status
			when PG::PGRES_POLLING_READING
				if defined?(IO::READABLE) # ruby-3.0+
					socket_io.wait(IO::READABLE | IO::PRIORITY, timeout)
				else
					IO.select([socket_io], nil, [socket_io], timeout)
				end

			# ...and the same for when the socket needs to write
			when PG::PGRES_POLLING_WRITING
				if defined?(IO::WRITABLE) # ruby-3.0+
					# Use wait instead of wait_readable, since connection errors are delivered as
					# exceptional/priority events on Windows.
					socket_io.wait(IO::WRITABLE | IO::PRIORITY, timeout)
				else
					# io#wait on ruby-2.x doesn't wait for priority, so fallback to IO.select
					IO.select(nil, [socket_io], [socket_io], timeout)
				end
			end
		end
		# connection to server at "localhost" (127.0.0.1), port 5433 failed: timeout expired (PG::ConnectionBad)
		# connection to server on socket "/var/run/postgresql/.s.PGSQL.5433" failed: No such file or directory
		unless event
			if self.class.send(:host_is_named_pipe?, host)
				connhost = "on socket \"#{host}\""
			elsif respond_to?(:hostaddr)
				connhost = "at \"#{host}\" (#{hostaddr}), port #{port}"
			else
				connhost = "at \"#{host}\", port #{port}"
			end
			raise PG::ConnectionBad.new("connection to server #{connhost} failed: timeout expired", connection: self)
		end

		# Check to see if it's finished or failed yet
		poll_status = send( poll_meth )
		@last_status = status unless [PG::CONNECTION_BAD, PG::CONNECTION_OK].include?(status)
	end

	unless status == PG::CONNECTION_OK
		msg = error_message
		finish
		raise PG::ConnectionBad.new(msg, connection: self)
	end

	# Set connection to nonblocking to handle all blocking states in ruby.
	# That way a fiber scheduler is able to handle IO requests.
	sync_setnonblocking(true)
	self.flush_data = true
	set_default_encoding
end
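
The timeout handling in this method follows a common deadline pattern: compute a stop time once from the monotonic clock, then derive the remaining time before each wait. A minimal sketch of just that arithmetic is shown here; +connect_deadline+ and +remaining+ are hypothetical helper names, and the +now+ parameter exists only to make the example deterministic.

```ruby
# Hypothetical helper mirroring the timeout arithmetic; returns nil when no
# timeout applies (unset, empty, or non-positive connect_timeout).
def connect_deadline(connect_timeout, now = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  timeo = connect_timeout.to_i
  return nil unless timeo > 0
  now + [timeo, 2].max   # never less than 2 seconds
end

# Seconds left until the deadline; nil means "wait without limit",
# a negative value means the deadline has already passed.
def remaining(deadline, now = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  deadline && deadline - now
end

deadline = connect_deadline("1", 100.0)
p deadline
p remaining(deadline, 101.5)
```

Using a monotonic clock rather than wall-clock time keeps the deadline stable when the system time is adjusted during a connection attempt.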

def async_send_api=(enable)

def async_send_api=(enable)
	REDIRECT_SEND_METHODS.each do |ali, (async, sync)|
		undef_method(ali) if method_defined?(ali)
		alias_method( ali, enable ? async : sync )
	end
end

def cancel

call-seq:
   conn.cancel() -> String

Requests cancellation of the command currently being
processed.

Returns +nil+ on success, or a string containing the
error message if a failure occurs.
def cancel
	be_pid = backend_pid
	be_key = backend_key
	cancel_request = [0x10, 1234, 5678, be_pid, be_key].pack("NnnNN")
	if Fiber.respond_to?(:scheduler) && Fiber.scheduler && RUBY_PLATFORM =~ /mingw|mswin/
		# Ruby's nonblocking IO is not really supported on Windows.
		# We work around by using threads and explicit calls to wait_readable/wait_writable.
		cl = Thread.new(socket_io.remote_address) { |ra| ra.connect }.value
		begin
			cl.write_nonblock(cancel_request)
		rescue IO::WaitReadable, Errno::EINTR
			cl.wait_writable
			retry
		end
		begin
			cl.read_nonblock(1)
		rescue IO::WaitReadable, Errno::EINTR
			cl.wait_readable
			retry
		rescue EOFError
		end
	elsif RUBY_ENGINE == 'truffleruby'
		begin
			cl = socket_io.remote_address.connect
		rescue NotImplementedError
			# Workaround for truffleruby < 21.3.0
			cl2 = Socket.for_fd(socket_io.fileno)
			cl2.autoclose = false
			adr = cl2.remote_address
			if adr.ip?
				cl = TCPSocket.new(adr.ip_address, adr.ip_port)
				cl.autoclose = false
			else
				cl = UNIXSocket.new(adr.unix_path)
				cl.autoclose = false
			end
		end
		cl.write(cancel_request)
		cl.read(1)
	else
		cl = socket_io.remote_address.connect
		# Send CANCEL_REQUEST_CODE and parameters
		cl.write(cancel_request)
		# Wait for the postmaster to close the connection, which indicates that it's processed the request.
		cl.read(1)
	end
	cl.close
	nil
rescue SystemCallError => err
	err.to_s
end
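
The cancel request built by +pack("NnnNN")+ is a fixed 16-byte message. The sketch below packs one with made-up values for the process ID and secret key and unpacks it again, to show how the two 16-bit values 1234 and 5678 combine into a single 32-bit request code.

```ruby
# Made-up backend credentials; a live connection supplies these through
# backend_pid and backend_key.
be_pid = 4242
be_key = 123_456_789

# N = 32-bit big-endian, n = 16-bit big-endian
cancel_request = [0x10, 1234, 5678, be_pid, be_key].pack("NnnNN")

puts cancel_request.bytesize            # 16, which is also the 0x10 length field
length, code, pid, key = cancel_request.unpack("NNNN")
puts code == (1234 << 16 | 5678)        # true
puts [length, pid, key].inspect
```

The message is sent over a new socket to the same server address, which is why the method opens a separate connection instead of reusing the existing one.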

def conndefaults

## Returns an array of Hashes with connection defaults. See ::conndefaults
## for details.
def conndefaults
	return self.class.conndefaults
end

def conndefaults_hash

## Returns a Hash with connection defaults. See ::conndefaults_hash
## for details.
def conndefaults_hash
	return self.class.conndefaults_hash
end

def connect_internal(opts, errors=nil)

def connect_internal(opts, errors=nil)
	begin
		conn = self.connect_start(opts) or
			raise(PG::Error, "Unable to create a new connection")

		raise PG::ConnectionBad.new(conn.error_message, connection: self) if conn.status == PG::CONNECTION_BAD

		conn.send(:async_connect_or_reset, :connect_poll)
		conn
	rescue PG::ConnectionBad => err
		if errors && !(conn && [PG::CONNECTION_AWAITING_RESPONSE].include?(conn.instance_variable_get(:@last_status)))
			# Seems to be no authentication error -> try next host
			errors << err
			return nil
		else
			# Probably an authentication error
			raise
		end
	end
end

def connect_to_hosts(*args)

def connect_to_hosts(*args)
	option_string = parse_connect_args(*args)
	iopts = PG::Connection.conninfo_parse(option_string).each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] }
	iopts = PG::Connection.conndefaults.each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] }.merge(iopts)

	errors = []
	if iopts[:hostaddr]
		# hostaddr is provided -> no need to resolve hostnames
		ihostaddrs = iopts[:hostaddr].split(",", -1)

		ihosts = iopts[:host].split(",", -1) if iopts[:host]
		raise PG::ConnectionBad, "could not match #{ihosts.size} host names to #{ihostaddrs.size} hostaddr values" if ihosts && ihosts.size != ihostaddrs.size

		iports = iopts[:port].split(",", -1)
		iports = iports * ihostaddrs.size if iports.size == 1
		raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihostaddrs.size} hosts" if iports.size != ihostaddrs.size

		# Try to connect to each hostaddr with separate timeout
		ihostaddrs.each_with_index do |ihostaddr, idx|
			oopts = iopts.merge(hostaddr: ihostaddr, port: iports[idx])
			oopts[:host] = ihosts[idx] if ihosts
			c = connect_internal(oopts, errors)
			return c if c
		end
	elsif iopts[:host] && !iopts[:host].empty?
		# Resolve DNS in Ruby to avoid blocking state while connecting, when it ...
		ihosts = iopts[:host].split(",", -1)

		iports = iopts[:port].split(",", -1)
		iports = iports * ihosts.size if iports.size == 1
		raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihosts.size} hosts" if iports.size != ihosts.size

		ihosts.each_with_index do |mhost, idx|
			unless host_is_named_pipe?(mhost)
				addrs = if Fiber.respond_to?(:scheduler) &&
							Fiber.scheduler &&
							RUBY_VERSION < '3.1.'

					# Use a second thread to avoid blocking of the scheduler.
					# `TCPSocket.gethostbyname` isn't fiber aware before ruby-3.1.
					Thread.new{ Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] }.value
				else
					Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue ['']
				end

				# Try to connect to each host with separate timeout
				addrs.each do |addr|
					oopts = iopts.merge(hostaddr: addr, host: mhost, port: iports[idx])
					c = connect_internal(oopts, errors)
					return c if c
				end
			else
				# No hostname to resolve (UnixSocket)
				oopts = iopts.merge(host: mhost, port: iports[idx])
				c = connect_internal(oopts, errors)
				return c if c
			end
		end
	else
		# No host given
		return connect_internal(iopts)
	end
	raise PG::ConnectionBad, errors.join("\n")
end
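
The way comma-separated hosts and ports are matched up can be shown in isolation. The following is a sketch under stated assumptions: +pair_hosts_and_ports+ is a hypothetical helper, and it raises ArgumentError only so that it runs without the pg gem (the method above raises PG::ConnectionBad).

```ruby
# Hypothetical helper; the library raises PG::ConnectionBad, ArgumentError is
# used here only to keep the snippet independent of the pg gem.
def pair_hosts_and_ports(host, port)
  hosts = host.split(",", -1)   # -1 keeps trailing empty entries
  ports = port.split(",", -1)
  # A single port applies to every host.
  ports = ports * hosts.size if ports.size == 1
  if ports.size != hosts.size
    raise ArgumentError, "could not match #{ports.size} port numbers to #{hosts.size} hosts"
  end
  hosts.zip(ports)
end

p pair_hosts_and_ports("db1,db2", "5432")
p pair_hosts_and_ports("db1,db2", "5432,5433")
```

Each resulting pair is then tried in order with its own timeout, and the first successful connection is returned.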

def conninfo_hash

## Return the Postgres connection info structure as a Hash keyed by option
## keyword (as a Symbol).
##
## See also #conninfo
def conninfo_hash
	return self.conninfo.each_with_object({}) do |info, hash|
		hash[ info[:keyword].to_sym ] = info[:val]
	end
end

def copy_data( sql, coder=nil )

def copy_data( sql, coder=nil )
	raise PG::NotInBlockingMode.new("copy_data can not be used in nonblocking mode", connection: self) if nonblocking?
	res = exec( sql )
	case res.result_status
	when PGRES_COPY_IN
		begin
			if coder
				old_coder = self.encoder_for_put_copy_data
				self.encoder_for_put_copy_data = coder
			end
			yield res
		rescue Exception => err
			errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
			put_copy_end( errmsg )
			get_result
			raise
		else
			put_copy_end
			get_last_result
		ensure
			self.encoder_for_put_copy_data = old_coder if coder
		end
	when PGRES_COPY_OUT
		begin
			if coder
				old_coder = self.decoder_for_get_copy_data
				self.decoder_for_get_copy_data = coder
			end
			yield res
		rescue Exception => err
			cancel
			begin
				while get_copy_data
				end
			rescue PG::Error
				# Ignore error in cleanup to avoid losing original exception
			end
			while get_result
			end
			raise err
		else
			res = get_last_result
			if !res || res.result_status != PGRES_COMMAND_OK
				while get_copy_data
				end
				while get_result
				end
				raise PG::NotAllCopyDataRetrieved.new("Not all COPY data retrieved", connection: self)
			end
			res
		ensure
			self.decoder_for_get_copy_data = old_coder if coder
		end
	else
		raise ArgumentError, "SQL command is no COPY statement: #{sql}"
	end
end

def encrypt_password( password, username, algorithm=nil )

call-seq:
   conn.encrypt_password( password, username, algorithm=nil ) -> String

This function is intended to be used by client applications that wish to send commands like ALTER USER joe PASSWORD 'pwd'.
It is good practice not to send the original cleartext password in such a command, because it might be exposed in command logs, activity displays, and so on.
Instead, use this function to convert the password to encrypted form before it is sent.

The +password+ and +username+ arguments are the cleartext password, and the SQL name of the user it is for.
+algorithm+ specifies the encryption algorithm to use to encrypt the password.
Currently supported algorithms are +md5+ and +scram-sha-256+ (+on+ and +off+ are also accepted as aliases for +md5+, for compatibility with older server versions).
Note that support for +scram-sha-256+ was introduced in PostgreSQL version 10, and will not work correctly with older server versions.
If algorithm is omitted or +nil+, this function will query the server for the current value of the +password_encryption+ setting.
That can block, and will fail if the current transaction is aborted, or if the connection is busy executing another query.
If you wish to use the default algorithm for the server but want to avoid blocking, query +password_encryption+ yourself before calling #encrypt_password, and pass that value as the algorithm.

Return value is the encrypted password.
The caller can assume the string doesn't contain any special characters that would require escaping.

Available since PostgreSQL-10.
See also corresponding {libpq function}[https://www.postgresql.org/docs/current/libpq-misc.html#LIBPQ-PQENCRYPTPASSWORDCONN].
def encrypt_password( password, username, algorithm=nil )
	algorithm ||= exec("SHOW password_encryption").getvalue(0,0)
	sync_encrypt_password(password, username, algorithm)
end
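
For orientation, the stored form of an +md5+ password in PostgreSQL is the literal prefix "md5" followed by the hex MD5 digest of the password concatenated with the user name. The sketch below reproduces only that format with Ruby's standard library; +md5_encrypted_password+ is a hypothetical helper, not how the gem computes the value (the method above delegates to libpq), and it does not cover +scram-sha-256+.

```ruby
require "digest/md5"

# Hypothetical helper illustrating the md5 storage format only.
def md5_encrypted_password(password, username)
  "md5" + Digest::MD5.hexdigest(password + username)
end

encrypted = md5_encrypted_password("pwd", "joe")
puts encrypted.length   # 35: the 3-character prefix plus 32 hex digits
puts encrypted.match?(/\Amd5[0-9a-f]{32}\z/)
```

The result contains only letters and digits, which matches the note that the return value needs no escaping.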

def get_copy_data(async=false, decoder=nil)


call-seq:
   conn.get_copy_data( [ nonblock = false [, decoder = nil ]] ) -> Object

Return one row of data, +nil+
if the copy is done, or +false+ if the call would
block (only possible if _nonblock_ is true).

If _decoder_ is not set or +nil+, data is returned as binary string.

If _decoder_ is set to a PG::Coder derivation, the return type depends on this decoder.
PG::TextDecoder::CopyRow decodes the received data fields from one row of PostgreSQL's
COPY text format to an Array of Strings.
Optionally the decoder can type cast the single fields to various Ruby types in one step,
if PG::TextDecoder::CopyRow#type_map is set accordingly.

See also #copy_data.
def get_copy_data(async=false, decoder=nil)
	if async
		return sync_get_copy_data(async, decoder)
	else
		while (res=sync_get_copy_data(true, decoder)) == false
			socket_io.wait_readable
			consume_input
		end
		return res
	end
end

def get_result

call-seq:
   conn.get_result() -> PG::Result
   conn.get_result() {|pg_result| block }

Blocks waiting for the next result from a call to
#send_query (or another asynchronous command), and returns
it. Returns +nil+ if no more results are available.

Note: call this function repeatedly until it returns +nil+, or else
you will not be able to issue further commands.

If the optional code block is given, it will be passed result as an argument,
and the PG::Result object will automatically be cleared when the block terminates.
In this instance, conn.get_result returns the value of the block.
def get_result
	block
	sync_get_result
end

def host_is_named_pipe?(host_string)

def host_is_named_pipe?(host_string)
	host_string.empty? || host_string.start_with?("/") ||  # it's UnixSocket?
		host_string.start_with?("@") ||  # it's UnixSocket in the abstract namespace?
		# it's a path on Windows?
		(RUBY_PLATFORM =~ /mingw|mswin/ && host_string =~ /\A([\/\\]|\w:[\/\\])/)
end

def isnonblocking

call-seq:
   conn.isnonblocking() -> Boolean

Returns the blocking status of the database connection.
Returns +true+ if the connection is set to nonblocking mode and +false+ if blocking.
def isnonblocking
	false
end

def new(*args)

call-seq:
   PG::Connection.new -> conn
   PG::Connection.new(connection_hash) -> conn
   PG::Connection.new(connection_string) -> conn
   PG::Connection.new(host, port, options, tty, dbname, user, password) -> conn

Create a connection to the specified server.

+connection_hash+ must be a ruby Hash with connection parameters.
See the {list of valid parameters}[https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS] in the PostgreSQL documentation.

There are two accepted formats for +connection_string+: plain keyword = value strings and URIs.
See the documentation of {connection strings}[https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING].

The positional parameter form has the same functionality except that the missing parameters will always take on default values. The parameters are:
[+host+]
  server hostname
[+port+]
  server port number
[+options+]
  backend options
[+tty+]
  (ignored in all versions of PostgreSQL)
[+dbname+]
  connecting database name
[+user+]
  login user name
[+password+]
  login password

Examples:

   # Connect using all defaults
   PG::Connection.new

   # As a Hash
   PG::Connection.new( dbname: 'test', port: 5432 )

   # As a String
   PG::Connection.new( "dbname=test port=5432" )

   # As an Array
   PG::Connection.new( nil, 5432, nil, nil, 'test', nil, nil )

   # As a URI
   PG::Connection.new( "postgresql://user:pass@pgsql.example.com:5432/testdb?sslmode=require" )

If the Ruby default internal encoding is set (i.e., Encoding.default_internal != nil), the
connection will have its +client_encoding+ set accordingly.

Raises a PG::Error if the connection fails.
def new(*args)
	conn = connect_to_hosts(*args)
	if block_given?
		begin
			return yield conn
		ensure
			conn.finish
		end
	end
	conn
end

def ping(*args)

call-seq:
   PG::Connection.ping(connection_hash) -> Integer
   PG::Connection.ping(connection_string) -> Integer
   PG::Connection.ping(host, port, options, tty, dbname, login, password) -> Integer

Check server status.

See PG::Connection.new for a description of the parameters.

Returns one of:
[+PQPING_OK+]
  server is accepting connections
[+PQPING_REJECT+]
  server is alive but rejecting connections
[+PQPING_NO_RESPONSE+]
  could not establish connection
[+PQPING_NO_ATTEMPT+]
  connection not attempted (bad params)
def ping(*args)
	if Fiber.respond_to?(:scheduler) && Fiber.scheduler
		# Run PQping in a second thread to avoid blocking of the scheduler.
		# Unfortunately there's no nonblocking way to run ping.
		Thread.new { sync_ping(*args) }.value
	else
		sync_ping(*args)
	end
end

def put_copy_data(buffer, encoder=nil)


call-seq:
   conn.put_copy_data( buffer [, encoder] ) -> Boolean

Transmits _buffer_ as copy data to the server.
Returns true if the data was sent, false if it was
not sent (false is only possible if the connection
is in nonblocking mode, and this command would block).

_encoder_ can be a PG::Coder derivation (typically PG::TextEncoder::CopyRow).
This encodes the data fields given as _buffer_ from an Array of Strings to
PostgreSQL's COPY text format, including proper escaping. Optionally
the encoder can type cast the fields from various Ruby types in one step,
if PG::TextEncoder::CopyRow#type_map is set accordingly.

Raises an exception if an error occurs.

See also #copy_data.
def put_copy_data(buffer, encoder=nil)
	# sync_put_copy_data does a non-blocking attempt to flush data.
	until res=sync_put_copy_data(buffer, encoder)
		# It didn't flush immediately and allocation of more buffering memory failed.
		# Wait for all data sent by doing a blocking flush.
		res = flush
	end
	# And do a blocking flush every 100 calls.
	# This is to avoid memory bloat, when sending the data is slower than calls to put_copy_data happen.
	if (@calls_to_put_copy_data += 1) > 100
		@calls_to_put_copy_data = 0
		res = flush
	end
	res
end
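
The periodic flush is driven by a simple call counter. The sketch below isolates that counter with a hypothetical stand-in class that only counts flushes instead of performing them, to show on which calls a flush would happen.

```ruby
# Hypothetical stand-in for a connection: it only counts calls and flushes.
class BufferedSender
  attr_reader :flushes

  def initialize
    @calls = 0
    @flushes = 0
  end

  def put(_row)
    # Same counter rule as put_copy_data: flush once the count exceeds 100.
    if (@calls += 1) > 100
      @calls = 0
      @flushes += 1
    end
  end
end

sender = BufferedSender.new
250.times { |i| sender.put("row #{i}") }
puts sender.flushes   # 2 (triggered by calls 101 and 202)
```

Because the comparison is +> 100+, the flush lands on every 101st call rather than every 100th; the exact interval matters less than bounding how much unsent data can pile up.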

def put_copy_end(*args)

call-seq:
   conn.put_copy_end( [ error_message ] ) -> Boolean

Sends end-of-data indication to the server.

_error_message_ is an optional parameter, and if set,
forces the COPY command to fail with the string
_error_message_.

Returns true if the end-of-data was sent, *false* if it was
not sent (*false* is only possible if the connection
is in nonblocking mode, and this command would block).
def put_copy_end(*args)
	until sync_put_copy_end(*args)
		flush
	end
	@calls_to_put_copy_data = 0
	flush
end

def reset

call-seq:
   conn.reset()

Resets the backend connection. This method closes the
backend connection and tries to re-connect.
def reset
	reset_start
	async_connect_or_reset(:reset_poll)
	self
end

def setnonblocking(enabled)

call-seq:
   conn.setnonblocking(Boolean) -> nil

Sets the nonblocking status of the connection.
In the blocking state, calls to #send_query
will block until the message is sent to the server,
but will not wait for the query results.
In the nonblocking state, calls to #send_query
will return an error if the socket is not ready for
writing.
Note: This function does not affect #exec, because
that function doesn't return until the server has
processed the query and returned the results.

Returns +nil+.
def setnonblocking(enabled)
	singleton_class.async_send_api = !enabled
	self.flush_data = !enabled
	sync_setnonblocking(true)
end

def ssl_attributes

call-seq:
   conn.ssl_attributes -> Hash

Returns SSL-related information about the connection as key/value pairs.

The available attributes vary depending on the SSL library being used,
and the type of connection.

See also #ssl_attribute
def ssl_attributes
	ssl_attribute_names.each.with_object({}) do |n,h|
		h[n] = ssl_attribute(n)
	end
end

def transaction

call-seq:
   conn.transaction { |conn| ... } -> result of the block

Executes a +BEGIN+ at the start of the block,
and a +COMMIT+ at the end of the block, or
+ROLLBACK+ if any exception occurs.
def transaction
	rollback = false
	exec "BEGIN"
	yield(self)
rescue Exception
	rollback = true
	cancel if transaction_status == PG::PQTRANS_ACTIVE
	block
	exec "ROLLBACK"
	raise
ensure
	exec "COMMIT" unless rollback
end
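
The control flow of #transaction can be observed without a server by recording the statements instead of sending them. This is a sketch only: +RecordingConnection+ is a hypothetical stub, and it leaves out the +cancel+ and +block+ steps that the method above performs before rolling back.

```ruby
# Hypothetical recorder standing in for a connection; it logs SQL instead of
# sending it, and omits the cancel/block steps of the real method.
class RecordingConnection
  attr_reader :log

  def initialize
    @log = []
  end

  def exec(sql)
    @log << sql
  end

  def transaction
    rollback = false
    exec "BEGIN"
    yield(self)
  rescue Exception
    rollback = true
    exec "ROLLBACK"
    raise
  ensure
    exec "COMMIT" unless rollback
  end
end

ok = RecordingConnection.new
ok.transaction { |c| c.exec "INSERT INTO t VALUES (1)" }

failed = RecordingConnection.new
begin
  failed.transaction { raise "boom" }
rescue RuntimeError
  # the exception is re-raised after ROLLBACK
end

p ok.log
p failed.log
```

Since the exception is re-raised after the rollback, callers still need their own +rescue+ if they want to continue after a failed transaction.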