class PhusionPassenger::UnionStation::Core
# Returns the current wall-clock time as a Time object.
#
# Other code in this class asks for the time through this method rather
# than calling Time.now directly (presumably so it can be overridden in
# one place, e.g. by tests -- confirm).
def self.current_time
  Time.now
end
# Builds a Core from an options hash with string keys.
#
# Returns a new instance only when both options["analytics"] and
# options["logging_agent_address"] are truthy; otherwise returns nil.
# The instance is constructed from the "logging_agent_address",
# "logging_agent_username", "logging_agent_password" and "node_name"
# entries, in that order.
def self.new_from_options(options)
  return nil unless options["analytics"] && options["logging_agent_address"]

  new(
    options["logging_agent_address"],
    options["logging_agent_username"],
    options["logging_agent_password"],
    options["node_name"]
  )
end
# Encodes +time+ as the number of microseconds since the Unix epoch,
# rendered as a base-36 string.
#
# time - a Time-like object responding to #to_i and #usec; defaults to
#        current_time.
#
# Returns a String.
def self.timestamp_string(time = current_time)
  microseconds = time.to_i * 1_000_000 + time.usec
  microseconds.to_s(36)
end
# Drops the current connection and installs a fresh, socket-less
# Connection in its place.
#
# Runs while holding @mutex and inside @connection.synchronize. If the
# /dev/urandom handle has been closed it is reopened first, then the old
# connection is unref'ed and replaced with Connection.new(nil).
def clear_connection
  @mutex.synchronize do
    @connection.synchronize do
      if @random_dev.closed?
        @random_dev = File.open("/dev/urandom")
      end
      @connection.unref
      @connection = Connection.new(nil)
    end
  end
end
# Shuts this object down.
#
# While holding @mutex and inside @connection.synchronize, this closes
# the random device handle, unrefs the current connection and sets
# @connection to nil. Note that @connection is nil afterwards, so methods
# that call into @connection will fail on this instance after #close.
def close
  @mutex.synchronize do
    @connection.synchronize do
      @random_dev.close
      @connection.unref
      @connection = nil
    end
  end
end
# Opens a new socket to the logging agent at @server_address and performs
# the handshake in three steps:
#
# 1. read the agent's ["version", "1"] announcement;
# 2. send @username and @password and expect a reply starting with "ok";
# 3. send the "init" command with @node_name and expect ["ok"].
#
# On success the previous @connection is unref'ed and replaced by a
# Connection wrapping the new socket.
#
# Raises EOFError when the agent returns nothing during step 1 or 2,
# IOError for a malformed/unsupported version or an invalid "init" reply,
# SecurityError (carrying the agent's reply) when authentication is
# rejected, and Errno::ECONNREFUSED when "init" gets no reply or the agent
# says it is shutting down. Whatever is raised, the newly opened socket is
# closed before the exception propagates.
def connect
  socket = connect_to_server(@server_address)
  channel = MessageChannel.new(socket)

  # Step 1: protocol version check.
  version_reply = channel.read
  raise EOFError if version_reply.nil?
  if version_reply.size != 2 || version_reply[0] != "version"
    raise IOError, "The logging agent didn't sent a valid version identifier"
  end
  if version_reply[1] != "1"
    raise IOError, "Unsupported logging agent protocol version #{version_reply[1]}"
  end

  # Step 2: authentication.
  channel.write_scalar(@username)
  channel.write_scalar(@password)
  auth_reply = channel.read
  raise EOFError if auth_reply.nil?
  raise SecurityError, auth_reply[0] if auth_reply[0] != "ok"

  # Step 3: announce this node.
  channel.write("init", @node_name)
  init_reply = channel.read
  raise Errno::ECONNREFUSED, "Cannot connect to logging agent" unless init_reply
  if init_reply.size != 1
    raise IOError, "Logging agent returned an invalid reply for the 'init' command"
  end
  if init_reply[0] == "server shutting down"
    raise Errno::ECONNREFUSED, "Cannot connect to logging agent"
  end
  if init_reply[0] != "ok"
    raise IOError, "Logging agent returned an invalid reply for the 'init' command"
  end

  # Handshake complete: swap the old connection for the new one.
  @connection.unref
  @connection = Connection.new(socket)
