# frozen_string_literal: true

require 'thread'
require 'cgi'

module Aws
  module S3
    # Copies an object by starting a multipart upload on the target and
    # filling it with concurrent `upload_part_copy` requests, one per byte
    # range of the source object.
    #
    # @api private
    class ObjectMultipartCopier

      FIVE_MB = 5 * 1024 * 1024 # 5MB

      FILE_TOO_SMALL = "unable to multipart copy files smaller than 5MB"

      MAX_PARTS = 10_000

      # NOTE: `:use_source_parts`, `:thread_count` and `:min_part_size` are
      # removed from the given hash (the hash is mutated in place).
      #
      # @option options [Client] :client
      # @option options [Integer] :min_part_size (52428800)
      #   Size of copied parts. Defaults to 50MB.
      # @option options [Integer] :thread_count (10) Number of concurrent
      #   threads to use for copying parts.
      # @option options [Boolean] :use_source_parts (false) Use part sizes
      #   defined on the source object if any exist. If copying or moving an
      #   object that is already multipart, this does not re-part the object,
      #   instead re-using the part definitions on the original. That means
      #   the etag and any checksums will not change. This is especially
      #   useful if the source object has parts with varied sizes.
      def initialize(options = {})
        @use_source_parts = options.delete(:use_source_parts) || false
        @thread_count = options.delete(:thread_count) || 10
        @min_part_size = options.delete(:min_part_size) || (FIVE_MB * 10)
        @client = options[:client] || Client.new
        @source_has_parts = nil
        @source_parts_checked = false
      end

      # @return [Client]
      attr_reader :client

      # Performs the multipart copy. The upload is aborted and the error is
      # re-raised if copying or completing fails. The `:upload_id` of the
      # created upload is written into the given `options` hash.
      #
      # @option (see S3::Client#copy_object)
      # @raise [ArgumentError] if the source object is smaller than 5MB.
      def copy(options = {})
        # Forget any part information remembered from a previous copy so a
        # reused copier never applies it to a different source object.
        @source_parts_checked = false
        metadata = source_metadata(options)
        size = metadata[:content_length]
        # Validate the size before creating the upload, so an undersized
        # source fails without a create/abort round trip.
        part_size = default_part_size(size)
        options[:upload_id] = initiate_upload(metadata.merge(options))
        begin
          parts = copy_parts(size, part_size, options)
          complete_upload(parts, options)
        rescue => error
          abort_upload(options)
          raise error
        end
      end

      private

      # Starts the multipart upload and returns its upload id.
      def initiate_upload(options)
        options = options_for(:create_multipart_upload, options)
        @client.create_multipart_upload(options).upload_id
      end

      # Copies every part using `@thread_count` worker threads and returns
      # the completed part hashes ordered by part number.
      def copy_parts(size, default_part_size, options)
        queue = PartQueue.new(compute_parts(size, default_part_size, options))
        threads = @thread_count.times.map { copy_part_thread(queue) }
        # Thread#value joins the thread and re-raises any error it raised.
        threads.flat_map(&:value).sort_by { |part| part[:part_number] }
      end

      # Spawns a worker that drains the queue. Its value is the list of
      # parts it copied.
      def copy_part_thread(queue)
        Thread.new do
          begin
            completed = []
            while (part = queue.shift)
              completed << copy_part(part)
            end
            completed
          rescue => error
            # Empty the queue so the remaining workers stop picking up parts.
            queue.clear!
            raise error
          end
        end
      end

      # Copies a single part and returns the hash later handed to
      # `complete_multipart_upload` (copy result plus the part number,
      # without `:last_modified`).
      def copy_part(part)
        result = @client.upload_part_copy(part).copy_part_result.to_h
        result = result.merge({ part_number: part[:part_number] })
        result.delete(:last_modified)
        result
      end

      def complete_upload(parts, options)
        options = options_for(:complete_multipart_upload, options)
        options[:multipart_upload] = { parts: parts }
        @client.complete_multipart_upload(options)
      end

      def abort_upload(options)
        @client.abort_multipart_upload({
          bucket: options[:bucket],
          key: options[:key],
          upload_id: options[:upload_id],
        })
      end

      # Builds one `upload_part_copy` request hash per part, covering the
      # byte ranges from 0 up to `size - 1`.
      def compute_parts(size, default_part_size, options)
        part_options = options_for(:upload_part_copy, options)
        lookup_options = source_lookup_options(part_options, options)
        part_number = 1
        offset = 0
        parts = []
        while offset < size
          part_size = calculate_part_size(part_number, default_part_size, lookup_options)
          parts << part_options.merge({
            part_number: part_number,
            copy_source_range: byte_range(offset, part_size, size),
          })
          part_number += 1
          offset += part_size
        end
        parts
      end

      # Options used for per-part lookups of the source object. Starts from
      # the filtered request options (so a caller supplied `:content_length`
      # does not short-circuit {#source_metadata}) and carries over
      # `:copy_source_client`, so the source is inspected with the same
      # client that was used for the initial metadata request.
      def source_lookup_options(part_options, options)
        source_client = options[:copy_source_client]
        return part_options unless source_client

        part_options.merge({ copy_source_client: source_client })
      end

      # Returns the `copy_source_range` value for the part starting at
      # `offset`; the final range is clamped to the last byte of the source.
      def byte_range(offset, part_size, size)
        if offset + part_size < size
          "bytes=#{offset}-#{offset + part_size - 1}"
        else
          "bytes=#{offset}-#{size - 1}"
        end
      end

      # Size of the given part: the `:content_length` reported for that part
      # of the source when source parts are in use, otherwise the default.
      def calculate_part_size(part_number, default_part_size, options)
        if @use_source_parts && source_has_parts(options)
          source_metadata(options.merge({ part_number: part_number }))[:content_length]
        else
          default_part_size
        end
      end

      # Returns the `:parts_count` reported for part 1 of the source (nil if
      # none is reported). The answer is looked up once per copy; a nil
      # answer is remembered as well, so a source without parts does not
      # trigger a metadata request for every computed part.
      def source_has_parts(options)
        unless @source_parts_checked
          @source_has_parts = source_metadata(options.merge({ part_number: 1 }))[:parts_count]
          @source_parts_checked = true
        end
        @source_has_parts
      end

      # Returns metadata for the source object. If `:content_length` is
      # given, only that entry is returned and no request is made. Otherwise
      # `:copy_source` is split into bucket, key and optional version id and
      # `head_object` is called with `:copy_source_client` (falling back to
      # the copier's client). Server-side-encryption entries are removed
      # from the result.
      def source_metadata(options)
        return options.slice(:content_length) if options[:content_length]

        client = options[:copy_source_client] || @client

        if (vid_match = options[:copy_source].match(/([^\/]+?)\/(.+)\?versionId=(.+)/))
          bucket, key, version_id = vid_match[1, 3]
        else
          bucket, key = options[:copy_source].match(/([^\/]+?)\/(.+)/)[1, 2]
        end

        head_opts = { bucket: bucket, key: CGI.unescape(key) }
        head_opts[:version_id] = version_id if version_id
        head_opts[:part_number] = options[:part_number] if options[:part_number]

        head = client.head_object(head_opts).to_h
        head.delete(:server_side_encryption)
        head.delete(:ssekms_key_id)
        head
      end

      # Part size to use when source parts are not reused: the configured
      # minimum, raised when needed so that the copy takes at most
      # MAX_PARTS parts.
      #
      # @raise [ArgumentError] if `source_size` is below 5MB.
      def default_part_size(source_size)
        raise ArgumentError, FILE_TOO_SMALL if source_size < FIVE_MB

        [(source_size.to_f / MAX_PARTS).ceil, @min_part_size].max.to_i
      end

      # Returns the subset of `options` whose keys are request members of
      # the given operation (see API_OPTIONS).
      def options_for(operation_name, options)
        API_OPTIONS[operation_name].each_with_object({}) do |opt_name, hash|
          hash[opt_name] = options[opt_name] if options.key?(opt_name)
        end
      end

      # Member names of the given API shape. Being a singleton method, it is
      # not affected by the `private` marker above.
      #
      # @api private
      def self.options_for(shape_name)
        Client.api.metadata['shapes'][shape_name].member_names
      end

      # Request member names accepted by each operation used by the copier.
      API_OPTIONS = {
        create_multipart_upload: Types::CreateMultipartUploadRequest.members,
        upload_part_copy: Types::UploadPartCopyRequest.members,
        complete_multipart_upload: Types::CompleteMultipartUploadRequest.members,
      }

      # Mutex-guarded list of pending parts shared by the worker threads.
      class PartQueue

        def initialize(parts = [])
          @parts = parts
          @mutex = Mutex.new
        end

        # Removes and returns the next pending part, or nil when empty.
        def shift
          @mutex.synchronize { @parts.shift }
        end

        # Discards all pending parts.
        def clear!
          @mutex.synchronize { @parts.clear }
        end

      end
    end
  end
end