# A stream that validates each element.
#
# NOTE(review): the class body is empty in this view; the method listings that
# appear later in this file are presumably its instance methods -- confirm.
class Plumb::StreamClass

end
A stream that validates each element.
Example:
row = Types::Tuple[String, Types::Lax::Integer]
csv_stream = Types::Stream[row]
stream = csv_stream.parse(CSV.new(File.new('data.csv')).to_enum)
stream.each do |result|
  result.valid? # => true
  result.value # => ['name', 10]
end

def [](element_type)

Parameters:
  • element_type (Composable) -- the type of the elements in the stream
# Builds a new instance of the receiver's class, configured for the given
# element type.
#
# @param element_type [Composable] the type of the elements in the stream
# @return [Object] whatever `self.class.new(element_type: ...)` returns
def [](element_type)
  stream_class = self.class
  stream_class.new(element_type: element_type)
end

def _inspect = "Stream[#{@element_type.inspect}]"

# Label for this stream: "Stream[...]" wrapping the element type's own
# #inspect output.
#
# @return [String]
def _inspect
  "Stream[#{@element_type.inspect}]"
end

def call(result)

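Returns:
  • (Result::Valid, Result::Invalid) - invalid with the error 'is not an Enumerable' when the value does not respond to #each; otherwise valid, wrapping an Enumerator that resolves each element with the element type

Parameters:
  • result (Result::Valid) -- the result whose value is the collection to stream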
# Wraps the incoming value in an Enumerator that resolves every element
# through the element type as it is iterated.
#
# @param result [Result::Valid] result whose value is expected to respond to #each
# @return [Result::Valid, Result::Invalid] invalid (errors: 'is not an Enumerable')
#   when the value does not respond to #each; otherwise valid, holding the Enumerator
def call(result)
  unless result.value.respond_to?(:each)
    return result.invalid(errors: 'is not an Enumerable')
  end

  # Elements are only resolved when the Enumerator is consumed.
  resolved = Enumerator.new do |yielder|
    result.value.each { |item| yielder << @element_type.resolve(item) }
  end
  result.valid(resolved)
end

def filtered

Returns:
  • (Step) - a step that resolves to an Enumerator that filters out invalid elements
# Composes this stream (via #>>) with a step named 'filtered' that lazily
# drops invalid elements and unwraps the values of the valid ones.
#
# @return [Step] a step that resolves to an Enumerator that filters out invalid elements
def filtered
  only_valid = Step.new(nil, 'filtered') do |result|
    # select + map rather than filter_map: filter_map would also discard
    # valid elements whose value happens to be nil or false.
    values = result.value.lazy.select(&:valid?).map(&:value)
    result.valid(values)
  end
  self >> only_valid
end

def initialize(element_type: Types::Any)

Options Hash: (**element_type)
  • element_type (Composable) -- the type of the elements in the stream
# Stores the wrapped element type, records it as the sole child, and freezes
# the instance.
#
# @param element_type [Composable] the type of the elements in the stream
#   (defaults to Types::Any); passed through Composable.wrap before storing
def initialize(element_type: Types::Any)
  wrapped = Composable.wrap(element_type)
  @element_type = wrapped
  @children = [wrapped].freeze
  freeze
end