class PG::Connection
Sync and async version of the method can be switched by Connection.async_api= , however it is not recommended to change the default.
3. #sync_exec - the method version that is implemented by blocking function(s) of libpq.
2. #async_exec - the async aware version of the method, implemented by libpq’s async API.
This is the method that should be used in general.
1. #exec - the base method which is an alias to #async_exec .
Many methods of this class have three variants kind of:
See the PG::Result class for information on working with the results of a query.
# res = conn.exec(‘SELECT 1 AS a, 2 AS b, NULL AS c’)
# Equivalent to:
res = conn.exec_params(‘SELECT $1 AS a, $2 AS b, $3 AS c’, [1, 2, nil])
conn = PG::Connection.open(:dbname => ‘test’)
require ‘pg’
For example, to send query to the database on the localhost:
is recommended, but not necessary.
application programmer’s interface to PostgreSQL. Some familiarity with libpq
, the C
The PostgreSQL connection class. The interface for this class is based on
def self.conndefaults_hash
##
## keyword (as a Symbol).
## Return the Postgres connection defaults structure as a Hash keyed by option
def self.conndefaults_hash return self.conndefaults.each_with_object({}) do |info, hash| hash[ info[:keyword].to_sym ] = info[:val] end end
def self.connect_hash_to_string( hash )
Convert Hash options to connection String
def self.connect_hash_to_string( hash ) hash.map { |k,v| "#{k}=#{quote_connstr(v)}" }.join( ' ' ) end
def self.parse_connect_args( *args )
The method adds the option "fallback_application_name" if it isn't already set.
* positional arguments
* URI object
* URI string
* an option Hash kind of {host: "name", port: 5432}
* an option String kind of "host=name port=5432"
It accepts:
See PG::Connection.new for valid arguments.
Parse the connection +args+ into a connection-parameter string.
def self.parse_connect_args( *args ) hash_arg = args.last.is_a?( Hash ) ? args.pop.transform_keys(&:to_sym) : {} iopts = {} if args.length == 1 case args.first when URI, /=/, /:\/\// # Option or URL string style conn_string = args.first.to_s iopts = PG::Connection.conninfo_parse(conn_string).each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] } else # Positional parameters (only host given) iopts[CONNECT_ARGUMENT_ORDER.first.to_sym] = args.first end else # Positional parameters with host and more max = CONNECT_ARGUMENT_ORDER.length raise ArgumentError, "Extra positional parameter %d: %p" % [ max + 1, args[max] ] if args.length > max CONNECT_ARGUMENT_ORDER.zip( args ) do |(k,v)| iopts[ k.to_sym ] = v if v end iopts.delete(:tty) # ignore obsolete tty parameter end iopts.merge!( hash_arg ) if !iopts[:fallback_application_name] iopts[:fallback_application_name] = $0.sub( /^(.{30}).{4,}(.{30})$/ ){ $1+"..."+$2 } end return connect_hash_to_string(iopts) end
def self.quote_connstr( value )
def self.quote_connstr( value ) return "'" + value.to_s.gsub( /[\\']/ ) {|m| '\\' + m } + "'" end
def async_api=(enable)
Any issues with the default setting of async_api=true should be reported to the maintainers instead.
Do not use this method in production code.
_PLEASE_ _NOTE_: This method is not part of the public API and is for debug and development use only.
pg-1.3.0+ defaults to libpq's async API for all possibly blocking methods.
pg-1.1.0+ defaults to libpq's async API for query related blocking methods.
sets an alias from #exec to #sync_exec, #reset to #sync_reset and so on.
PG::Connection.async_api = false
It sets an alias from #exec to #async_exec, #reset to #async_reset and so on.
this is the default.
PG::Connection.async_api = true
Switch between sync and async libpq API.
def async_api=(enable) self.async_send_api = enable REDIRECT_METHODS.each do |ali, (async, sync)| remove_method(ali) if method_defined?(ali) alias_method( ali, enable ? async : sync ) end REDIRECT_CLASS_METHODS.each do |ali, (async, sync)| singleton_class.remove_method(ali) if method_defined?(ali) singleton_class.alias_method(ali, enable ? async : sync ) end end
def async_connect_or_reset(poll_meth)
def async_connect_or_reset(poll_meth) the progress of the connection, waiting for the socket to become readable/writable before polling it eo = conninfo_hash[:connect_timeout].to_i) && timeo > 0 st timeout is 2 seconds - like in libpq = [timeo, 2].max ime = timeo + Process.clock_gettime(Process::CLOCK_MONOTONIC) atus = PG::PGRES_POLLING_WRITING oll_status == PG::PGRES_POLLING_OK || status == PG::PGRES_POLLING_FAILED t = stop_time&.-(Process.clock_gettime(Process::CLOCK_MONOTONIC)) = if !timeout || timeout >= 0 the socket needs to read, wait 'til it becomes readable to poll again poll_status PG::PGRES_POLLING_READING efined?(IO::READABLE) # ruby-3.0+ ket_io.wait(IO::READABLE | IO::PRIORITY, timeout) select([socket_io], nil, [socket_io], timeout) and the same for when the socket needs to write PG::PGRES_POLLING_WRITING efined?(IO::WRITABLE) # ruby-3.0+ se wait instead of wait_readable, since connection errors are delivered as xceptional/priority events on Windows. ket_io.wait(IO::WRITABLE | IO::PRIORITY, timeout) o#wait on ruby-2.x doesn't wait for priority, so fallback to IO.select select(nil, [socket_io], [socket_io], timeout) ection to server at "localhost" (127.0.0.1), port 5433 failed: timeout expired (PG::ConnectionBad) ection to server on socket "/var/run/postgresql/.s.PGSQL.5433" failed: No such file or directory event lf.class.send(:host_is_named_pipe?, host) host = "on socket \"#{host}\"" respond_to?(:hostaddr) host = "at \"#{host}\" (#{hostaddr}), port #{port}" host = "at \"#{host}\", port #{port}" PG::ConnectionBad.new("connection to server #{connhost} failed: timeout expired", connection: self) k to see if it's finished or failed yet tatus = send( poll_meth ) status = status unless [PG::CONNECTION_BAD, PG::CONNECTION_OK].include?(status) status == PG::CONNECTION_OK error_message PG::ConnectionBad.new(msg, connection: self) onnection to nonblocking to handle all blocking states in ruby. way a fiber scheduler is able to handle IO requests. tnonblocking(true) ush_data = true ault_encoding
def async_send_api=(enable)
def async_send_api=(enable) REDIRECT_SEND_METHODS.each do |ali, (async, sync)| undef_method(ali) if method_defined?(ali) alias_method( ali, enable ? async : sync ) end end
def cancel
Returns +nil+ on success, or a string containing the
processed.
Requests cancellation of the command currently being
conn.cancel() -> String
call-seq:
def cancel be_pid = backend_pid be_key = backend_key cancel_request = [0x10, 1234, 5678, be_pid, be_key].pack("NnnNN") if Fiber.respond_to?(:scheduler) && Fiber.scheduler && RUBY_PLATFORM =~ /mingw|mswin/ # Ruby's nonblocking IO is not really supported on Windows. # We work around by using threads and explicit calls to wait_readable/wait_writable. cl = Thread.new(socket_io.remote_address) { |ra| ra.connect }.value begin cl.write_nonblock(cancel_request) rescue IO::WaitReadable, Errno::EINTR cl.wait_writable retry end begin cl.read_nonblock(1) rescue IO::WaitReadable, Errno::EINTR cl.wait_readable retry rescue EOFError end elsif RUBY_ENGINE == 'truffleruby' begin cl = socket_io.remote_address.connect rescue NotImplementedError # Workaround for truffleruby < 21.3.0 cl2 = Socket.for_fd(socket_io.fileno) cl2.autoclose = false adr = cl2.remote_address if adr.ip? cl = TCPSocket.new(adr.ip_address, adr.ip_port) cl.autoclose = false else cl = UNIXSocket.new(adr.unix_path) cl.autoclose = false end end cl.write(cancel_request) cl.read(1) else cl = socket_io.remote_address.connect # Send CANCEL_REQUEST_CODE and parameters cl.write(cancel_request) # Wait for the postmaster to close the connection, which indicates that it's processed the request. cl.read(1) end cl.close nil rescue SystemCallError => err err.to_s end
def conndefaults
## Returns an array of Hashes with connection defaults. See ::conndefaults
def conndefaults return self.class.conndefaults end
def conndefaults_hash
## Returns a Hash with connection defaults. See ::conndefaults_hash
def conndefaults_hash return self.class.conndefaults_hash end
def connect_internal(opts, errors=nil)
def connect_internal(opts, errors=nil) self.connect_start(opts) or raise(PG::Error, "Unable to create a new connection") PG::ConnectionBad.new(conn.error_message, connection: self) if conn.status == PG::CONNECTION_BAD end(:async_connect_or_reset, :connect_poll) PG::ConnectionBad => err ors && !(conn && [PG::CONNECTION_AWAITING_RESPONSE].include?(conn.instance_variable_get(:@last_status))) ms to be no authentication error -> try next host s << err n nil bably an authentication error
def connect_to_hosts(*args)
def connect_to_hosts(*args) string = parse_connect_args(*args) PG::Connection.conninfo_parse(option_string).each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] } PG::Connection.conndefaults.each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] }.merge(iopts) = [] s[:hostaddr] addr is provided -> no need to resolve hostnames ddrs = iopts[:hostaddr].split(",", -1) = iopts[:host].split(",", -1) if iopts[:host] PG::ConnectionBad, "could not match #{ihosts.size} host names to #{ihostaddrs.size} hostaddr values" if ihosts && ihosts.size != ihostaddrs.size = iopts[:port].split(",", -1) = iports * ihostaddrs.size if iports.size == 1 PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihostaddrs.size} hosts" if iports.size != ihostaddrs.size to connect to each hostaddr with separate timeout ddrs.each_with_index do |ihostaddr, idx| = iopts.merge(hostaddr: ihostaddr, port: iports[idx]) [:host] = ihosts[idx] if ihosts onnect_internal(oopts, errors) n c if c opts[:host] && !iopts[:host].empty? lve DNS in Ruby to avoid blocking state while connecting, when it ... = iopts[:host].split(",", -1) = iopts[:port].split(",", -1) = iports * ihosts.size if iports.size == 1 PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihosts.size} hosts" if iports.size != ihosts.size .each_with_index do |mhost, idx| s host_is_named_pipe?(mhost) s = if Fiber.respond_to?(:scheduler) && iber.scheduler && UBY_VERSION < '3.1.' se a second thread to avoid blocking of the scheduler. TCPSocket.gethostbyname` isn't fiber aware before ruby-3.1. ead.new{ Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] }.value rinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] y to connect to each host with separate timeout s.each do |addr| ts = iopts.merge(hostaddr: addr, host: mhost, port: iports[idx]) connect_internal(oopts, errors) urn c if c hostname to resolve (UnixSocket) s = iopts.merge(host: mhost, port: iports[idx]) connect_internal(oopts, errors) rn c if c ost given connect_internal(iopts) G::ConnectionBad, errors.join("\n")
def conninfo_hash
##
## keyword (as a Symbol).
## Return the Postgres connection info structure as a Hash keyed by option
def conninfo_hash return self.conninfo.each_with_object({}) do |info, hash| hash[ info[:keyword].to_sym ] = info[:val] end end
def copy_data( sql, coder=nil )
def copy_data( sql, coder=nil ) raise PG::NotInBlockingMode.new("copy_data can not be used in nonblocking mode", connection: self) if nonblocking? res = exec( sql ) case res.result_status when PGRES_COPY_IN begin if coder old_coder = self.encoder_for_put_copy_data self.encoder_for_put_copy_data = coder end yield res rescue Exception => err errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] put_copy_end( errmsg ) get_result raise else put_copy_end get_last_result ensure self.encoder_for_put_copy_data = old_coder if coder end when PGRES_COPY_OUT begin if coder old_coder = self.decoder_for_get_copy_data self.decoder_for_get_copy_data = coder end yield res rescue Exception => err cancel begin while get_copy_data end rescue PG::Error # Ignore error in cleanup to avoid losing original exception end while get_result end raise err else res = get_last_result if !res || res.result_status != PGRES_COMMAND_OK while get_copy_data end while get_result end raise PG::NotAllCopyDataRetrieved.new("Not all COPY data retrieved", connection: self) end res ensure self.decoder_for_get_copy_data = old_coder if coder end else raise ArgumentError, "SQL command is no COPY statement: #{sql}" end end
def encrypt_password( password, username, algorithm=nil )
Available since PostgreSQL-10.
The caller can assume the string doesn't contain any special characters that would require escaping.
Return value is the encrypted password.
If you wish to use the default algorithm for the server but want to avoid blocking, query +password_encryption+ yourself before calling #encrypt_password, and pass that value as the algorithm.
That can block, and will fail if the current transaction is aborted, or if the connection is busy executing another query.
If algorithm is omitted or +nil+, this function will query the server for the current value of the +password_encryption+ setting.
Note that support for +scram-sha-256+ was introduced in PostgreSQL version 10, and will not work correctly with older server versions.
Currently supported algorithms are +md5+ and +scram-sha-256+ (+on+ and +off+ are also accepted as aliases for +md5+, for compatibility with older server versions).
+algorithm+ specifies the encryption algorithm to use to encrypt the password.
The +password+ and +username+ arguments are the cleartext password, and the SQL name of the user it is for.
Instead, use this function to convert the password to encrypted form before it is sent.
It is good practice not to send the original cleartext password in such a command, because it might be exposed in command logs, activity displays, and so on.
This function is intended to be used by client applications that wish to send commands like ALTER USER joe PASSWORD 'pwd'.
conn.encrypt_password( password, username, algorithm=nil ) -> String
call-seq:
def encrypt_password( password, username, algorithm=nil ) algorithm ||= exec("SHOW password_encryption").getvalue(0,0) sync_encrypt_password(password, username, algorithm) end
def get_copy_data(async=false, decoder=nil)
See also #copy_data.
if PG::TextDecoder::CopyRow#type_map is set accordingly.
Optionally the decoder can type cast the single fields to various Ruby types in one step,
COPY text format to an Array of Strings.
PG::TextDecoder::CopyRow decodes the received data fields from one row of PostgreSQL's
If _decoder_ is set to a PG::Coder derivation, the return type depends on this decoder.
If _decoder_ is not set or +nil+, data is returned as binary string.
block (only possible if _nonblock_ is true).
if the copy is done, or +false+ if the call would
Return one row of data, +nil+
conn.get_copy_data( [ nonblock = false [, decoder = nil ]] ) -> Object
call-seq:
def get_copy_data(async=false, decoder=nil) if async return sync_get_copy_data(async, decoder) else while (res=sync_get_copy_data(true, decoder)) == false socket_io.wait_readable consume_input end return res end end
def get_result
conn.exec
returns the value of the block.and the PG::Result object will automatically be cleared when the block terminates.
If the optional code block is given, it will be passed result as an argument,
you will not be able to issue further commands.
Note: call this function repeatedly until it returns +nil+, or else
it. Returns +nil+ if no more results are available.
#send_query (or another asynchronous command), and returns
Blocks waiting for the next result from a call to
conn.get_result() {|pg_result| block }
conn.get_result() -> PG::Result
call-seq:
def get_result block sync_get_result end
def host_is_named_pipe?(host_string)
def host_is_named_pipe?(host_string) ring.empty? || host_string.start_with?("/") || # it's UnixSocket? t_string.start_with?("@") || # it's UnixSocket in the abstract namespace? t's a path on Windows? BY_PLATFORM =~ /mingw|mswin/ && host_string =~ /\A([\/\\]|\w:[\/\\])/)
def isnonblocking
Returns the blocking status of the database connection.
conn.isnonblocking() -> Boolean
call-seq:
def isnonblocking false end
def new(*args)
connection will have its +client_encoding+ set accordingly.
If the Ruby default internal encoding is set (i.e.,
Encoding.default_internal != nil
), thePG::Connection.new( "postgresql://user:pass@pgsql.example.com:5432/testdb?sslmode=require" )
# As an URI
PG::Connection.new( nil, 5432, nil, nil, 'test', nil, nil )
# As an Array
PG::Connection.new( "dbname=test port=5432" )
# As a String
PG::Connection.new( dbname: 'test', port: 5432 )
# As a Hash
PG::Connection.new
# Connect using all defaults
Examples:
login password
[+password+]
login user name
[+user+]
connecting database name
[+dbname+]
(ignored in all versions of PostgreSQL)
[+tty+]
backend options
[+options+]
server port number
[+port+]
server hostname
[+host+]
The positional parameter form has the same functionality except that the missing parameters will always take on default values. The parameters are:
See the documentation of {connection strings}[https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING].
There are two accepted formats for +connection_string+: plain
keyword = value
strings and URIs.See the {list of valid parameters}[https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS] in the PostgreSQL documentation.
+connection_hash+ must be a ruby Hash with connection parameters.
Create a connection to the specified server.
PG::Connection.new(host, port, options, tty, dbname, user, password) -> conn
PG::Connection.new(connection_string) -> conn
PG::Connection.new(connection_hash) -> conn
PG::Connection.new -> conn
call-seq:
def new(*args) conn = connect_to_hosts(*args) if block_given? begin return yield conn ensure conn.finish end end conn end
def ping(*args)
[+PQPING_NO_ATTEMPT+]
could not establish connection
[+PQPING_NO_RESPONSE+]
server is alive but rejecting connections
[+PQPING_REJECT+]
server is accepting connections
[+PQPING_OK+]
Returns one of:
See PG::Connection.new for a description of the parameters.
Check server status.
PG::Connection.ping(host, port, options, tty, dbname, login, password) -> Integer
PG::Connection.ping(connection_string) -> Integer
PG::Connection.ping(connection_hash) -> Integer
call-seq:
def ping(*args) if Fiber.respond_to?(:scheduler) && Fiber.scheduler # Run PQping in a second thread to avoid blocking of the scheduler. # Unfortunately there's no nonblocking way to run ping. Thread.new { sync_ping(*args) }.value else sync_ping(*args) end end
def put_copy_data(buffer, encoder=nil)
See also #copy_data.
Raises an exception if an error occurs.
if PG::TextEncoder::CopyRow#type_map is set accordingly.
the encoder can type cast the fields from various Ruby types in one step,
PostgreSQL's COPY text format inclusive proper escaping. Optionally
This encodes the data fields given as _buffer_ from an Array of Strings to
_encoder_ can be a PG::Coder derivation (typically PG::TextEncoder::CopyRow).
is in nonblocking mode, and this command would block).
not sent (false is only possible if the connection
Returns true if the data was sent, false if it was
Transmits _buffer_ as copy data to the server.
conn.put_copy_data( buffer [, encoder] ) -> Boolean
call-seq:
def put_copy_data(buffer, encoder=nil) # sync_put_copy_data does a non-blocking attept to flush data. until res=sync_put_copy_data(buffer, encoder) # It didn't flush immediately and allocation of more buffering memory failed. # Wait for all data sent by doing a blocking flush. res = flush end # And do a blocking flush every 100 calls. # This is to avoid memory bloat, when sending the data is slower than calls to put_copy_data happen. if (@calls_to_put_copy_data += 1) > 100 @calls_to_put_copy_data = 0 res = flush end res end
def put_copy_end(*args)
not sent (*false* is only possible if the connection
Returns true if the end-of-data was sent, #false* if it was
_error_message_.
forces the COPY command to fail with the string
_error_message_ is an optional parameter, and if set,
Sends end-of-data indication to the server.
conn.put_copy_end( [ error_message ] ) -> Boolean
call-seq:
def put_copy_end(*args) until sync_put_copy_end(*args) flush end @calls_to_put_copy_data = 0 flush end
def reset
Resets the backend connection. This method closes the
conn.reset()
call-seq:
def reset reset_start async_connect_or_reset(:reset_poll) self end
def setnonblocking(enabled)
processed the query and returned the results.
that function doesn't return until the server has
Note: This function does not affect #exec, because
writing.
will return an error if the socket is not ready for
In the nonblocking state, calls to #send_query
but will not wait for the query results.
will block until the message is sent to the server,
In the blocking state, calls to #send_query
Sets the nonblocking status of the connection.
conn.setnonblocking(Boolean) -> nil
call-seq:
def setnonblocking(enabled) singleton_class.async_send_api = !enabled self.flush_data = !enabled sync_setnonblocking(true) end
def ssl_attributes
and the type of connection.
The available attributes varies depending on the SSL library being used,
Returns SSL-related information about the connection as key/value pairs
conn.ssl_attributes -> Hash
call-seq:
def ssl_attributes ssl_attribute_names.each.with_object({}) do |n,h| h[n] = ssl_attribute(n) end end
def transaction
and a +COMMIT+ at the end of the block, or
Executes a +BEGIN+ at the start of the block,
conn.transaction { |conn| ... } -> result of the block
call-seq:
def transaction rollback = false exec "BEGIN" yield(self) rescue Exception rollback = true cancel if transaction_status == PG::PQTRANS_ACTIVE block exec "ROLLBACK" raise ensure exec "COMMIT" unless rollback end