class Multiwoven::Integrations::Source::Sftp::Client

def query(conn, query)

# Runs a restricted SELECT against the remote file after downloading it over SFTP.
#
# Only statements of the shape
#   SELECT <column list | *> FROM <name> [LIMIT n] [OFFSET m]
# are accepted (case-insensitive). The name after FROM is validated but not
# used: the statement is rewritten to read from the downloaded temp file via
# read_csv_auto.
#
# @param conn connection handle, passed through unchanged to get_results
#   (presumably a DuckDB connection, given read_csv_auto; confirm with caller)
# @param query [String] the incoming SELECT statement
# @return [Array] one to_multiwoven_message result per row returned by get_results
# @raise [ArgumentError] if query does not match the supported shape
def query(conn, query)
  query_regex = /\ASELECT\s+(?<columns>\*|[\w,\s]+)\s+FROM\s+\w+\s*(?:LIMIT\s+(?<limit>\d+))?\s*(?:OFFSET\s+(?<offset>\d+))?\z/i
  match = query_regex.match(query.to_s.strip)
  # Fail fast with a clear message, before touching the network, instead of
  # hitting NoMethodError on a nil MatchData further down.
  raise ArgumentError, "Unsupported query: #{query.inspect}" unless match

  columns = match[:columns].strip
  offset = match[:offset].to_i # nil.to_i is 0, so an absent OFFSET means 0
  limit = match[:limit]&.to_i  # stays nil when no LIMIT was given

  local_path = @tempfile.path
  @sftp.download!(@remote_file_path, local_path)

  # Double any single quote so the path cannot terminate the SQL string literal.
  quoted_path = local_path.to_s.gsub("'", "''")
  adjusted_query = "SELECT #{columns} FROM read_csv_auto('#{quoted_path}') OFFSET #{offset}"
  adjusted_query += " LIMIT #{limit}" if limit

  get_results(conn, adjusted_query).map do |row|
    RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
  end
end