class AWS::Core::Http::CurbHandler
@private
def fill_multi(items)
# Creates one curl easy handle per queued entry and registers each with the
# shared multi handle.
#
# items - Array of argument lists for make_easy_handle; each entry is
#         splatted into that call.
#
# Returns +items+ (the result of Array#each).
def fill_multi(items)
  items.each { |args| @multi.add(make_easy_handle(*args)) }
end
def handle request, response
# Submits +request+ to the background processor and suspends the calling
# thread until it is resumed.
#
# The request, the response and the current thread are appended to the
# shared queue under the mutex, the processor thread is woken, and the
# caller then parks itself with Thread.stop. The completion callback built
# by make_easy_handle resumes the parked thread with Thread#run.
#
# request  - object responding to http_method (and whatever
#            make_easy_handle reads from it).
# response - object that make_easy_handle fills in.
#
# Returns nil.
# Raises RuntimeError when the HTTP method is not GET, HEAD, PUT, POST or
# DELETE; nothing is queued in that case.
#
# NOTE(review): if the transfer completes and Thread#run fires before this
# thread reaches Thread.stop, the wakeup looks like it would be lost --
# confirm whether that can happen in practice.
def handle(request, response)
  unless %w[GET HEAD PUT POST DELETE].include?(request.http_method)
    raise "unsupported http request method: #{request.http_method}"
  end

  @sem.synchronize do
    @q << [request, response, Thread.current]
    @processor.wakeup
  end

  # Park until the completion callback resumes this thread.
  Thread.stop
  nil
end
def initialize
# Prepares the handler's shared state and launches the background
# processor thread.
def initialize
  # Curl multi handle that runs all transfers.
  @multi = Curl::Multi.new
  # Mutex guarding @q.
  @sem = Mutex.new
  # Pending [request, response, thread] entries awaiting the processor.
  @q = []

  # Must come last: the processor thread reads the state set up above.
  start_processor
end
def make_easy_handle request, response, thread = nil
# Builds a Curl::Easy handle that will perform +request+ and copy the
# outcome into +response+.
#
# request  - object read for use_ssl?, host, uri, headers, proxy_uri,
#            http_method and body.
# response - object whose headers hash receives the parsed response
#            headers and whose body= / status= are called on completion.
# thread   - optional Thread resumed with Thread#run once the transfer
#            completes (default: nil, nobody is resumed).
#
# Returns the configured Curl::Easy; it is not performed here.
def make_easy_handle(request, response, thread = nil)
  url =
    if request.use_ssl?
      "https://#{request.host}:443#{request.uri}"
    else
      "http://#{request.host}#{request.uri}"
    end

  curl = Curl::Easy.new(url)
  curl.headers = request.headers

  proxy = request.proxy_uri
  if proxy
    curl.proxy_url = proxy.to_s
    curl.proxy_port = proxy.port
  end

  curl.on_header do |header_data|
    # Split on the first colon; whitespace after it is optional so that
    # "Name:value" is parsed the same way as "Name: value".
    name, value = header_data.strip.split(/:\s*/, 2)
    # The status line and the blank line that ends the header block carry
    # no colon, so they yield no value and must not be stored as headers.
    response.headers[name] = value unless value.nil?
    # The callback reports how much data it consumed, counted in bytes.
    header_data.bytesize
  end

  case request.http_method
  when 'GET'
    # Nothing to configure for GET.
  when 'HEAD'
    curl.head = true
  when 'PUT'
    curl.put_data = request.body
  when 'POST'
    curl.post_body = request.body
  when 'DELETE'
    curl.delete = true
  end

  curl.on_complete do
    response.body = curl.body_str
    response.status = curl.response_code
    # Resume the thread that is waiting on this transfer, if any.
    thread.run if thread
  end

  curl
end
def start_processor
# Spawns the background thread (stored in @processor) that drains the
# request queue and drives the curl multi handle.
#
# Each pass takes everything currently queued while holding the mutex,
# hands it to fill_multi, and runs @multi.perform. The thread then parks
# itself with Thread.stop when the queue is empty.
def start_processor
  @processor = Thread.new do
    loop do
      batch = @sem.synchronize { @q.slice!(0..-1) }

      unless batch.empty?
        fill_multi(batch)
        @multi.perform do
          # curl is idle: pick up more queued items, but only when the lock
          # can be taken without blocking.
          next if @q.empty? || !@sem.try_lock

          begin
            fill_multi(@q.slice!(0..-1))
          ensure
            @sem.unlock
          end
        end
      end

      # Sleep until a new item shows up before going around again.
      Thread.stop if @q.empty?
    end
  end
end