lib/pg_conn/role_methods.rb
module PgConn
  # Role (user) administration helpers: existence checks, creation, removal
  # and listing of roles. All SQL is executed through the connection object
  # given to the constructor.
  class RoleMethods
    # The connection object every statement is run on
    attr_reader :conn

    def initialize(conn)
      @conn = conn
      # TODO: Check if conn is a superuser connection
    end

    # Return true if all the given role(s) exist. Role names may be given as
    # separate arguments and/or (nested) arrays; nil entries are ignored and
    # duplicates are only counted once.
    #
    # If :superuser or :can_login is true (false), the roles are furthermore
    # required to have (not have) the corresponding attribute. Any other
    # value (eg. nil) disables the check
    def exist?(*rolenames, superuser: nil, can_login: nil)
      # pg_roles has one row per role, so duplicated names have to be removed
      # before the row count is compared with the number of requested names
      rolenames = rolenames.flatten.compact.uniq
      where_clause = [
        "rolname in (#{PgConn.sql_values(rolenames)})",
        flag_clause("rolsuper", superuser),
        flag_clause("rolcanlogin", can_login)
      ].compact.join(" and ")
      conn.value("select count(*) from pg_roles where #{where_clause}") == rolenames.size
    end

    # Return true if the user exists and can login. The :superuser option
    # is passed on to #exist?
    def can_login?(username, superuser: nil)
      exist?(username, superuser: superuser, can_login: true)
    end

    alias_method :user?, :can_login?

    # Return true if the user exists and is a superuser
    def superuser?(username)
      exist?(username, superuser: true)
    end

    # Create a new role. Each option maps to the corresponding attribute of
    # the 'create role' statement (superuser, createdb, login, createrole)
    # and is emitted explicitly in either its positive or negated form.
    # Returns the result of conn.exec
    def create(rolename, superuser: false, create_database: false, can_login: false, create_role: false)
      # Embedded double quotes are doubled so that the name stays a single
      # quoted identifier
      quoted_rolename = rolename.to_s.gsub('"', '""')
      stmt = [
        "create role \"#{quoted_rolename}\"",
        superuser ? "superuser" : "nosuperuser",
        create_database ? "createdb" : "nocreatedb",
        can_login ? "login" : "nologin",
        create_role ? "createrole" : "nocreaterole"
      ].join(" ")
      conn.exec stmt
    end

    # Remove all privileges from the given role. Not implemented yet - the
    # method currently does nothing and returns nil. TODO #demote!, strip! ?
    def clean(rolename)
    end

    # Drop existing users. Names of roles that don't exist are ignored and
    # false is returned if no existing role is left to drop. Drop depending
    # privileges and objects too if :cascade is true (using 'drop owned by
    # ... cascade'). Returns true if the user(s) was deleted and false if the
    # 'drop role' statement yields a false value - presumably when :fail is
    # false and one or more users couldn't be deleted (depends on conn.exec,
    # TODO confirm)
    #
    # Note that cascade only works if connected to the database where the
    # privileges exist.
    #
    # TODO The :silent option is used in tests - fix it somehow!
    def drop(*rolenames, cascade: false, fail: true, silent: false)
      rolenames = rolenames.flatten.compact.select { |role| exist?(role) }
      return false if rolenames.empty?

      rolenames_sql = PgConn.sql_idents(rolenames)
      # begin
      # Removing owned objects is best-effort so it is always run with fail: false
      conn.exec("drop owned by #{rolenames_sql} cascade", fail: false, silent: silent) if cascade
      # '&& true' turns a truthy result into true and keeps false/nil as-is
      conn.exec("drop role #{rolenames_sql}", fail: fail, silent: silent) && true
      # rescue PG::Error
      #   raise if fail
      #   conn.cancel_transaction
      #   return false
      # end
    end

    # List role names, optionally filtered:
    #
    #   :database   Only roles with a name on the form '<database>__<suffix>'
    #   :owner      Together with :database, also include the role with the
    #               same name as the database. Ignored without :database
    #   :superuser  true/false to require/forbid the superuser attribute
    #   :can_login  true/false to require/forbid the login attribute
    #
    # A nil option disables the corresponding filter. Returns the result of
    # conn.values
    #
    # NOTE: :database is interpolated into the SQL as-is so it must not
    # contain quotes or LIKE wildcards. TODO Use RE
    def list(database: nil, owner: false, superuser: nil, can_login: nil)
      # The backslashes make the two underscores literal instead of LIKE
      # single-character wildcards
      database_clause = database && "rolname like '#{database}\\_\\_%'"
      database_clause = database && "(#{database_clause} or rolname = '#{database}')" if owner
      superuser_clause = superuser.nil? ? nil : "rolsuper = #{superuser}"
      can_login_clause = can_login.nil? ? nil : "rolcanlogin = #{can_login}"
      query = [
        "select rolname from pg_roles where true",
        database_clause,
        superuser_clause,
        can_login_clause
      ].compact.join(" and ")
      conn.values(query)
    end

    private

    # Map a tri-state flag to a condition on a boolean pg_roles column: true
    # yields the column itself, false its negation and anything else nil (no
    # condition)
    def flag_clause(column, flag)
      case flag
      when true then column
      when false then "not #{column}"
      end
    end
  end
end