lib/envirobly/aws/s3.rb
# frozen_string_literal: true
require "zlib"
require "json"
require "open3"
require "concurrent"
require "aws-sdk-s3"
class Envirobly::Aws::S3
OBJECTS_PREFIX = "blobs"
MANIFESTS_PREFIX = "manifests"
CONCURRENCY = 6
def initialize(bucket:, region:, credentials: nil)
@region = region
@bucket = bucket
client_options = { region: }
unless credentials.nil?
client_options.merge! credentials.transform_keys(&:to_sym)
end
@client = Aws::S3::Client.new(client_options)
resource = Aws::S3::Resource.new(client: @client)
@bucket_resource = resource.bucket(@bucket)
end
def push(commit)
if object_exists?(manifest_key(commit.object_tree_checksum))
print "Build context is already uploaded"
$stdout.flush
return
end
# puts "Pushing #{commit.object_tree_checksum} to #{@bucket}"
manifest = []
objects_count = 0
objects_to_upload = []
remote_object_hashes = list_object_hashes
commit.object_tree.each do |chdir, objects|
objects.each do |(mode, type, object_hash, path)|
objects_count += 1
path = File.join chdir.delete_prefix(commit.working_dir), path
manifest << [ mode, type, object_hash, path.delete_prefix("/") ]
next if remote_object_hashes.include?(object_hash)
objects_to_upload << [ chdir, object_hash ]
end
end
upload_git_objects(objects_to_upload)
upload_manifest manifest_key(commit.object_tree_checksum), manifest
end
def pull(object_tree_checksum, target_dir)
puts "Pulling #{object_tree_checksum} into #{target_dir}"
manifest = fetch_manifest(object_tree_checksum)
FileUtils.mkdir_p(target_dir)
puts "Downloading #{manifest.size} files"
pool = Concurrent::FixedThreadPool.new(CONCURRENCY)
manifest.each do |(mode, type, object_hash, path)|
pool.post do
target_path = File.join target_dir, path
if mode == Envirobly::Git::Commit::SYMLINK_FILE_MODE
fetch_symlink(object_hash, target_path:)
else
fetch_object(object_hash, target_path:)
if mode == Envirobly::Git::Commit::EXECUTABLE_FILE_MODE
FileUtils.chmod("+x", target_path)
end
end
end
end
pool.shutdown
pool.wait_for_termination
end
private
def list_object_hashes
@client.list_objects({
bucket: @bucket,
prefix: "#{OBJECTS_PREFIX}/"
}).map do |response|
response.contents.map do |object|
object.key.delete_prefix("#{OBJECTS_PREFIX}/").delete_suffix(".gz")
end
end.flatten
end
def object_key(object_hash)
"#{OBJECTS_PREFIX}/#{object_hash}.gz"
end
def manifest_key(object_tree_checksum)
"#{MANIFESTS_PREFIX}/#{object_tree_checksum}.gz"
end
def object_exists?(key)
@client.head_object(bucket: @bucket, key:)
true
rescue Aws::S3::Errors::NotFound
false
end
def compress_and_upload_object(object_hash, chdir:)
key = object_key object_hash
Tempfile.create([ "envirobly-push", ".gz" ]) do |tempfile|
gz = Zlib::GzipWriter.new(tempfile)
Open3.popen3("git", "cat-file", "-p", object_hash, chdir:) do |_, stdout, stderr, thread|
IO.copy_stream(stdout, gz)
unless thread.value.success?
raise "`git cat-file -p #{object_hash}` failed: #{stderr.read}"
end
ensure
gz.close
end
@client.put_object(bucket: @bucket, body: tempfile, key:)
# puts "⤴ #{key}"
end
end
def upload_git_objects(objects)
pool = Concurrent::FixedThreadPool.new(CONCURRENCY)
uploaded = Concurrent::AtomicFixnum.new
objects_count = objects.count
notifier = Thread.new do
next unless objects_count > 0
# Hide cursor
# print "\e[?25l"
# $stdout.flush
loop do
value = uploaded.value
print "\rUploading build context files: #{value}/#{objects_count}"
$stdout.flush
sleep 0.5
break if value >= objects_count
end
# Show cursor again
# print "\e[?25h\n"
end
objects.each do |(chdir, object_hash)|
pool.post do
compress_and_upload_object(object_hash, chdir:)
uploaded.increment
end
end
pool.shutdown
pool.wait_for_termination
notifier.join
end
def upload_manifest(key, content)
Tempfile.create([ "envirobly-push", ".gz" ]) do |tempfile|
gz = Zlib::GzipWriter.new(tempfile)
gz.write JSON.dump(content)
gz.close
@client.put_object(bucket: @bucket, body: tempfile, key:)
# puts "⤴ #{key}"
end
end
def fetch_manifest(ref)
stream = @bucket_resource.object(manifest_key(ref)).get.body
JSON.parse Zlib::GzipReader.new(stream).read
rescue Aws::S3::Errors::NoSuchKey
puts "Commit #{ref} doesn't exist at s3://#{@bucket}"
exit 1
end
def fetch_object(object_hash, target_path:)
FileUtils.mkdir_p File.dirname(target_path)
key = object_key object_hash
stream = @bucket_resource.object(key).get.body
File.open(target_path, "wb") do |target|
gz = Zlib::GzipReader.new(stream)
IO.copy_stream(gz, target)
gz.close
end
end
def fetch_symlink(object_hash, target_path:)
FileUtils.mkdir_p File.dirname(target_path)
key = object_key object_hash
gz = Zlib::GzipReader.new @bucket_resource.object(key).get.body
symlink_to = gz.read
gz.close
FileUtils.ln_s symlink_to, target_path
end
def format_duration(duration)
total_seconds = duration.to_i
minutes = (total_seconds / 60).floor
seconds = (total_seconds % 60).ceil
result = [ "#{seconds}s" ]
result.prepend "#{minutes}m" if minutes > 0
result.join " "
end
end