# frozen_string_literal: true

require 'pathname'
require 'thread'
require 'set'
require 'tmpdir'

module Aws
  module S3
    # Downloads one S3 object to a local path, either with a single
    # GetObject request or with several concurrent GetObject requests
    # (by byte range or by part number) whose bodies are written into the
    # destination file at their respective offsets.
    #
    # @api private
    class FileDownloader
      # Objects at or below this size (5 MiB) are fetched with a single
      # request in 'auto' mode; it is also the smallest chunk size that
      # {#compute_chunk} picks when no :chunk_size was supplied.
      MIN_CHUNK_SIZE = 5 * 1024 * 1024

      # Divisor used by {#compute_chunk} so that an automatically chosen
      # chunk size never splits an object into more than this many ranges.
      MAX_PARTS = 10_000

      # Number of worker threads used when :thread_count is not supplied.
      THREAD_COUNT = 10

      # @return [Client]
      attr_reader :client

      # @param options [Hash]
      # @option options [Client] :client client used for every request;
      #   a new {Client} is built when omitted.
      def initialize(options = {})
        @client = options[:client] || Client.new
      end

      # Downloads the object identified by options[:bucket] / options[:key]
      # to +destination+.
      #
      # @param destination [String] local path the object is written to.
      # @param options [Hash]
      # @option options [String, Symbol] :mode ('auto') one of 'auto',
      #   'single_request' or 'get_range'. Symbols are accepted as well,
      #   matching the values named in the error message below.
      # @option options [Integer] :thread_count (THREAD_COUNT)
      # @option options [Integer] :chunk_size size of each byte range;
      #   required in 'get_range' mode.
      # @option options [String] :bucket
      # @option options [String] :key
      # @option options [String] :version_id only sent when truthy.
      # @option options [#call] :on_checksum_validated invoked with
      #   (resp.checksum_validated, resp) for each response whose
      #   checksum_validated is truthy.
      # @option options [#call] :progress_callback invoked with
      #   (bytes_received_per_part, part_sizes, total_size).
      # @raise [ArgumentError] if :on_checksum_validated is not callable,
      #   if :mode is not recognised, or if 'get_range' mode is requested
      #   without :chunk_size.
      # @return whatever Aws::Plugins::UserAgent.metric returns for the
      #   block (presumably the selected strategy's result - confirm
      #   against that helper).
      def download(destination, options = {})
        @path = destination
        # Normalise so that :auto and 'auto' select the same branch.
        @mode = (options[:mode] || 'auto').to_s
        @thread_count = options[:thread_count] || THREAD_COUNT
        @chunk_size = options[:chunk_size]
        @params = {
          bucket: options[:bucket],
          key: options[:key]
        }
        @params[:version_id] = options[:version_id] if options[:version_id]
        @on_checksum_validated = options[:on_checksum_validated]
        @progress_callback = options[:progress_callback]

        validate!

        Aws::Plugins::UserAgent.metric('S3_TRANSFER') do
          case @mode
          when 'auto' then multipart_download
          when 'single_request' then single_request
          when 'get_range'
            unless @chunk_size
              msg = 'In :get_range mode, :chunk_size must be provided'
              raise ArgumentError, msg
            end

            resp = @client.head_object(@params)
            multithreaded_get_by_ranges(resp.content_length)
          else
            msg = "Invalid mode #{@mode} provided, "\
                  'mode should be :single_request, :get_range or :auto'
            raise ArgumentError, msg
          end
        end
      end

      private

      # @raise [ArgumentError] if an :on_checksum_validated option was
      #   given but does not respond to #call.
      def validate!
        return unless @on_checksum_validated
        return if @on_checksum_validated.respond_to?(:call)

        raise ArgumentError, 'on_checksum_validated must be callable'
      end

      # 'auto' mode: inspects the object with HeadObject and picks between
      # a single request, ranged downloads and per-part downloads.
      def multipart_download
        resp = @client.head_object(@params.merge(part_number: 1))
        count = resp.parts_count

        if count.nil? || count <= 1
          # No usable part count: decide on the reported length alone.
          if resp.content_length <= MIN_CHUNK_SIZE
            single_request
          else
            multithreaded_get_by_ranges(resp.content_length)
          end
        else
          # partNumber is an option of the first HEAD request, so repeat
          # the request without it and use that response's content_length
          # (presumably the part-scoped response reports only the length
          # of part 1 - confirm against the S3 API).
          resp = @client.head_object(@params)
          if resp.content_length <= MIN_CHUNK_SIZE
            single_request
          else
            compute_mode(resp.content_length, count)
          end
        end
      end

      # Chooses ranged downloads when the chunk size is smaller than the
      # average part size, and per-part downloads otherwise.
      #
      # @param file_size [Integer] total object size in bytes.
      # @param count [Integer] number of parts reported for the object.
      def compute_mode(file_size, count)
        chunk_size = compute_chunk(file_size)
        part_size = (file_size.to_f / count.to_f).ceil

        if chunk_size < part_size
          multithreaded_get_by_ranges(file_size)
        else
          multithreaded_get_by_parts(count, file_size)
        end
      end

      # Builds the list of "bytes=first-last" range strings covering
      # +file_size+ bytes. Not referenced by any other method in this file.
      #
      # @param file_size [Integer]
      # @return [Array<String>]
      def construct_chunks(file_size)
        offset = 0
        default_chunk_size = compute_chunk(file_size)
        chunks = []
        while offset < file_size
          progress = offset + default_chunk_size
          progress = file_size if progress > file_size
          chunks << "bytes=#{offset}-#{progress - 1}"
          offset = progress
        end
        chunks
      end

      # Returns the caller-supplied :chunk_size when there is one, otherwise
      # the larger of MIN_CHUNK_SIZE and file_size / MAX_PARTS (rounded up).
      #
      # @param file_size [Integer]
      # @return [Integer]
      # @raise [ArgumentError] if :chunk_size is larger than +file_size+.
      def compute_chunk(file_size)
        if @chunk_size && @chunk_size > file_size
          raise ArgumentError, ":chunk_size shouldn't exceed total file size."
        end

        @chunk_size || [
          (file_size.to_f / MAX_PARTS).ceil, MIN_CHUNK_SIZE
        ].max.to_i
      end

      # Groups +chunks+ into slices of at most @thread_count elements. In
      # 'part_number' mode +chunks+ is a count and is expanded to 1..count
      # first. Not referenced by any other method in this file.
      #
      # @return [Array<Array>]
      def batches(chunks, mode)
        chunks = (1..chunks) if mode.eql? 'part_number'
        chunks.each_slice(@thread_count).to_a
      end

      # Splits the object into consecutive byte ranges of the computed
      # chunk size (the last one may be shorter) and downloads them
      # concurrently.
      #
      # @param file_size [Integer] total object size in bytes.
      def multithreaded_get_by_ranges(file_size)
        offset = 0
        default_chunk_size = compute_chunk(file_size)
        chunks = []
        part_number = 1 # parts start at 1
        while offset < file_size
          progress = offset + default_chunk_size
          progress = file_size if progress > file_size
          range = "bytes=#{offset}-#{progress - 1}"
          chunks << Part.new(
            part_number: part_number,
            size: (progress - offset),
            params: @params.merge(range: range)
          )
          part_number += 1
          offset = progress
        end
        download_in_threads(PartList.new(chunks), file_size)
      end

      # Downloads parts 1..n_parts concurrently, one GetObject request per
      # part number. Part sizes are left unset here; MultipartProgress
      # fills them in from the first progress notification of each part.
      #
      # @param n_parts [Integer]
      # @param total_size [Integer] total object size in bytes.
      def multithreaded_get_by_parts(n_parts, total_size)
        parts = (1..n_parts).map do |part|
          Part.new(part_number: part, params: @params.merge(part_number: part))
        end
        download_in_threads(PartList.new(parts), total_size)
      end

      # Starts @thread_count worker threads that take parts off +pending+
      # until it is empty, fetching each part and writing it to @path.
      #
      # @param pending [PartList] parts still to be downloaded.
      # @param total_size [Integer] total object size in bytes; only used
      #   for progress reporting.
      # @return [Array] the non-nil thread values (each worker returns nil
      #   on success). Thread#value re-raises an error raised by a worker.
      def download_in_threads(pending, total_size)
        progress = nil
        if @progress_callback
          progress = MultipartProgress.new(pending, total_size, @progress_callback)
        end

        threads = @thread_count.times.map do
          Thread.new do
            begin
              while (part = pending.shift)
                if progress
                  part.params[:on_chunk_received] =
                    proc do |_chunk, bytes, total|
                      progress.call(part.part_number, bytes, total)
                    end
                end
                resp = @client.get_object(part.params)
                write(resp)
                if @on_checksum_validated && resp.checksum_validated
                  @on_checksum_validated.call(resp.checksum_validated, resp)
                end
              end
              nil
            rescue StandardError
              # keep other threads from downloading other parts
              pending.clear!
              raise
            end
          end
        end

        threads.map(&:value).compact
      end

      # Writes a response body into @path at the starting byte offset taken
      # from the response's Content-Range value
      # (e.g. "bytes 0-499/1234" -> offset 0).
      #
      # @param resp a GetObject response exposing #content_range and #body.
      def write(resp)
        range, = resp.content_range.split(' ').last.split('/')
        head, = range.split('-').map(&:to_i)
        # Binary mode: the payload must land on disk byte-for-byte, with no
        # newline translation or transcoding. Because an offset is passed,
        # the file is not truncated, so parts written by other threads are
        # preserved.
        File.binwrite(@path, resp.body.read, head)
      end

      # Fetches the whole object with one GetObject request, letting the
      # client write straight to @path through :response_target.
      #
      # @return the GetObject response.
      def single_request
        params = @params.merge(response_target: @path)
        params[:on_chunk_received] = single_part_progress if @progress_callback
        resp = @client.get_object(params)

        if @on_checksum_validated && resp.checksum_validated
          @on_checksum_validated.call(resp.checksum_validated, resp)
        end

        resp
      end

      # Adapts @progress_callback to the :on_chunk_received signature for a
      # single-request download, reporting the object as one part.
      #
      # @return [Proc]
      def single_part_progress
        proc do |_chunk, bytes_read, total_size|
          @progress_callback.call([bytes_read], [total_size], total_size)
        end
      end

      # One unit of work: its 1-based part number, its size in bytes (nil
      # when not known up front) and the GetObject params used to fetch it.
      # Instances are built from a hash of member values, which assumes
      # Aws::Structure supplies a hash-accepting initializer - confirm
      # against that module.
      class Part < Struct.new(:part_number, :size, :params)
        include Aws::Structure
      end

      # List of parts whose operations each run inside a shared Mutex, so
      # worker threads can take parts from it concurrently.
      #
      # @api private
      class PartList
        include Enumerable

        # @param parts [Array<Part>]
        def initialize(parts = [])
          @parts = parts
          @mutex = Mutex.new
        end

        # Removes and returns the first part, or nil when none are left.
        def shift
          @mutex.synchronize { @parts.shift }
        end

        # @return [Integer] number of parts remaining.
        def size
          @mutex.synchronize { @parts.size }
        end

        # Drops every remaining part.
        def clear!
          @mutex.synchronize { @parts.clear }
        end

        # Yields each remaining part while holding the mutex.
        def each(&block)
          @mutex.synchronize { @parts.each(&block) }
        end
      end

      # Collects per-part progress and forwards an aggregate view to the
      # user-supplied progress callback.
      #
      # @api private
      class MultipartProgress
        # @param parts [PartList] the parts about to be downloaded.
        # @param total_size [Integer] total object size in bytes.
        # @param progress_callback [#call]
        def initialize(parts, total_size, progress_callback)
          @bytes_received = Array.new(parts.size, 0)
          @part_sizes = parts.map(&:size)
          @total_size = total_size
          @progress_callback = progress_callback
        end

        # Records the bytes received so far for one part and notifies the
        # callback with (bytes_received_per_part, part_sizes, total_size).
        #
        # @param part_number [Integer] 1-based part number.
        # @param bytes_received [Integer] bytes received so far for the part.
        # @param total [Integer] size reported for the part.
        def call(part_number, bytes_received, total)
          # part numbers start at 1
          @bytes_received[part_number - 1] = bytes_received
          # part size may not be known until we get the first response
          @part_sizes[part_number - 1] ||= total
          @progress_callback.call(@bytes_received, @part_sizes, @total_size)
        end
      end
    end
  end
end