lib/async/http/pool.rb
# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

module Async
  module HTTP
    # Pool behaviours
    #
    # - Single request per connection (HTTP/1 without keep-alive)
    # - Multiple sequential requests per connection (HTTP1 with keep-alive)
    # - Multiplex requests per connection (HTTP2)
    #
    # In general we don't know the policy until connection is established.
    #
    # Unless a limit is given, this pool doesn't impose a maximum number of open
    # resources, but it WILL block if there are no available resources and
    # trying to allocate another one fails.
    #
    # Resources must respond to
    # #multiplex -> 1 or more (how many concurrent acquisitions are allowed).
    # #reusable? -> can be used again.
    # #good? -> still usable right now (checked before handing it out).
    # #close -> release the underlying resource.
    #
    class Pool
      # @param limit [Integer, nil] maximum number of resources tracked at once;
      #   a falsy value means no limit.
      # @yield called with no arguments to construct a new resource.
      def initialize(limit = nil, &block)
        @available = {} # resource => number of outstanding acquisitions
        @waiting = [] # fibers parked in #wait_for_next_available, oldest first
        @limit = limit
        @constructor = block
      end

      # Acquire a resource, waiting until one can be obtained.
      #
      # Without a block the resource is returned and the caller must hand it
      # back with {#release}. With a block the resource is yielded and always
      # released afterwards, and the block's value is returned.
      def acquire
        resource = wait_for_next_available

        return resource unless block_given?

        begin
          yield resource
        ensure
          release(resource)
        end
      end

      # Make the resource available and let waiting tasks know that there is something available.
      def release(resource)
        # A resource that is not good should also not be reusable.
        if resource.reusable?
          reuse(resource)
        else
          retire(resource)

          # Retiring frees a slot under @limit, so a parked fiber can now
          # allocate a replacement. Without this wake-up it would wait forever.
          wake_next_waiter
        end
      end

      # Close every tracked resource and forget about all of them.
      def close
        @available.each_key(&:close)
        @available.clear
      end

      protected

      # Return one acquisition of the resource to the pool and wake a waiter.
      def reuse(resource)
        Async.logger.debug(self) {"Reuse #{resource}"}

        # The resource may no longer be tracked (the pool was closed, or it was
        # retired while still in use); there is then no count left to adjust.
        @available[resource] -= 1 if @available.key?(resource)

        wake_next_waiter
      end

      # Stop tracking the resource and close it.
      def retire(resource)
        Async.logger.debug(self) {"Retire #{resource}"}

        @available.delete(resource)

        resource.close
      end

      # Resume the longest-waiting fiber, if any, so waiters are served in
      # arrival order rather than newest-first.
      def wake_next_waiter
        waiter = @waiting.shift
        waiter.resume if waiter
      end

      # Loop until a resource can be obtained, yielding control between attempts.
      def wait_for_next_available
        until (resource = next_available)
          fiber = Fiber.current
          @waiting << fiber

          begin
            Task.yield
          ensure
            # If we were woken by the pool this is a no-op. If we leave because
            # an exception was raised into this fiber, make sure the pool never
            # tries to resume it later.
            @waiting.delete(fiber)
          end
        end

        resource
      end

      # Build a new resource using the constructor block and track it as
      # acquired once.
      #
      # @return the new resource, or nil if construction failed.
      def create
        begin
          # This might fail, which is okay :)
          resource = @constructor.call
        rescue => error
          Async.logger.error "#{error}: #{error.backtrace}"
          return nil
        end

        # A constructor that yields nothing is treated like a failed allocation
        # instead of tracking nil as if it were a resource.
        return nil unless resource

        @available[resource] = 1

        resource
      end

      # Find a tracked resource with spare capacity, or allocate a new one if
      # the limit allows.
      #
      # @return a resource, or nil when nothing is available right now.
      def next_available
        # Iterate over a snapshot because bad resources are retired (removed
        # from @available) as we go.
        @available.to_a.each do |resource, count|
          next unless count < resource.multiplex

          # We want to use this resource... but is it good?
          if resource.good?
            @available[resource] += 1
            return resource
          end

          retire(resource)
        end

        if !@limit || @available.size < @limit
          Async.logger.debug(self) {"No available resources, allocating new one..."}
          return create
        end

        nil
      end
    end
  end
end