lib/net/http/connection_pool.rb
# Copyright 2011-2012 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. require 'net/http/connection_pool/session' require 'net/http/connection_pool/connection' require 'thread' require 'logger' # A wrapper around Net::HTTP that provides a manged pool of persistant HTTP # connections. # # pool = Net::HTTP::ConnectionPool.new # connection = pool.connection_for('domain.com') # connection.request(Net::HTTP::Get.new('/')) do |resp| # # Connection#request yields Net::HTTPResponse objects # puts resp.body # end # # @private class Net::HTTP::ConnectionPool # @param [Hash] options # # @option options [Numeric] :http_idle_timeout (60) The number of seconds a # connection is allowed to sit idle before it is closed and removed # from the pool. # # @option options [Numeric] :http_open_timeout (15) The number of seconds to # wait when opening a http session before raising a timeout exception. # # @option options [Boolean] :http_wire_trace (false) When +true+, HTTP # debug output will be sent to the +:logger+. # # @option options [Logger] :logger (Logger.new($stdout)) Where debug out # is sent (wire traces). # def initialize options = {} @pool_mutex = Mutex.new @pool = [] @open_timeout = options[:http_open_timeout] || 15 @idle_timeout = options[:http_idle_timeout] || 60 @http_wire_trace = !!options[:http_wire_trace] if logger = options[:logger] @logger = logger elsif http_wire_trace? @logger = Logger.new($stdout) end end # @return [Integer] attr_reader :idle_timeout # @return [Integer] attr_accessor :open_timeout # @return [Boolean] Returns +true+ when HTTP debug output (wire traces) # will be logged. attr_reader :http_wire_trace alias_method :http_wire_trace?, :http_wire_trace alias_method :log_wire_trace?, :http_wire_trace alias_method :log_wire_trace, :http_wire_trace # @return [Logger] Where debug output is sent. attr_reader :logger # Requests a connection object from the connection pool. # # connection = pool.connection_for('domain.com') # connection.request(Net::HTTP::Get.new('/index.html')) {|resp|} # connection.request(Net::HTTP::Get.new('/about.html')) {|resp|} # # # same thing in block form # pool.connection_for('domain.com') do |connection| # connection.request(Net::HTTP::Get.new('/index.html')) {|resp|} # connection.request(Net::HTTP::Get.new('/about.html')) {|resp|} # end # # Because the pool manages HTTP sessions you do not have to # worry about closing a connection or returning a connection # to the pool. # # @param [String] host # # @param [Hash] options # # @option options [Integer] :port Which port the connection should use. # Defaults to 80, unless +:ssl+ is +true+, then it defaults to 443. # # @option options [Boolean] :ssl If the connection should be made over # SSL. Defaults to +false+, unless +:port+ is 443, then it defaults # to +true+. # # @option options [Boolean] :ssl_verify_peer (true) If true, ssl # connections should verify peer certificates. This should only ever be # set false false for debugging purposes. # # @option options [String] :ssl_ca_file Full path to the SSL certificate # authority bundle file that should be used when verifying peer # certificates. If you do not pass +:ssl_ca_file+ or +:ssl_ca_path+ # the the system default will be used if available. # # @option options [String] :ssl_ca_path Full path of the directory that # contains the unbundled SSL certificate authority files for verifying # peer certificates. If you do not pass +:ssl_ca_file+ or +:ssl_ca_path+ # the the system default will be used if available. # # @option options [URI::HTTP,String] :proxy_uri (nil) A URI string or # URI::HTTP object to use as a proxy. You should not provide # +:proxy_uri+ with any other proxy options. # # :proxy_uri => 'http://user:pass@host.com:80' # # @option options [String] :proxy_address # # @option options [String] :proxy_port # # @option options [String] :proxy_user # # @option options [String] :proxy_password # # @yield [connection] # # @yieldparam [optional,Connection] connection # # @return [Connection] # def connection_for host, options = {}, &block connection = Connection.new(self, host, options) yield(connection) if block_given? connection end # Returns the number of sessions currently in the pool, not counting those # currently in use. def size @pool_mutex.synchronize { @pool.size } end # Removes stale http sessions from the pool (that have exceeded # the idle timeout). def clean! @pool_mutex.synchronize { _clean } end # Closes and removes removes all sessions from the pool. # If empty! is called while there are outstanding requests they may # get checked back into the pool, leaving the pool in a non-empty state. def empty! @pool_mutex.synchronize do @pool.each(&:finish) @pool = [] end end # Makes a single HTTP request. See {Connection#request} for more information # on making an HTTP request. # @return [nil] # @private def request connection, *args, &block session_for(connection) do |session| session.read_timeout = connection.read_timeout session.request(*args, &block) end end protected # Yields an open http session for the given connection. def session_for connection, &block session = nil # search the pool for an idle session that can be used @pool_mutex.synchronize do _clean # removes stale sessions session = @pool.find{|idle_session| idle_session.key == connection.key } @pool.delete(session) if session end begin # opens a new HTTP session if no suitable idle session was found session = _create_session(connection) unless session yield(session) rescue Exception => error session.finish if session raise error else # only check the session back into the pool if no errors were raised @pool_mutex.synchronize { @pool << session } end nil end def _create_session connection Session.start(connection, :open_timeout => open_timeout, :debug_logger => log_wire_trace? ? logger : nil) end def _clean now = Time.now @pool.delete_if do |idle_session| if idle_session.last_used_at.nil? or now - idle_session.last_used_at > idle_timeout then idle_session.finish true end end end end