lib/fluent/event.rb



#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
module Fluent


# Base class for a sequence of (time, record) events.
#
# It mixes in Enumerable but does not define #each itself; every
# Enumerable method and #to_msgpack_stream rely on a subclass
# supplying it.
class EventStream
  include Enumerable

  # Whether the stream can be iterated more than once.
  # The base class answers false.
  def repeatable?
    false
  end

  # Subclasses are expected to provide:
  #   def each(&block)
  #   end

  # Serializes all events into one string.
  #
  # Every event is wrapped as a two-element [time, record] array and
  # handed to Array#to_msgpack together with the same buffer, which
  # to_msgpack is expected to append to.
  #
  # Returns the buffer (an empty string when the stream has no events).
  def to_msgpack_stream
    buffer = ''
    each do |time, record|
      [time, record].to_msgpack(buffer)
    end
    buffer
  end
end


# Event stream that holds exactly one (time, record) event.
class OneEventStream < EventStream
  # time   - the event's time value, stored as given.
  # record - the event's record, stored as given.
  def initialize(time, record)
    @record = record
    @time = time
  end

  # The single stored event can be replayed any number of times.
  def repeatable?
    true
  end

  # Invokes the block once with the stored time and record.
  #
  # Always returns nil, regardless of what the block returns.
  def each(&block)
    block.call(@time, @record)
    nil
  end
end


# Event stream backed by a caller-supplied collection of entries.
#
# Each entry is passed to the block of #each unchanged; callers such as
# EventStream#to_msgpack_stream destructure it as (time, record), so
# entries are presumably [time, record] pairs -- confirm against callers.
class ArrayEventStream < EventStream
  # entries - the collection to iterate; it is stored as given (not
  #           copied) and must respond to #each and #empty?.
  def initialize(entries)
    @entries = entries
  end

  # The backing collection is kept, so the stream can be iterated again.
  def repeatable?
    true
  end

  # Returns true when the backing collection holds no entries.
  #
  # This class only ever assigns @entries, so emptiness is answered from
  # it; reading @time_array here would be nil and raise NoMethodError.
  def empty?
    @entries.empty?
  end

  # Passes every entry to the block, in the collection's own order.
  #
  # Always returns nil.
  def each(&block)
    @entries.each(&block)
    nil
  end

  #attr_reader :entries
  #
  #def to_a
  #  @entries
  #end
end


# Event stream that accumulates events through #add.
#
# Times and records live in two parallel arrays; the event at position
# i is (@time_array[i], @record_array[i]).
class MultiEventStream < EventStream
  def initialize
    @time_array = []
    @record_array = []
  end

  # Appends one event to the end of the stream.
  #
  # Returns the internal record array (the value of the last append).
  def add(time, record)
    @time_array.push(time)
    @record_array.push(record)
  end

  # Stored events can be replayed any number of times.
  def repeatable?
    true
  end

  # Returns true when no event has been added yet.
  def empty?
    @time_array.empty?
  end

  # Calls the block with (time, record) for each stored event, in
  # insertion order.
  #
  # The number of iterations is fixed from the time array's length when
  # the call starts. Always returns nil.
  def each(&block)
    times = @time_array
    records = @record_array
    times.length.times do |index|
      block.call(times[index], records[index])
    end
    nil
  end
end


# Event stream over data that is already in serialized form.
#
# NOTE(review): the data is presumably a concatenation of msgpack-encoded
# [time, record] arrays, matching what EventStream#to_msgpack_stream
# produces -- nothing in this class verifies that.
class MessagePackEventStream < EventStream
  # data            - the serialized payload; kept as given.
  # cached_unpacker - optional unpacker to reuse; when nil (or false) a
  #                   fresh MessagePack::Unpacker is created.
  def initialize(data, cached_unpacker=nil)
    @unpacker = cached_unpacker || MessagePack::Unpacker.new
    @data = data
  end

  # The payload is retained, so it can be decoded repeatedly.
  def repeatable?
    true
  end

  # Resets the unpacker, then feeds it the payload and forwards every
  # decoded object to the block.
  #
  # Always returns nil.
  def each(&block)
    @unpacker.reset
    # TODO format check
    @unpacker.feed_each(@data, &block)
    nil
  end

  # Returns the stored payload as-is; no re-encoding is performed.
  def to_msgpack_stream
    @data
  end
end


#class IoEventStream < EventStream
#  def initialize(io, num)
#    @io = io
#    @num = num
#    @u = MessagePack::Unpacker.new(@io)
#  end
#
#  def repeatable?
#    false
#  end
#
#  def each(&block)
#    return nil if @num == 0
#    @u.each {|obj|
#      block.call(obj[0], obj[1])
#      break if @array.size >= @num
#    }
#  end
#end


end