class Fluent::FileBuffer
def configure(conf)
# Applies the configuration, then derives the prefix and suffix that
# surround the per-chunk part of every buffer file name.
#
# Reads @buffer_path (not assigned in this method). When it contains a
# '*', the text before the first '*' becomes the prefix and the text after
# it becomes the suffix. Otherwise the whole path plus "." is the prefix
# and ".log" is the suffix.
#
# conf - configuration object, consumed by the superclass implementation.
def configure(conf)
  super

  star = @buffer_path.index('*')
  if star.nil?
    # No placeholder: "<buffer_path>.<chunk part>.log"
    @buffer_path_prefix = @buffer_path + "."
    @buffer_path_suffix = ".log"
  else
    # Placeholder present: split around the first '*'.
    @buffer_path_prefix = @buffer_path[0, star]
    @buffer_path_suffix = @buffer_path[(star + 1)..-1]
  end
end
def decode_key(encoded_key)
# Reverses encode_key: turns every "%XX" escape (two hex digits) back into
# the byte it stands for and leaves all other text untouched.
#
# URI.unescape was removed in Ruby 3.0, so the decoding is done here
# directly, following the same rules the removed helper used.
#
# encoded_key - String containing percent-escapes.
#
# Returns a new String. The result keeps the encoding of encoded_key,
# except that a US-ASCII input yields UTF-8 so decoded multi-byte
# sequences form valid characters.
def decode_key(encoded_key)
  target_encoding = encoded_key.encoding
  target_encoding = Encoding::UTF_8 if target_encoding == Encoding::US_ASCII

  encoded_key.gsub(/%[a-fA-F\d]{2}/) do |escape|
    # escape is "%XX"; pack the two hex digits into a single raw byte.
    [escape[1, 2]].pack('H2').force_encoding(target_encoding)
  end
end
def encode_key(key)
# Makes a chunk key safe to embed in a file name by percent-encoding every
# character outside of [-_.a-zA-Z0-9]. Each byte of an unsafe character is
# written as "%XX" with uppercase hex digits.
#
# URI.escape was removed in Ruby 3.0, so the encoding is done here
# directly, following the same rules the removed helper used.
#
# key - String chunk key.
#
# Returns a new US-ASCII String; key itself is not modified.
def encode_key(key)
  escaped = key.gsub(/[^-_.a-zA-Z0-9]/n) do |unsafe|
    # A single unsafe character may span several bytes; escape each one.
    unsafe.each_byte.map { |byte| format('%%%02X', byte) }.join
  end
  escaped.force_encoding(Encoding::US_ASCII)
end
def enqueue(chunk)
# Renames a chunk's file so that its name carries the "q" marker.
#
# The part of the chunk's current path between the configured prefix and
# suffix is matched against PATH_MATCH to recover the encoded key; when it
# does not match, an empty key is used. A fresh path is then generated with
# make_path and the chunk is moved there.
#
# chunk - object responding to #path and #mv.
#
# Returns whatever chunk.mv returns.
def enqueue(chunk)
  current_path = chunk.path
  first = @buffer_path_prefix.length
  last = -(@buffer_path_suffix.length + 1)
  middle = current_path[first..last]

  matched = PATH_MATCH.match(middle)
  encoded_key = matched ? matched[1] : ""

  chunk.mv(make_path(encoded_key, "q"))
end
def initialize
# Loads the 'uri' library when an instance is created, then hands over to
# the superclass initializer.
def initialize
  require "uri"

  super
end
def make_path(encoded_key, bq)
# Builds a buffer file path of the form
#   <prefix><encoded_key>.<bq><hex stamp><suffix>
#
# The hex stamp is the current time in microseconds since the epoch,
# shifted left by 12 bits, with a random number from rand(0xfff) OR-ed
# into the low bits.
#
# encoded_key - String key, already encoded by the caller.
# bq          - String marker placed before the stamp (callers in this
#               class pass "b" or "q").
#
# Returns the path as a String.
def make_path(encoded_key, bq)
  stamp = Time.now.utc
  microseconds = stamp.to_i * 1000 * 1000 + stamp.usec
  unique = (microseconds << 12) | rand(0xfff)
  hex_stamp = unique.to_s(16)

  "#{@buffer_path_prefix}#{encoded_key}.#{bq}#{hex_stamp}#{@buffer_path_suffix}"
end
def new_chunk(key)
# Creates a chunk for the given key, backed by a newly generated path that
# carries the "b" marker.
#
# key - chunk key; it is encoded with encode_key before being placed in
#       the path, while the chunk itself receives the original key.
#
# Returns a new FileBufferChunk.
def new_chunk(key)
  chunk_path = make_path(encode_key(key), "b")
  FileBufferChunk.new(key, chunk_path)
end
def resume
# Rebuilds buffer state from the files matching
# "<prefix>*<suffix>" on disk.
#
# For each file, the part of the name between prefix and suffix is matched
# against PATH_MATCH; files that do not match are ignored. From a match the
# key (decoded with decode_key), the b/q marker and the hex stamp are taken:
#   * 'b' files are opened with mode "a+" and end up in the key => chunk map;
#     when several share a key, the one with the highest stamp wins.
#   * 'q' files are opened with mode "r" and end up in the queue, ordered
#     by ascending stamp.
#
# Returns a two-element Array: [queue (Array of chunks), map (Hash)].
def resume
  staged = []   # [stamp, chunk] pairs from 'b' files
  queued = []   # [stamp, chunk] pairs from 'q' files

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") do |path|
    first = @buffer_path_prefix.length
    last = -(@buffer_path_suffix.length + 1)
    matched = PATH_MATCH.match(path[first..last])
    next unless matched

    key = decode_key(matched[1])
    marker = matched[2]
    stamp = matched[3].to_i(16)

    case marker
    when 'b'
      staged << [stamp, FileBufferChunk.new(key, path, "a+")]
    when 'q'
      queued << [stamp, FileBufferChunk.new(key, path, "r")]
    end
  end

  map = {}
  staged.sort_by { |pair| pair[0] }.each do |pair|
    chunk = pair[1]
    map[chunk.key] = chunk
  end

  queue = queued.sort_by { |pair| pair[0] }.map { |pair| pair[1] }

  [queue, map]
end
def start
# Ensures the directory that will hold the buffer files exists, then
# continues with the superclass start logic.
def start
  # Appending a dummy file name before taking the dirname means a prefix
  # ending in "/" resolves to that directory itself, not to its parent.
  buffer_dir = File.dirname(@buffer_path_prefix + "path")
  FileUtils.mkdir_p(buffer_dir)

  super
end