class Net::HTTP::Persistent
def connection_for uri
def connection_for uri Thread.current[@connection_key] ||= {} Thread.current[@request_key] ||= Hash.new 0 connections = Thread.current[@connection_key] net_http_args = [uri.host, uri.port] connection_id = net_http_args.join ':' if @proxy_uri then connection_id << @proxy_connection_id net_http_args.concat @proxy_args end unless connection = connections[connection_id] then connections[connection_id] = Net::HTTP.new(*net_http_args) connection = connections[connection_id] ssl connection if uri.scheme == 'https' end unless connection.started? then connection.set_debug_output @debug_output if @debug_output connection.open_timeout = @open_timeout if @open_timeout connection.read_timeout = @read_timeout if @read_timeout connection.start end connection rescue Errno::ECONNREFUSED raise Error, "connection refused: #{connection.address}:#{connection.port}" rescue Errno::EHOSTDOWN raise Error, "host down: #{connection.address}:#{connection.port}" end
def error_message connection
def error_message connection requests = Thread.current[@request_key][connection.object_id] "after #{requests} requests on #{connection.object_id}" end
def escape str
def escape str CGI.escape str if str end
def finish connection
def finish connection Thread.current[@request_key].delete connection.object_id connection.finish rescue IOError end
def http_version uri
def http_version uri @http_versions["#{uri.host}:#{uri.port}"] end
def idempotent? req
def idempotent? req case req when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head, Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then true end end
def initialize name = nil, proxy = nil
def initialize name = nil, proxy = nil @name = name @proxy_uri = case proxy when :ENV then proxy_from_env when URI::HTTP then proxy when nil then # ignore else raise ArgumentError, 'proxy must be :ENV or a URI::HTTP' end if @proxy_uri then @proxy_args = [ @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, ] @proxy_connection_id = [nil, *@proxy_args].join ':' end @debug_output = nil @headers = {} @http_versions = {} @keep_alive = 30 @open_timeout = nil @read_timeout = nil key = ['net_http_persistent', name, 'connections'].compact.join '_' @connection_key = key.intern key = ['net_http_persistent', name, 'requests'].compact.join '_' @request_key = key.intern @certificate = nil @ca_file = nil @private_key = nil @verify_callback = nil @verify_mode = nil end
def normalize_uri uri
def normalize_uri uri (uri =~ /^https?:/) ? uri : "http://#{uri}" end
def proxy_from_env
def proxy_from_env env_proxy = ENV['http_proxy'] || ENV['HTTP_PROXY'] return nil if env_proxy.nil? or env_proxy.empty? uri = URI.parse normalize_uri env_proxy unless uri.user or uri.password then uri.user = escape ENV['http_proxy_user'] || ENV['HTTP_PROXY_USER'] uri.password = escape ENV['http_proxy_pass'] || ENV['HTTP_PROXY_PASS'] end uri end
def request uri, req = nil, &block
def request uri, req = nil, &block retried = false bad_response = false req = Net::HTTP::Get.new uri.request_uri unless req headers.each do |pair| req.add_field(*pair) end if uri.user or uri.password req.basic_auth uri.user, uri.password end req.add_field 'Connection', 'keep-alive' req.add_field 'Keep-Alive', @keep_alive connection = connection_for uri connection_id = connection.object_id begin Thread.current[@request_key][connection_id] += 1 response = connection.request req, &block rescue Net::HTTPBadResponse => e message = error_message connection finish connection raise Error, "too many bad responses #{message}" if bad_response or not idempotent? req bad_response = true retry rescue IOError, EOFError, Timeout::Error, Errno::ECONNABORTED, Errno::ECONNRESET, Errno::EPIPE => e if retried or not idempotent? req due_to = "(due to #{e.message} - #{e.class})" message = error_message connection finish connection raise Error, "too many connection resets #{due_to} #{message}" end reset connection retried = true retry end @http_versions["#{uri.host}:#{uri.port}"] ||= response.http_version response end
def reset connection
def reset connection Thread.current[@request_key].delete connection.object_id finish connection connection.start rescue Errno::ECONNREFUSED raise Error, "connection refused: #{connection.address}:#{connection.port}" rescue Errno::EHOSTDOWN raise Error, "host down: #{connection.address}:#{connection.port}" end
def shutdown thread = Thread.current
def shutdown thread = Thread.current connections = thread[@connection_key] connections.each do |_, connection| begin connection.finish rescue IOError end end if connections thread[@connection_key] = nil thread[@request_key] = nil end
def shutdown_in_all_threads
def shutdown_in_all_threads Thread.list.each do |thread| shutdown thread end nil end
def ssl connection
def ssl connection require 'net/https' connection.use_ssl = true # suppress warning but allow override connection.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_mode if @ca_file then connection.ca_file = @ca_file connection.verify_mode = OpenSSL::SSL::VERIFY_PEER connection.verify_callback = @verify_callback if @verify_callback end if @certificate and @private_key then connection.cert = @certificate connection.key = @private_key end connection.verify_mode = @verify_mode if @verify_mode end