class Cloudinary::Migrator
# Starts the migrator's worker pool.
#
# Spawns @threads background threads that each drain the shared `work`
# queue. For every row a worker either uploads the row's `url`, or calls
# the user-supplied `retrieve` proc to obtain the payload (an ActiveRecord
# model, a CarrierWave attachment, an IO object, a URL string, or raw
# data), then pushes a JSON-encoded outcome hash onto `results`.
# Idempotent: a second call while already started is a no-op.
#
# NOTE(review): reconstructed from a whitespace-mangled source line — the
# original had `def start` duplicated and its closing `end` swallowed by
# the trailing comment; both are fixed here. Logic and strings unchanged.
def start
  return if @started
  @started = true
  @terminate = false
  self.work.clear
  main = self
  # A crashed worker should abort the whole process rather than stall the queue.
  Thread.abort_on_exception = true
  1.upto(@threads) do |i|
    Thread.start do
      until main.terminate
        file = nil
        row = main.work.pop
        next if row.nil?
        begin
          debug "Thread #{i} - processing row #{row.inspect}. #{main.work.length} work waiting. #{main.results.length} results waiting."
          url = row["url"]
          cw = false
          result = nil
          # No direct URL: ask the user-supplied retrieve proc for the payload.
          if url.nil? && !self.retrieve.nil?
            data = self.retrieve.call(row["internal_id"])
            if defined?(ActiveRecord::Base) && data.is_a?(ActiveRecord::Base)
              cw = true
              data.save!
            elsif defined?(::CarrierWave) && defined?(Cloudinary::CarrierWave) && data.is_a?(Cloudinary::CarrierWave)
              cw = true
              begin
                data.model.save!
              rescue Cloudinary::CarrierWave::UploadError
                # upload errors will be handled by the result values.
              end
              result = data.metadata
            elsif data.respond_to?(:read) && data.respond_to?(:path)
              # This is an IO style object, pass as is.
              file = data
            elsif data.nil?
              # Skip
            elsif data.match(/^https?:/)
              url = data
            else
              # Raw data - spool it to a temp file so the uploader can read it.
              file = main.temporary_file(data, row["public_id"] || "cloudinaryfile")
            end
          end
          if url || file
            options = main.extra_options.merge(:public_id=>row["public_id"])
            json_decode(row["metadata"]).each do |key, value|
              options[key.to_sym] = value
            end
            # :return_error makes the uploader report failures as data instead of raising.
            result = Cloudinary::Uploader.upload(url || file, options.merge(:return_error=>true)) || ({:error=>{:message=>"Received nil from uploader!"}})
          elsif cw
            # CarrierWave/ActiveRecord save path: success unless metadata said otherwise.
            result ||= {"status" => "saved"}
          else
            result = {"error" => {"message" => "Empty data and url", "http_code"=>404}}
          end
          main.results << {"id"=>row["id"], "internal_id"=>row["internal_id"], "result"=>result.to_json}
        rescue => e
          # Best-effort: log and back off briefly; the row is not re-queued here.
          $stderr.print "Thread #{i} - Error in processing row #{row.inspect} - #{e}\n"
          debug(e.backtrace.join("\n"))
          sleep 1
        ensure
          main.mutex.synchronize { main.in_process -= 1 }
          main.close_if_needed(file)
        end
      end
    end
  end
  retry_previous_queue # Retry all work from previous iteration before we start processing this one.
end