class Multiwoven::Integrations::Destination::AmazonS3::Client

def build_discover_prefix(connection_config)

def build_discover_prefix(connection_config)
  file_path = connection_config[:file_path].to_s.strip
  file_path = "#{file_path}/" if file_path.present? && file_path[-1] != "/"
  format_type = connection_config[:format_type].to_s.downcase
  "#{file_path}#{connection_config[:file_name]}.#{format_type}"
end

def check_connection(connection_config)

def check_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  conn = create_connection(connection_config)
  conn.head_bucket(bucket: connection_config[:bucket_name])
  ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
rescue StandardError => e
  ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
end

def create_connection(connection_config)

def create_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  s3_options = {
    region: connection_config[:region],
    access_key_id: connection_config[:access_key_id],
    secret_access_key: connection_config[:secret_access_key]
  }
  endpoint = connection_config[:endpoint].to_s.strip
  if endpoint.present?
    s3_options[:endpoint] = endpoint
    # Path style is required for MinIO/S3-compatible endpoints. Accept both boolean and string
    # (e.g. from JSON/API) so it works in all environments.
    s3_options[:force_path_style] = path_style_enabled?(connection_config)
  end
  Aws::S3::Client.new(**s3_options)
end

def create_streams(tables)

def create_streams(tables)
  tables.values.map do |r|
    Multiwoven::Integrations::Protocol::Stream.new(
      name: r[:tablename],
      action: StreamAction["create"],
      json_schema: convert_to_json_schema(r[:columns]),
      batch_support: true,
      batch_size: 100_000
    )
  end
end

def discover(connection_config)

def discover(connection_config)
  connection_config = connection_config.with_indifferent_access
  conn = create_connection(connection_config)
  records = discover_columns_from_s3(conn, connection_config)
  grouped = group_by_table(records, connection_config[:file_name])
  catalog = Catalog.new(streams: create_streams(grouped))
  catalog.to_multiwoven_message
rescue StandardError => e
  handle_exception(e, {
                     context: "AMAZONS3:DISCOVER:EXCEPTION",
                     type: "error"
                   })
end

def discover_columns_from_s3(s3_client, connection_config)

def discover_columns_from_s3(s3_client, connection_config)
  bucket = connection_config[:bucket_name]
  prefix = build_discover_prefix(connection_config)
  format_type = connection_config[:format_type].to_s.downcase
  response = s3_client.list_objects_v2(bucket: bucket, prefix: prefix, max_keys: 100)
  raise StandardError, "No files found in the bucket" if response.contents.empty?
  key = response.contents&.find { |obj| obj.key.end_with?(".#{format_type}") }&.key
  raise StandardError, "No files found in the bucket" if key.nil?
  read_csv_headers(s3_client, bucket, key)
end

def generate_csv_content(records)

def generate_csv_content(records)
  CSV.generate do |csv|
    headers = records.first.keys
    csv << headers
    records.each { |record| csv << record.values_at(*headers) }
  end
end

def generate_local_file_name(connection_config)

def generate_local_file_name(connection_config)
  timestamp = Time.now.strftime("%Y%m%d-%H%M%S")
  "#{connection_config[:file_name]}_#{timestamp}.#{connection_config[:format_type]}"
end

def group_by_table(records, file_name)

def group_by_table(records, file_name)
  result = {}
  records.each do |entry|
    table_name = file_name
    column_data = {
      column_name: entry,
      type: "string",
      optional: true
    }
    result[table_name] ||= { tablename: table_name, columns: [] }
    result[table_name][:columns] << column_data
  end
  result
end

def path_style_enabled?(connection_config)

def path_style_enabled?(connection_config)
  val = connection_config[:path_style]
  val == true || val.to_s.casecmp("true").zero?
end

def read_csv_headers(s3_client, bucket, key)

def read_csv_headers(s3_client, bucket, key)
  obj = s3_client.get_object(bucket: bucket, key: key)
  first_line = obj.body.read.to_s.lines.first
  return [] if first_line.nil? || first_line.strip.empty?
  CSV.parse_line(first_line.strip)
end

def upload_csv_content(sync_config, records)

def upload_csv_content(sync_config, records)
  connection_config = sync_config.destination.connection_specification.with_indifferent_access
  conn = create_connection(connection_config)
  file_name = generate_local_file_name(connection_config)
  csv_content = generate_csv_content(records)
  begin
    @args = ["create", connection_config[:bucket_name], "#{connection_config[:file_path]}#{file_name}", csv_content]
    @response = conn.put_object(
      bucket: connection_config[:bucket_name],
      key: "#{connection_config[:file_path]}#{file_name}",
      body: csv_content
    )
    write_success = records.size
  rescue StandardError => e
    handle_exception(e, {
                       context: "AMAZONS3:RECORD:WRITE:EXCEPTION",
                       type: "error",
                       sync_id: sync_config.sync_id,
                       sync_run_id: sync_config.sync_run_id
                     })
    write_success = 0
  end
  write_success
end

def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)

def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
  records_size = records.size
  log_message_array = []
  write_success = upload_csv_content(sync_config, records)
  write_failure = records_size - write_success
  log_message_array << log_request_response("info", @args, @response)
  tracking_message(write_success, write_failure, log_message_array)
rescue StandardError => e
  handle_exception(e, {
                     context: "AMAZONS3:WRITE:EXCEPTION",
                     type: "error",
                     sync_id: sync_config.sync_id,
                     sync_run_id: sync_config.sync_run_id
                   })
end