class Fluent::ForwardOutput

def configure(conf)

# Builds the table of forwarding destinations from +conf+.
#
# Every child element of +conf+ named "server" becomes one Node stored in
# @nodes, keyed by its packed socket address. Per-server settings and their
# fallbacks:
#   host    - passed to Socket.pack_sockaddr_in as given
#   port    - integer; DEFAULT_LISTEN_PORT when absent
#   weight  - integer; 60 when absent
#   standby - true whenever the key has a non-nil value
#   name    - "host:port" when absent
#
# Returns whatever conf.elements.each returns.
def configure(conf)
  super

  # backward compatibility: a top-level 'host' (and optional 'port') is
  # turned into an extra <server> element so the loop below handles it too.
  legacy_host = conf['host']
  if legacy_host
    $log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead."
    legacy_port = conf['port']
    legacy_port = legacy_port ? legacy_port.to_i : DEFAULT_LISTEN_PORT
    server = conf.add_element('server')
    server['host'] = legacy_host
    server['port'] = legacy_port.to_s
  end

  recover_sample_size = @recover_wait / @heartbeat_interval

  conf.elements.each do |element|
    next unless element.name == "server"

    host = element['host']
    raw_port = element['port']
    port = raw_port ? raw_port.to_i : DEFAULT_LISTEN_PORT
    raw_weight = element['weight']
    weight = raw_weight ? raw_weight.to_i : 60
    standby = element['standby'] ? true : false
    name = element['name'] || "#{host}:#{port}"

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)

    # Round-trip through the packed form so the Node carries the host/port
    # exactly as Socket.unpack_sockaddr_in reports them for the @nodes key.
    sockaddr = Socket.pack_sockaddr_in(port, host)
    port, host = Socket.unpack_sockaddr_in(sockaddr)

    @nodes[sockaddr] = Node.new(name, host, port, weight, standby, failure,
                                @phi_threshold, recover_sample_size)
    $log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight
  end
end

def connect(node)

# Opens a new TCP connection to the host and port recorded on +node+ and
# returns the socket. The caller owns the socket and must close it.
def connect(node)
  # TODO unix socket?
  host, port = node.host, node.port
  TCPSocket.new(host, port)
end

def emit(tag, es, chain)

override BufferedOutput#emit
# Serializes the event stream +es+ with to_msgpack_stream and hands it to
# @buffer under the key +tag+. When the buffer's emit returns a truthy
# value, submit_flush is called.
def emit(tag, es, chain)
  packed = es.to_msgpack_stream
  needs_flush = @buffer.emit(tag, packed, chain)  # use key = tag
  submit_flush if needs_flush
end

def initialize

# Sets up an empty node table. The socket and fileutils libraries are
# loaded here, at construction time, rather than when the file is loaded.
def initialize
  super
  %w[socket fileutils].each { |lib| require lib }
  @nodes = {}  #=> {sockaddr => Node}
end

def on_heartbeat(sockaddr, msg)

# Callback for a heartbeat reply from +sockaddr+. Replies from addresses
# that are not in @nodes are ignored. When the node's heartbeat method
# returns a truthy value, the weight array is rebuilt. +msg+ is not used.
def on_heartbeat(sockaddr, msg)
  node = @nodes[sockaddr]
  return unless node

  #$log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port
  rebuild_weight_array if node.heartbeat
end

def on_timer

# Periodic timer callback. Does nothing once @finished is set. Otherwise,
# for every known node: calls tick (rebuilding the weight array when it
# returns a truthy value) and sends an empty datagram to the node's address
# through @usock as a heartbeat request. A failed send is logged at debug
# level and does not stop the remaining nodes from being processed.
def on_timer
  return if @finished

  @nodes.each_pair do |sockaddr, node|
    rebuild_weight_array if node.tick

    begin
      #$log.trace "sending heartbeat #{node.host}:#{node.port}"
      @usock.send "", 0, sockaddr
    rescue => e
      # unpack yields [port, host]; reversed and joined it reads "host:port"
      destination = Socket.unpack_sockaddr_in(sockaddr).reverse.join(':')
      $log.debug "failed to send heartbeat packet to #{destination}", :error=>e.to_s
    end
  end
end

def rebuild_weight_array

