class Multiwoven::Integrations::Source::Sftp::Client
def query(conn, query)
def query(conn, query) query_regex = /\ASELECT\s+(?<columns>[\w,\s]+)\s+FROM\s+\w+\s*(?:LIMIT\s+(?<limit>\d+))?\s*(?:OFFSET\s+(?<offset>\d+))?\z/i match = query.match(query_regex) columns = match[:columns] || "*" offset = match[:offset].to_i || 0 limit = match[:limit]&.to_i || nil @sftp.download!(@remote_file_path, @tempfile.path) adjusted_query = "SELECT #{columns} FROM read_csv_auto('#{@tempfile.path}') OFFSET #{offset}" adjusted_query += " LIMIT #{limit}" if limit records = get_results(conn, adjusted_query) records.map do |row| RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message end end