# Phusion Passenger - http://www.modrails.com/
# Copyright (c) 2010, 2011, 2012 Phusion
#
# "Phusion Passenger" is a trademark of Hongli Lai & Ninh Bui.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require 'thread'
require 'phusion_passenger/utils'
require 'phusion_passenger/debug_logging'
require 'phusion_passenger/message_channel'
module PhusionPassenger
class AnalyticsLogger
RETRY_SLEEP = 0.2
NETWORK_ERRORS = [Errno::EPIPE, Errno::ECONNREFUSED, Errno::ECONNRESET,
Errno::EHOSTUNREACH, Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT]
include Utils
class Log
attr_reader :txn_id
def initialize(connection = nil, txn_id = nil)
if connection
@connection = connection
@txn_id = txn_id
connection.ref
end
end
def null?
return !@connection
end
def message(text)
@connection.synchronize do
return if !@connection.connected?
begin
timestamp_string = AnalyticsLogger.timestamp_string
DebugLogging.trace(3, "[Union Station log] #{@txn_id} #{timestamp_string} #{text}")
@connection.channel.write("log", @txn_id, timestamp_string)
@connection.channel.write_scalar(text)
rescue SystemCallError, IOError => e
@connection.disconnect
DebugLogging.warn("Error communicating with the logging agent: #{e.message}")
rescue Exception => e
@connection.disconnect
raise e
end
end if @connection
end
def begin_measure(name, extra_info = nil)
if extra_info
extra_info_base64 = [extra_info].pack("m")
extra_info_base64.gsub!("\n", "")
extra_info_base64.strip!
else
extra_info_base64 = nil
end
times = NativeSupport.process_times
message "BEGIN: #{name} (#{current_timestamp.to_s(36)},#{times.utime.to_s(36)},#{times.stime.to_s(36)}) #{extra_info_base64}"
end
def end_measure(name, error_encountered = false)
times = NativeSupport.process_times
if error_encountered
message "FAIL: #{name} (#{current_timestamp.to_s(36)},#{times.utime.to_s(36)},#{times.stime.to_s(36)})"
else
message "END: #{name} (#{current_timestamp.to_s(36)},#{times.utime.to_s(36)},#{times.stime.to_s(36)})"
end
end
def measure(name, extra_info = nil)
begin_measure(name, extra_info)
begin
yield
rescue Exception
error = true
is_closed = closed?
raise
ensure
end_measure(name, error) if !is_closed
end
end
def measured_time_points(name, begin_time, end_time, extra_info = nil)
if extra_info
extra_info_base64 = [extra_info].pack("m")
extra_info_base64.gsub!("\n", "")
extra_info_base64.strip!
else
extra_info_base64 = nil
end
begin_timestamp = begin_time.to_i * 1_000_000 + begin_time.usec
end_timestamp = end_time.to_i * 1_000_000 + end_time.usec
message "BEGIN: #{name} (#{begin_timestamp.to_s(36)}) #{extra_info_base64}"
message "END: #{name} (#{end_timestamp.to_s(36)})"
end
def close(flush_to_disk = false)
@connection.synchronize do
begin
# We need an ACK here. See abstract_request_handler.rb finalize_request.
@connection.channel.write("closeTransaction", @txn_id,
AnalyticsLogger.timestamp_string, true)
result = @connection.channel.read
if result != ["ok"]
raise "Expected logging agent to respond with 'ok', but got #{result.inspect} instead"
end
if flush_to_disk
@connection.channel.write("flush")
result = @connection.channel.read
if result != ["ok"]
raise "Invalid logging agent response #{result.inspect} to the 'flush' command"
end
end
rescue SystemCallError, IOError => e
@connection.disconnect
DebugLogging.warn("Error communicating with the logging agent: #{e.message}")
rescue Exception => e
@connection.disconnect
raise e
ensure
@connection.unref
@connection = nil
end
end if @connection
end
def closed?
if @connection
@connection.synchronize do
return !@connection.connected?
end
else
return nil
end
end
private
def current_timestamp
time = AnalyticsLogger.current_time
return time.to_i * 1_000_000 + time.usec
end
end
def self.new_from_options(options)
if options["analytics"] && options["logging_agent_address"]
return new(options["logging_agent_address"],
options["logging_agent_username"],
options["logging_agent_password"],
options["node_name"])
else
return nil
end
end
attr_accessor :max_connect_tries
attr_accessor :reconnect_timeout
def initialize(logging_agent_address, username, password, node_name)
@server_address = logging_agent_address
@username = username
@password = password
if node_name && !node_name.empty?
@node_name = node_name
else
@node_name = `hostname`.strip
end
@random_dev = File.open("/dev/urandom")
# This mutex protects the following instance variables, but
# not the contents of @connection.
@mutex = Mutex.new
@connection = Connection.new(nil)
if @server_address && local_socket_address?(@server_address)
@max_connect_tries = 10
else
@max_connect_tries = 1
end
@reconnect_timeout = 1
@next_reconnect_time = Time.utc(1980, 1, 1)
end
def clear_connection
@mutex.synchronize do
@connection.synchronize do
@random_dev = File.open("/dev/urandom") if @random_dev.closed?
@connection.unref
@connection = Connection.new(nil)
end
end
end
def close
@mutex.synchronize do
@connection.synchronize do
@random_dev.close
@connection.unref
@connection = nil
end
end
end
def new_transaction(group_name, category = :requests, union_station_key = nil)
if !@server_address
return Log.new
elsif !group_name || group_name.empty?
raise ArgumentError, "Group name may not be empty"
end
txn_id = (AnalyticsLogger.current_time.to_i / 60).to_s(36)
txn_id << "-#{random_token(11)}"
Lock.new(@mutex).synchronize do |lock|
if current_time < @next_reconnect_time
return Log.new
end
Lock.new(@connection.mutex).synchronize do |connection_lock|
if !@connection.connected?
begin
connect
connection_lock.reset(@connection.mutex)
rescue SystemCallError, IOError
@connection.disconnect
DebugLogging.warn("Cannot connect to the logging agent at #{@server_address}; " +
"retrying in #{@reconnect_timeout} second(s).")
@next_reconnect_time = current_time + @reconnect_timeout
return Log.new
rescue Exception => e
@connection.disconnect
raise e
end
end
begin
@connection.channel.write("openTransaction",
txn_id, group_name, "", category,
AnalyticsLogger.timestamp_string,
union_station_key,
true,
true)
result = @connection.channel.read
if result != ["ok"]
raise "Expected logging server to respond with 'ok', but got #{result.inspect} instead"
end
return Log.new(@connection, txn_id)
rescue SystemCallError, IOError
@connection.disconnect
DebugLogging.warn("The logging agent at #{@server_address}" <<
" closed the connection; will reconnect in " <<
"#{@reconnect_timeout} second(s).")
@next_reconnect_time = current_time + @reconnect_timeout
return Log.new
rescue Exception => e
@connection.disconnect
raise e
end
end
end
end
def continue_transaction(txn_id, group_name, category = :requests, union_station_key = nil)
if !@server_address
return Log.new
elsif !txn_id || txn_id.empty?
raise ArgumentError, "Transaction ID may not be empty"
end
Lock.new(@mutex).synchronize do |lock|
if current_time < @next_reconnect_time
return Log.new
end
Lock.new(@connection.mutex).synchronize do |connection_lock|
if !@connection.connected?
begin
connect
connection_lock.reset(@connection.mutex)
rescue SystemCallError, IOError
@connection.disconnect
DebugLogging.warn("Cannot connect to the logging agent at #{@server_address}; " +
"retrying in #{@reconnect_timeout} second(s).")
@next_reconnect_time = current_time + @reconnect_timeout
return Log.new
rescue Exception => e
@connection.disconnect
raise e
end
end
begin
@connection.channel.write("openTransaction",
txn_id, group_name, "", category,
AnalyticsLogger.timestamp_string,
union_station_key,
true)
return Log.new(@connection, txn_id)
rescue SystemCallError, IOError
@connection.disconnect
DebugLogging.warn("The logging agent at #{@server_address}" <<
" closed the connection; will reconnect in " <<
"#{@reconnect_timeout} second(s).")
@next_reconnect_time = current_time + @reconnect_timeout
return Log.new
rescue Exception => e
@connection.disconnect
raise e
end
end
end
end
private
RANDOM_CHARS = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
class Lock
def initialize(mutex)
@mutex = mutex
@locked = false
end
def reset(mutex, lock_now = true)
unlock if @locked
@mutex = mutex
lock if lock_now
end
def synchronize
lock if !@locked
begin
yield(self)
ensure
unlock if @locked
end
end
def lock
raise if @locked
@mutex.lock
@locked = true
end
def unlock
raise if !@locked
@mutex.unlock
@locked = false
end
end
class Connection
attr_reader :mutex
attr_accessor :channel
def initialize(io)
@mutex = Mutex.new
@refcount = 1
@channel = MessageChannel.new(io) if io
end
def connected?
return !!@channel
end
def disconnect
@channel.close if @channel
@channel = nil
end
def ref
@refcount += 1
end
def unref
@refcount -= 1
if @refcount == 0
disconnect
end
end
def synchronize
@mutex.synchronize do
yield
end
end
end
def connect
socket = connect_to_server(@server_address)
channel = MessageChannel.new(socket)
result = channel.read
if result.nil?
raise EOFError
elsif result.size != 2 || result[0] != "version"
raise IOError, "The logging agent didn't sent a valid version identifier"
elsif result[1] != "1"
raise IOError, "Unsupported logging agent protocol version #{result[1]}"
end
channel.write_scalar(@username)
channel.write_scalar(@password)
result = channel.read
if result.nil?
raise EOFError
elsif result[0] != "ok"
raise SecurityError, result[0]
end
channel.write("init", @node_name)
args = channel.read
if !args
raise Errno::ECONNREFUSED, "Cannot connect to logging agent"
elsif args.size != 1
raise IOError, "Logging agent returned an invalid reply for the 'init' command"
elsif args[0] == "server shutting down"
raise Errno::ECONNREFUSED, "Cannot connect to logging agent"
elsif args[0] != "ok"
raise IOError, "Logging agent returned an invalid reply for the 'init' command"
end
@connection.unref
@connection = Connection.new(socket)
rescue Exception => e
socket.close if socket && !socket.closed?
raise e
end
def random_token(length)
token = ""
@random_dev.read(length).each_byte do |c|
token << RANDOM_CHARS[c % RANDOM_CHARS.size]
end
return token
end
def current_time
return self.class.current_time
end
def self.current_time
return Time.now
end
def self.timestamp_string(time = current_time)
timestamp = time.to_i * 1_000_000 + time.usec
return timestamp.to_s(36)
end
end
end # module PhusionPassenger