class Multiwoven::Integrations::Source::AmazonS3::Client

def build_discover_columns(describe_results)

def build_discover_columns(describe_results)
  describe_results.map do |row|
    type = column_schema_helper(row["column_type"])
    {
      column_name: row["column_name"],
      type: type
    }
  end
end

def build_path(connection_config)

def build_path(connection_config)
  path = connection_config[:path]
  path = "#{path}/" if path.to_s.strip.empty? || path[-1] != "/"
  "s3://#{connection_config[:bucket]}#{path}*.#{connection_config[:file_type]}"
end

def check_connection(connection_config)

def check_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  @session_name = "connection-#{connection_config[:region]}-#{connection_config[:bucket]}"
  if unstructured_data?(connection_config)
    create_s3_connection(connection_config)
    @s3_resource.bucket(connection_config[:bucket]).objects.limit(1).first
  else
    conn = create_connection(connection_config)
    path = build_path(connection_config)
    get_results(conn, "DESCRIBE SELECT * FROM '#{path}';")
  end
  ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
rescue StandardError => e
  ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
end

def column_schema_helper(column_type)

def column_schema_helper(column_type)
  case column_type
  when "VARCHAR", "BIT", "DATE", "TIME", "TIMESTAMP", "UUID"
    "string"
  when "DOUBLE"
    "number"
  when "BIGINT", "HUGEINT", "INTEGER", "SMALLINT"
    "integer"
  when "BOOLEAN"
    "boolean"
  end
end

def create_connection(connection_config)

def create_connection(connection_config)
  # In the case when previewing a query
  @session_name = "preview-#{connection_config[:region]}-#{connection_config[:bucket]}" if @session_name.to_s.empty?
  auth_data = get_auth_data(connection_config)
  conn = DuckDB::Database.open.connect
  # Install and/or Load the HTTPFS extension
  conn.execute(INSTALL_HTTPFS_QUERY)
  # Set up S3 configuration
  secret_query = "
        CREATE SECRET amazons3_source (
        TYPE S3,
        KEY_ID '#{auth_data.credentials.access_key_id}',
        SECRET '#{auth_data.credentials.secret_access_key}',
        REGION '#{connection_config[:region]}',
        SESSION_TOKEN '#{auth_data.credentials.session_token}'
    );
  "
  get_results(conn, secret_query)
  conn
end

def create_s3_connection(connection_config)

def create_s3_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  # Get authentication credentials
  auth_data = get_auth_data(connection_config)
  # Create S3 resource for easier operations
  @s3_resource = Aws::S3::Resource.new(
    region: connection_config[:region],
    credentials: auth_data
  )
end

def discover(connection_config)

def discover(connection_config)
  connection_config = connection_config.with_indifferent_access
  @session_name = "discover-#{connection_config[:region]}-#{connection_config[:bucket]}"
  streams = if unstructured_data?(connection_config)
              [create_unstructured_stream]
            else
              conn = create_connection(connection_config)
              # If pulling from multiple files, all files must have the same schema
              path = build_path(connection_config)
              records = get_results(conn, "DESCRIBE SELECT * FROM '#{path}';")
              columns = build_discover_columns(records)
              [Multiwoven::Integrations::Protocol::Stream.new(name: path, action: StreamAction["fetch"], json_schema: convert_to_json_schema(columns))]
            end
  catalog = Catalog.new(streams: streams)
  catalog.to_multiwoven_message
rescue StandardError => e
  handle_exception(e, { context: "AMAZONS3:DISCOVER:EXCEPTION", type: "error" })
end

def download_file_to_local(bucket_name, file_path, sync_id)