rescue Exception
  # Rescuing Exception is deliberate: this is cleanup only and the
  # original exception is always re-raised. socket is nil when
  # connect_to_server itself failed.
  if socket && !socket.closed?
    socket.close
  end
  raise
end
# Re-opens an existing transaction, identified by +txn_id+, on the logging
# agent and returns a Transaction bound to it.
#
# txn_id            - ID of the transaction to continue; must be non-empty.
# group_name        - group name forwarded to the agent.
# category          - category forwarded to the agent (default :requests).
# union_station_key - key forwarded to the agent (default "-").
#
# Returns Transaction.new(@connection, txn_id) on success. Returns an
# argument-less Transaction.new when no server address is configured,
# when the reconnect back-off (@next_reconnect_time) has not yet elapsed,
# or when connecting/writing fails with SystemCallError or IOError; those
# failures also disconnect, emit a warning and schedule the next attempt
# @reconnect_timeout seconds from now.
#
# Raises ArgumentError when +txn_id+ is nil or empty. Any other exception
# disconnects the connection and is re-raised.
def continue_transaction(txn_id, group_name, category = :requests, union_station_key = "-")
  return Transaction.new unless @server_address
  if !txn_id || txn_id.empty?
    raise ArgumentError, "Transaction ID may not be empty"
  end

  Utils::Lock.new(@mutex).synchronize do |_lock|
    # Still inside the back-off window after an earlier failure.
    return Transaction.new if current_time < @next_reconnect_time

    Utils::Lock.new(@connection.mutex).synchronize do |connection_lock|
      unless @connection.connected?
        begin
          connect
          # connect assigns a new @connection, so hand the lock the new
          # connection's mutex (presumably re-targeting the held lock --
          # confirm against Utils::Lock#reset).
          connection_lock.reset(@connection.mutex)
        rescue SystemCallError, IOError
          @connection.disconnect
          message = "Cannot connect to the logging agent at #{@server_address}; " +
            "retrying in #{@reconnect_timeout} second(s)."
          warn(message)
          @next_reconnect_time = current_time + @reconnect_timeout
          return Transaction.new
        rescue Exception
          # Cleanup only; the original exception is re-raised untouched.
          @connection.disconnect
          raise
        end
      end

      begin
        @connection.channel.write(
          "openTransaction",
          txn_id,
          group_name,
          "",
          category,
          Core.timestamp_string,
          union_station_key,
          true
        )
        return Transaction.new(@connection, txn_id)
      rescue SystemCallError, IOError
        @connection.disconnect
        message = "The logging agent at #{@server_address}" <<
          " closed the connection; will reconnect in " <<
          "#{@reconnect_timeout} second(s)."
        warn(message)
        @next_reconnect_time = current_time + @reconnect_timeout
        return Transaction.new
      rescue Exception
        # Cleanup only; the original exception is re-raised untouched.
        @connection.disconnect
        raise
      end
    end
  end
end
# Instance-level shortcut for the class-level current_time.
#
# Returns whatever self.class.current_time returns.
def current_time
  self.class.current_time
end
# Sets up a Core that talks to the logging agent at
# +logging_agent_address+. No connection is opened here; @connection
# starts out as Connection.new(nil).
#
# logging_agent_address - address of the logging agent; when nil the
#                         transaction methods return blank transactions.
# username, password    - credentials sent during the handshake.
# node_name             - name reported to the agent; when nil or empty
#                         the output of the `hostname` command is used.
def initialize(logging_agent_address, username, password, node_name)
  @server_address = logging_agent_address
  @username = username
  @password = password
  @node_name =
    if node_name && !node_name.empty?
      node_name
    else
      `hostname`.strip
    end
  # Source of random bytes for random_token.
  @random_dev = File.open("/dev/urandom")

  # This mutex protects the following instance variables, but
  # not the contents of @connection.
  @mutex = Mutex.new

  @connection = Connection.new(nil)
  uses_local_socket = @server_address && local_socket_address?(@server_address)
  @max_connect_tries = uses_local_socket ? 10 : 1
  # Seconds to wait before retrying after a failed connect or write.
  @reconnect_timeout = 1
  # A date far in the past, so the first connection attempt is never
  # held back by the reconnect back-off check.
  @next_reconnect_time = Time.utc(1980, 1, 1)
