module Fluent::PluginHelper::EventLoop
def after_shutdown
def after_shutdown timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT @_event_loop_mutex.synchronize do @_event_loop.watchers.reverse.each do |w| begin w.detach rescue => e log.warn "unexpected error while detaching event loop watcher", error: e end end end while @_event_loop_running if Fluent::Clock.now >= timeout_at log.warn "event loop does NOT exit until hard timeout." raise "event loop does NOT exit until hard timeout." if @under_plugin_development break end sleep 0.1 end super end
def close
def close if @_event_loop_running begin @_event_loop.stop # we cannot check loop is running or not rescue RuntimeError => e raise unless e.message == 'loop not running' end end super end
def event_loop_attach(watcher)
def event_loop_attach(watcher) @_event_loop_mutex.synchronize do @_event_loop.attach(watcher) @_event_loop_attached_watchers << watcher watcher end end
def event_loop_detach(watcher)
def event_loop_detach(watcher) if watcher.attached? watcher.detach end @_event_loop_mutex.synchronize do @_event_loop_attached_watchers.delete(watcher) end end
def event_loop_running?
def event_loop_running? @_event_loop_running end
def event_loop_wait_until_start
def event_loop_wait_until_start sleep(0.1) until event_loop_running? end
def event_loop_wait_until_stop
def event_loop_wait_until_stop timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT sleep(0.1) while event_loop_running? && Fluent::Clock.now < timeout_at if @_event_loop_running puts "terminating event_loop forcedly" caller.each{|bt| puts "\t#{bt}" } @_event_loop.stop rescue nil @_event_loop_running = true end end
def initialize
def initialize super @_event_loop = Coolio::Loop.new @_event_loop_running = false @_event_loop_mutex = Mutex.new # plugin MAY configure loop run timeout in #configure @_event_loop_run_timeout = EVENT_LOOP_RUN_DEFAULT_TIMEOUT @_event_loop_attached_watchers = [] end
def shutdown
def shutdown @_event_loop_mutex.synchronize do @_event_loop_attached_watchers.reverse.each do |w| if w.attached? begin w.detach rescue => e log.warn "unexpected error while detaching event loop watcher", error: e end end end end super end
def start
def start super # event loop does not run here, so mutex lock is not required thread_create :event_loop do begin default_watcher = DefaultWatcher.new event_loop_attach(default_watcher) @_event_loop_running = true @_event_loop.run(@_event_loop_run_timeout) # this method blocks ensure @_event_loop_running = false end end end
def terminate
def terminate @_event_loop = nil @_event_loop_running = false @_event_loop_mutex = nil @_event_loop_run_timeout = nil super end