# Recomputes @weight_array, the shuffled list of nodes that write_objects
# walks through.
#
# All non-standby nodes are always candidates (including unavailable ones;
# availability is checked again at send time). When some of them are
# unavailable, available standby nodes are added, in @nodes order, until
# their combined weight covers the weight that was lost.
#
# Each candidate appears weight / gcd(all candidate weights) times, and the
# result is shuffled with a generator seeded from @rand_seed, so the same
# set of candidates always yields the same order.
#
# When there are no candidates, or every candidate has weight 0, the result
# is an empty array.
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.values.partition {|n|
    n.standby?
  }

  lost_weight = 0
  regular_nodes.each {|n|
    unless n.available?
      lost_weight += n.weight
    end
  }
  $log.debug "rebuilding weight array", :lost_weight=>lost_weight

  if lost_weight > 0
    standby_nodes.each {|n|
      if n.available?
        regular_nodes << n
        $log.info "using standby node #{n.host}:#{n.port}", :weight=>n.weight
        lost_weight -= n.weight
        break if lost_weight <= 0
      end
    }
  end

  weight_array = []
  gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) }
  # The fold starts at 0, so gcd stays 0 when there are no nodes or when all
  # weights are 0. Dividing by it would raise ZeroDivisionError; in that case
  # no node should receive any share, so the array is simply left empty.
  if gcd > 0
    regular_nodes.each {|n|
      (n.weight / gcd).times {
        weight_array << n
      }
    }
  end

  # re-seeding on every rebuild keeps the shuffle reproducible
  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }
  @weight_array = weight_array
end

def run

# Runs the event loop held in @loop until it returns. Any StandardError
# escaping the loop is logged together with its backtrace instead of being
# propagated to the caller.
def run
  begin
    @loop.run
  rescue => e
    $log.error "unexpected error", :error=>e.to_s
    $log.error_backtrace
  end
end

def send_data(node, tag, es)

# Connects to +node+ and writes one forward message: FORWARD_HEADER, the
# msgpack-encoded +tag+, a raw-32 header (0xdb followed by the payload size
# as a 32-bit big-endian integer) and finally the payload itself via
# es.write_to. The raw-32 form is used for every payload size.
#
# Before writing, SO_LINGER and SO_SNDTIMEO are both set from @send_timeout
# (in whole seconds). The socket is closed even when a step fails.
def send_data(node, tag, es)
  sock = connect(node)
  begin
    timeout_sec = @send_timeout.to_i

    linger = [1, timeout_sec].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    send_timeout = [timeout_sec, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)

    sock.write FORWARD_HEADER                 # beginArray(2)
    sock.write tag.to_msgpack                 # writeRaw(tag)
    sock.write [0xdb, es.size].pack('CN')     # beginRaw(size), raw 32
    es.write_to(sock)                         # writeRawBody(packed_es)
  ensure
    sock.close
  end
end

def shutdown

# Stops heartbeat processing and releases the resources created by start:
# marks the output finished (so on_timer becomes a no-op), detaches every
# watcher from the event loop, stops the loop, waits for the loop thread to
# end and closes the UDP socket.
#
# @loop, @thread and @usock are only assigned in start, so each one is
# checked first; shutdown is therefore safe to call when start never ran or
# failed before creating all of them.
def shutdown
  @finished = true
  if @loop
    @loop.watchers.each {|w| w.detach }
    @loop.stop
  end
  @thread.join if @thread
  @usock.close if @usock
end

def start

# Starts the output: picks the shuffle seed, builds the initial weight
# array, resets the round-robin position, then creates the event loop with
# a heartbeat handler (on a fresh UDP socket) and a heartbeat request timer
# attached, and finally runs that loop on a new thread.
def start
  super

  # The seed is chosen once so every later rebuild shuffles the same way.
  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  @loop = Coolio::Loop.new
  @usock = UDPSocket.new

  heartbeat_callback = method(:on_heartbeat)
  @hb = HeartbeatHandler.new(@usock, heartbeat_callback)
  @loop.attach(@hb)

  timer_callback = method(:on_timer)
  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, timer_callback)
  @loop.attach(@timer)

  @thread = Thread.new { run }
end

def write_objects(tag, es)

# Sends +es+ under +tag+ to the next available node, walking the weight
# array round-robin from the saved position @rr. At most one full pass over
# the array is made; the first node reporting available? gets the data.
#
# The array is read once into a local and the index is reduced modulo its
# length: rebuild_weight_array may install a shorter array between calls,
# which would otherwise leave @rr pointing past the end and yield nil.
#
# Raises RuntimeError ("no nodes are available") when the array is empty or
# no node in it is available. Errors from send_data propagate unchanged.
def write_objects(tag, es)
  nodes = @weight_array
  wlen = nodes.length
  wlen.times do
    node = nodes[@rr % wlen]
    @rr = (@rr + 1) % wlen
    if node.available?
      send_data(node, tag, es)
      return
    end
  end
  raise "no nodes are available"  # TODO message
end