end
# Opens a brand-new transaction on the logging agent and returns a
# Transaction bound to it.
#
# The transaction ID is the current time in whole minutes since the epoch
# (base 36), a dash, and an 11-character random token.
#
# group_name        - group name forwarded to the agent; must be non-empty.
# category          - category forwarded to the agent (default :requests).
# union_station_key - key forwarded to the agent (default "-").
#
# Returns Transaction.new(@connection, txn_id) once the agent has answered
# ["ok"]. Returns an argument-less Transaction.new when no server address
# is configured, when the reconnect back-off (@next_reconnect_time) has
# not yet elapsed, or when connecting/writing/reading fails with
# SystemCallError or IOError; those failures also disconnect, emit a
# warning and schedule the next attempt @reconnect_timeout seconds from
# now.
#
# Raises ArgumentError when +group_name+ is nil or empty, and RuntimeError
# when the agent answers with anything other than ["ok"]. Any exception
# other than SystemCallError/IOError disconnects the connection and is
# re-raised.
def new_transaction(group_name, category = :requests, union_station_key = "-")
  return Transaction.new unless @server_address
  if !group_name || group_name.empty?
    raise ArgumentError, "Group name may not be empty"
  end

  minutes_since_epoch = Core.current_time.to_i / 60
  txn_id = minutes_since_epoch.to_s(36)
  txn_id << "-#{random_token(11)}"

  Utils::Lock.new(@mutex).synchronize do |_lock|
    # Still inside the back-off window after an earlier failure.
    return Transaction.new if current_time < @next_reconnect_time

    Utils::Lock.new(@connection.mutex).synchronize do |connection_lock|
      unless @connection.connected?
        begin
          connect
          # connect assigns a new @connection, so hand the lock the new
          # connection's mutex (presumably re-targeting the held lock --
          # confirm against Utils::Lock#reset).
          connection_lock.reset(@connection.mutex)
        rescue SystemCallError, IOError
          @connection.disconnect
          message = "Cannot connect to the logging agent at #{@server_address}; " +
            "retrying in #{@reconnect_timeout} second(s)."
          warn(message)
          @next_reconnect_time = current_time + @reconnect_timeout
          return Transaction.new
        rescue Exception
          # Cleanup only; the original exception is re-raised untouched.
          @connection.disconnect
          raise
        end
      end

      begin
        # NOTE(review): compared with continue_transaction this passes one
        # extra trailing `true` and then waits for a reply; presumably the
        # flag requests an acknowledgement -- confirm against the agent
        # protocol.
        @connection.channel.write(
          "openTransaction",
          txn_id,
          group_name,
          "",
          category,
          Core.timestamp_string,
          union_station_key,
          true,
          true
        )
        reply = @connection.channel.read
        if reply != ["ok"]
          raise "Expected logging server to respond with 'ok', but got #{reply.inspect} instead"
        end
        return Transaction.new(@connection, txn_id)
      rescue SystemCallError, IOError
        @connection.disconnect
        message = "The logging agent at #{@server_address}" <<
          " closed the connection; will reconnect in " <<
          "#{@reconnect_timeout} second(s)."
        warn(message)
        @next_reconnect_time = current_time + @reconnect_timeout
        return Transaction.new
      rescue Exception
        # Cleanup only; the original exception is re-raised untouched.
        @connection.disconnect
        raise
      end
    end
  end
end
# Builds a random token of +length+ characters.
#
# Reads +length+ bytes from @random_dev and maps each byte onto an entry
# of RANDOM_CHARS by taking the byte value modulo RANDOM_CHARS.size.
#
# Returns the token as a String.
def random_token(length)
  random_bytes = @random_dev.read(length)
  random_bytes.each_byte.each_with_object("") do |byte, token|
    token << RANDOM_CHARS[byte % RANDOM_CHARS.size]
  end
end