class EventMachine::Protocols::Postgres3

}
end
end
end
end
end
p row
result.rows.each do |row|
if status
db.query( "select * from some_table" ).callback do |status, result, errors|
if status
db.connect( dbname, username, psw ).callback do |status|
db = EM.connect_unix_domain( "/tmp/.s.PGSQL.5432", EM::P::Postgres3 )
EM.run {
=== Usage
requiring the user to define both a callback and an errback function.
instead of the traditional pattern of calling Deferrable#succeed or #fail, and
first argument of a deferrable callback to indicate success or failure. This is
Experimentally, we’re using the pattern of always returning a boolean value as the
We return Deferrables from the user-level operations surfaced by this interface.
TODO: The password handling in dispatch_conn_message is totally incomplete.
in the postgres-pr library, and modified it for event-handling.
We cloned the handling of postgres messages from lib/postgres-pr/connection.rb
It is modified to raise an IOError instead of TruncatedDataException since the exception is unused.
which adds method #readbytes directly to class IO. But StringIO is not a subclass of IO.
The StringIO monkeypatch is lifted from the standard library readbytes.rb,
by postgres-pr.
We need to monkeypatch StringIO because it lacks the #readbytes method needed
requires postgres, this file will need to be required explicitly.
EM installations. Until we find a good way to only require these if a program
We’re tucking in a bunch of require statements that may not be present in garden-variety
anyway.
in basically a production-ready state, and the wire protocol isn’t that complicated
Neumann but (at this writing) appears to be no longer maintained. Still, it’s
of the existing postgres-pr library, which was originally written by Michael
But rather than re-implement the Postgres Wire3 protocol, we’re taking advantage
thread-free evented model.
have all made use of blocking I/O calls, which is incompatible with a
Until now this has been a problem because the Postgres client implementations
Objective: we want to access Postgres databases without requiring threads.
with any Postgres version from roughly 7.4 onward.
This implements version 3 of the Postgres wire protocol, which will work
PROVISIONAL IMPLEMENTATION of an evented Postgres client.

def connect db, user, psw=nil

# Begins the Postgres startup handshake for database +db+ as +user+.
#
# Always returns an EM::DefaultDeferrable that has a 15 second timeout set on it.
# * If a connect or a query is already pending, that deferrable is succeeded right away
#   with (false, "Operation already in progress") and nothing is sent.
# * Otherwise the deferrable is stored as the pending connect and a StartupMessage
#   carrying the user and database names is sent.
#
# +psw+ is optional. When given it is only stored in @password; it is not placed in the
# startup parameters.
def connect db, user, psw=nil
  deferrable = EM::DefaultDeferrable.new
  deferrable.timeout 15

  # Only one operation may be outstanding at a time.
  if @pending_query || @pending_conn
    deferrable.succeed false, "Operation already in progress"
    return deferrable
  end

  @pending_conn = deferrable
  @user = user
  @password = psw if psw

  startup_params = {"user"=>user, "database"=>db}
  # 3 << 16 places the protocol major version (3) in the upper 16 bits.
  send_data PostgresPR::StartupMessage.new( 3 << 16, startup_params ).dump
  deferrable
end

def dispatch_conn_message msg

Cloned and modified from the postgres-pr library.
# Handles one backend message +msg+ while a connect is pending (receive_data routes
# messages here for as long as @pending_conn is set).
#
# * Password requests (clear text, crypt, MD5) are answered with a PasswordMessage built
#   from @password (and @user for MD5). Raises ArgumentError when no password was stored.
# * Kerberos V4/V5 and SCM credential requests raise "unsupported authentification".
# * ErrorResponse completes the pending connect deferrable with
#   (false, tab-joined field values) and clears @pending_conn, matching the
#   (status, message) convention used by #connect, #query and #unbind. If no connect is
#   pending, the same text is raised instead so the error is never dropped.
# * NoticeResponse is forwarded to @notice_processor when one is set.
# * ParameterStatus is recorded in @params.
# * ReadyForQuery completes the pending connect deferrable with (true).
# * Any other message raises "unhandled message type".
def dispatch_conn_message msg
  case msg
  when AuthentificationClearTextPassword
    raise ArgumentError, "no password specified" if @password.nil?
    send_data PasswordMessage.new(@password).dump
  when AuthentificationCryptPassword
    raise ArgumentError, "no password specified" if @password.nil?
    send_data PasswordMessage.new(@password.crypt(msg.salt)).dump
  when AuthentificationMD5Password
    raise ArgumentError, "no password specified" if @password.nil?
    # Loaded lazily: only MD5 authentication needs it.
    require 'digest/md5'
    # "md5" + md5hex( md5hex(password + user) + salt )
    inner = Digest::MD5.hexdigest(@password + @user)
    send_data PasswordMessage.new('md5' + Digest::MD5.hexdigest(inner + msg.salt)).dump
  when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential
    raise "unsupported authentification"
  when AuthentificationOk
    # Nothing to do; the connect completes on ReadyForQuery.
  when ErrorResponse
    # Report the failure through the deferrable instead of raising out of the
    # event loop, and release the pending slot.
    text = msg.field_values.join("\t")
    pc,@pending_conn = @pending_conn,nil
    raise text unless pc
    pc.succeed false, text
  when NoticeResponse
    @notice_processor.call(msg) if @notice_processor
  when ParameterStatus
    @params[msg.key] = msg.value
  when BackendKeyData
    # TODO
    #p msg
  when ReadyForQuery
    # TODO: use transaction status
    pc,@pending_conn = @pending_conn,nil
    pc.succeed true
  else
    raise "unhandled message type"
  end
end

def dispatch_query_message msg

Cloned and modified from the postgres-pr library.
# Handles one backend message +msg+ while a query is pending.
#
# Row descriptions, data rows and the command tag are accumulated on the result
# object @r; ErrorResponse messages are appended to @e; NoticeResponse goes to
# @notice_processor when one is set. On ReadyForQuery the pending query deferrable is
# cleared and succeeded with (true, @r, @e).
#
# NOTE(review): the status passed on ReadyForQuery is always true, even when @e holds
# errors, so callers have to inspect the third callback argument.
def dispatch_query_message msg
  case msg
  when DataRow then @r.rows << msg.columns
  when CommandComplete then @r.cmd_tag = msg.cmd_tag
  when ReadyForQuery
    finished = @pending_query
    @pending_query = nil
    finished.succeed true, @r, @e
  when RowDescription then @r.fields = msg.fields
  when CopyInResponse, CopyOutResponse, EmptyQueryResponse
    # recognised but deliberately ignored
  when ErrorResponse
    # TODO
    @e << msg
  when NoticeResponse
    @notice_processor.call(msg) if @notice_processor
  end
  # TODO: any other message type is silently ignored
end

def initialize

# Sets up per-connection state.
#
# @data is the receive buffer that receive_data appends raw socket data to. It is
# created with String.new so that it is an unfrozen, binary (ASCII-8BIT) string
# regardless of any frozen_string_literal setting or source encoding.
# @params collects the key/value pairs reported by ParameterStatus messages.
# The pending-operation slots start out explicitly empty.
def initialize
  @data = String.new
  @params = {}
  @pending_conn = nil
  @pending_query = nil
end

def query sql

# Sends the SQL text +sql+ to the server as a query.
#
# Always returns an EM::DefaultDeferrable that has a 15 second timeout set on it.
# * If a connect or a query is already pending, that deferrable is succeeded right away
#   with (false, "Operation already in progress") and nothing is sent.
# * Otherwise a fresh result object (@r) and error list (@e) are created, the deferrable
#   is stored as the pending query, and the dumped Query message is sent.
def query sql
  deferrable = EM::DefaultDeferrable.new
  deferrable.timeout 15

  # Only one operation may be outstanding at a time.
  if @pending_query || @pending_conn
    deferrable.succeed false, "Operation already in progress"
    return deferrable
  end

  @r = PostgresPR::Connection::Result.new
  @e = []
  @pending_query = deferrable
  send_data PostgresPR::Query.dump(sql)
  deferrable
end

def receive_data data

# Appends +data+ to the receive buffer and dispatches every complete message in it.
#
# A message is framed as one type byte followed by a 4-byte big-endian length; the code
# treats that length as covering everything after the type byte, so a whole frame is
# 1 + length bytes. Each complete frame is removed from the buffer, parsed with
# PostgresPR::Message.read and handed to dispatch_conn_message while a connect is
# pending, else to dispatch_query_message while a query is pending. A message arriving
# with neither pending raises "Unexpected message from database". Incomplete trailing
# data stays in @data until more arrives.
def receive_data data
  @data << data
  while @data.length >= 5
    body_len = @data[1, 4].unpack("N").first
    frame_len = body_len + 1
    # Stop until the rest of this frame has arrived.
    break if @data.length < frame_len

    frame = @data.slice!(0, frame_len)
    message = StringIO.open( frame, "r" ) {|io| PostgresPR::Message.read( io ) }

    if @pending_conn
      dispatch_conn_message message
    elsif @pending_query
      dispatch_query_message message
    else
      raise "Unexpected message from database"
    end
  end
end

def unbind

# Connection-closed hook: if a query or a connect is still pending, its deferrable is
# succeeded with (false, "lost connection"). A pending query takes precedence over a
# pending connect. The pending slots themselves are left as they are.
def unbind
  pending = @pending_query || @pending_conn
  pending.succeed false, "lost connection" if pending
end