class EventMachine::Reactor
@private
def add_selectable io
def add_selectable io @selectables[io.uuid] = io end
def close_loopbreaker
def close_loopbreaker @loopbreak_writer.close @loopbreak_writer = nil end
def crank_selectables
def crank_selectables #$stderr.write 'R' readers = @selectables.values.select {|io| io.select_for_reading?} writers = @selectables.values.select {|io| io.select_for_writing?} s = select( readers, writers, nil, @timer_quantum) s and s[1] and s[1].each {|w| w.eventable_write } s and s[0] and s[0].each {|r| r.eventable_read } @selectables.delete_if {|k,io| if io.close_scheduled? io.close begin EventMachine::event_callback io.uuid, ConnectionUnbound, nil rescue ConnectionNotBound; end true end } end
def get_selectable uuid
def get_selectable uuid @selectables[uuid] end
def initialize
def initialize initialize_for_run end
def initialize_for_run
- Private: -
def initialize_for_run @running = false @stop_scheduled = false @selectables ||= {}; @selectables.clear @timers = SortedSet.new # [] set_timer_quantum(0.1) @current_loop_time = Time.now @next_heartbeat = @current_loop_time + HeartbeatInterval end
def install_oneshot_timer interval
def install_oneshot_timer interval uuid = UuidGenerator::generate #@timers << [Time.now + interval, uuid] #@timers.sort! {|a,b| a.first <=> b.first} @timers.add([Time.now + interval, uuid]) uuid end
def open_loopbreaker
def open_loopbreaker # Can't use an IO.pipe because they can't be set nonselectable in Windows. # Pick a random localhost UDP port. #@loopbreak_writer.close if @loopbreak_writer #rd,@loopbreak_writer = IO.pipe @loopbreak_reader = UDPSocket.new @loopbreak_writer = UDPSocket.new bound = false 100.times { @loopbreak_port = rand(10000) + 40000 begin @loopbreak_reader.bind "127.0.0.1", @loopbreak_port bound = true break rescue end } raise "Unable to bind Loopbreaker" unless bound LoopbreakReader.new(@loopbreak_reader) end
def run
def run raise Error.new( "already running" ) if @running @running = true begin open_loopbreaker loop { @current_loop_time = Time.now break if @stop_scheduled run_timers break if @stop_scheduled crank_selectables break if @stop_scheduled run_heartbeats } ensure close_loopbreaker @selectables.each {|k, io| io.close} @selectables.clear @running = false end end
def run_heartbeats
def run_heartbeats if @next_heartbeat <= @current_loop_time @next_heartbeat = @current_loop_time + HeartbeatInterval @selectables.each {|k,io| io.heartbeat} end end
def run_timers
def run_timers @timers.each {|t| if t.first <= @current_loop_time @timers.delete t EventMachine::event_callback "", TimerFired, t.last else break end } #while @timers.length > 0 and @timers.first.first <= now # t = @timers.shift # EventMachine::event_callback "", TimerFired, t.last #end end
def set_timer_quantum interval_in_seconds
def set_timer_quantum interval_in_seconds @timer_quantum = interval_in_seconds end
def signal_loopbreak
def signal_loopbreak begin @loopbreak_writer.send('+',0,"127.0.0.1",@loopbreak_port) if @loopbreak_writer rescue IOError; end end
def stop
def stop raise Error.new( "not running") unless @running @stop_scheduled = true end