class Fluent::Plugin::HttpInput
def close
def close server_wait_until_stop super end
def configure(conf)
def configure(conf) compat_parameters_convert(conf, :parser) super m = if @parser_configs.first['@type'] == 'in_http' @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack') @parser_msgpack.estimate_current_event = false @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json') @parser_json.estimate_current_event = false @format_name = 'default' @parser_time_key = if parser_config = conf.elements('parse').first parser_config['time_key'] || 'time' else 'time' end method(:parse_params_default) else @parser = parser_create @format_name = @parser_configs.first['@type'] @parser_time_key = @parser.time_key method(:parse_params_with_parser) end self.singleton_class.module_eval do define_method(:parse_params, m) end end
def multi_workers_ready?
def multi_workers_ready? true end
def on_request(path_info, params)
def on_request(path_info, params) begin path = path_info[1..-1] # remove / tag = path.split('/').join('.') record_time, record = parse_params(params) # Skip nil record if record.nil? log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } if @respond_with_empty_img return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE] else return ["200 OK", {'Content-Type'=>'text/plain'}, ""] end end unless record.is_a?(Array) if @add_http_headers params.each_pair { |k,v| if k.start_with?("HTTP_") record[k] = v end } end if @add_remote_addr record['REMOTE_ADDR'] = params['REMOTE_ADDR'] end end time = if param_time = params['time'] param_time = param_time.to_f param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time) else record_time.nil? ? Fluent::Engine.now : record_time end rescue return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"] end # TODO server error begin # Support batched requests if record.is_a?(Array) mes = Fluent::MultiEventStream.new record.each do |single_record| if @add_http_headers params.each_pair { |k,v| if k.start_with?("HTTP_") single_record[k] = v end } end if @add_remote_addr single_record['REMOTE_ADDR'] = params['REMOTE_ADDR'] end if defined? @parser single_time = @parser.parse_time(single_record) single_time, single_record = @parser.convert_values(single_time, single_record) else single_time = if t = single_record.delete(@parser_time_key) Fluent::EventTime.from_time(Time.at(t)) else time end end mes.add(single_time, single_record) end router.emit_stream(tag, mes) else router.emit(tag, time, record) end rescue return ["500 Internal Server Error", {'Content-Type'=>'text/plain'}, "500 Internal Server Error\n#{$!}\n"] end if @respond_with_empty_img return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE] else return ["200 OK", {'Content-Type'=>'text/plain'}, ""] end end
def on_server_connect(conn)
def on_server_connect(conn) handler = Handler.new(conn, @km, method(:on_request), @body_size_limit, @format_name, log, @cors_allow_origins) conn.on(:data) do |data| handler.on_read(data) end conn.on(:write_complete) do |_| handler.on_write_complete end conn.on(:close) do |_| handler.on_close end end
def parse_params_default(params)
def parse_params_default(params) if msgpack = params['msgpack'] @parser_msgpack.parse(msgpack) do |_time, record| return nil, record end elsif js = params['json'] @parser_json.parse(js) do |_time, record| return nil, record end else raise "'json' or 'msgpack' parameter is required" end end
def parse_params_with_parser(params)
def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] @parser.parse(content) { |time, record| raise "Received event is not #{@format_name}: #{content}" if record.nil? return time, record } else raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" end end
def start
def start @_event_loop_run_timeout = @blocking_timeout super log.debug "listening http", bind: @bind, port: @port @km = KeepaliveManager.new(@keepalive_timeout) event_loop_attach(@km) server_create_connection(:in_http, @port, bind: @bind, backlog: @backlog, &method(:on_server_connect)) @float_time_parser = Fluent::NumericTimeParser.new(:float) end