def download_file_to_local(bucket_name, file_path, sync_id)
  download_path = ENV["FILE_DOWNLOAD_PATH"]
  file = if download_path
           File.join(download_path, "syncs", sync_id, File.basename(file_path))
         else
           Tempfile.new(["s3_file", "syncs", sync_id, File.extname(file_path)]).path
         end
  object = @s3_resource.bucket(bucket_name).object(file_path)
  object.get(response_target: file)
  [RecordMessage.new(
    data: {
      local_path: file,
      file_name: File.basename(file_path),
      file_path: file_path,
      size: object.content_length,
      file_type: File.extname(file_path).sub(".", ""),
      modified_date: object.last_modified.to_s,
      created_date: object.last_modified.to_s
    },
    emitted_at: Time.now.to_i
  ).to_multiwoven_message]
rescue Aws::S3::Errors::NoSuchKey
  raise "File not found: #{file_path}"
end

def get_auth_data(connection_config)

def get_auth_data(connection_config)
  session = @session_name.gsub(/\s+/, "-")
  @session_name = ""
  if connection_config[:auth_type] == "user"
    Aws::Credentials.new(connection_config[:access_id], connection_config[:secret_access])
  elsif connection_config[:auth_type] == "role"
    sts_client = Aws::STS::Client.new(region: connection_config[:region])
    resp = sts_client.assume_role({
                                    role_arn: connection_config[:arn],
                                    role_session_name: session,
                                    external_id: connection_config[:external_id]
                                  })
    Aws::Credentials.new(
      resp.credentials.access_key_id,
      resp.credentials.secret_access_key,
      resp.credentials.session_token
    )
  end
end

def get_results(conn, query)

def get_results(conn, query)
  results = conn.query(query)
  hash_array_values(results)
end

def handle_unstructured_data(sync_config)

def handle_unstructured_data(sync_config)
  connection_config = sync_config.source.connection_specification.with_indifferent_access
  bucket_name = connection_config[:bucket]
  command = sync_config.model.query.strip
  create_s3_connection(connection_config)
  case command
  when LIST_FILES_CMD
    list_files_in_folder(bucket_name, connection_config[:path] || "")
  when /^#{DOWNLOAD_FILE_CMD}\s+(.+)$/
    # Extract the file path and remove surrounding quotes if present
    file_path = ::Regexp.last_match(1).strip
    file_path = file_path.gsub(/^["']|["']$/, "") # Remove leading/trailing quotes
    download_file_to_local(bucket_name, file_path, sync_config.sync_id)
  else
    raise "Invalid command. Supported commands: #{LIST_FILES_CMD}, #{DOWNLOAD_FILE_CMD} <file_path>"
  end
end

def hash_array_values(describe)

def hash_array_values(describe)
  keys = describe.columns.map(&:name)
  describe.map do |row|
    Hash[keys.zip(row)]
  end
end

def list_files_in_folder(bucket_name, folder_path)

def list_files_in_folder(bucket_name, folder_path)
  folder_path = folder_path.end_with?("/") ? folder_path : "#{folder_path}/"
  bucket = @s3_resource.bucket(bucket_name)
  bucket.objects(prefix: folder_path).reject { |object| object.key == folder_path }.map do |object|
    RecordMessage.new(
      data: {
        file_name: File.basename(object.key),
        file_path: object.key,
        size: object.content_length,
        file_type: File.extname(object.key).sub(".", ""),
        created_date: object.last_modified.to_s,
        modified_date: object.last_modified.to_s
      },
      emitted_at: Time.now.to_i
    ).to_multiwoven_message
  end
end

def query(conn, query)

def query(conn, query)
  records = get_results(conn, query)
  records.map do |row|
    RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
  end
end

def read(sync_config)

def read(sync_config)
  connection_config = sync_config.source.connection_specification.with_indifferent_access
  @session_name = "#{sync_config.sync_id}-#{sync_config.source.name}-#{sync_config.destination.name}"
  return handle_unstructured_data(sync_config) if unstructured_data?(connection_config)
  conn = create_connection(connection_config)
  query = sync_config.model.query
  query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil?
  query(conn, query)
rescue StandardError => e
  handle_exception(e, {
                     context: "AMAZONS3:READ:EXCEPTION",
                     type: "error",
                     sync_id: sync_config.sync_id,
                     sync_run_id: sync_config.sync_run_id
                   })
end