class Rufus::Scheduler::EmScheduler


# A rufus-scheduler that uses an EventMachine periodic timer instead of a
# loop.

def initialize(opts={})

# Builds the scheduler, refusing to do so when EventMachine is not loaded.
#
# opts - option hash, handed unchanged to the parent initializer.
#
# Raises LoadError when the EM constant is not defined.
def initialize(opts={})
  unless defined?(EM)
    raise LoadError, 'EventMachine missing, "require \'eventmachine\'" might help'
  end

  super(opts)
end

def join


# Joins this scheduler. Will actually join it only if it started the
# underlying EventMachine.
# Blocks on the thread held in @em_thread, when there is one.
#
# Returns nil right away when @em_thread is not set; otherwise returns
# whatever joining that thread returns.
def join
  return unless @em_thread

  @em_thread.join
end

def start

# Starts the scheduler: makes sure an EventMachine reactor is running, then
# installs a periodic timer that calls #step at the @frequency interval.
#
# When a reactor is already running it is reused and @em_thread stays nil.
# Otherwise a new thread running EM.run is spawned and remembered in
# @em_thread, and this method waits until the reactor reports itself as
# running before creating the timer.
#
# Raises whatever error killed the reactor thread if it dies before the
# reactor is up, or RuntimeError if that thread ends without the reactor
# ever running. @em_thread is reset to nil in both cases.
def start
  @em_thread = nil

  unless EM.reactor_running?
    @em_thread = Thread.new { EM.run }

    until EM.reactor_running?
      # A dead reactor thread can never flip reactor_running? to true, so
      # bail out instead of spinning forever.
      unless @em_thread.alive?
        dead, @em_thread = @em_thread, nil
        dead.join # re-raises the exception that terminated the thread, if any
        raise 'EventMachine reactor thread terminated before the reactor was running'
      end

      Thread.pass
    end
  end

  @timer = EM::PeriodicTimer.new(@frequency) { step }
end

def stop(opts={})


# Stops the scheduler.
#
# If the :stop_em option is passed and set to true, it will stop the
# EventMachine (but only if it started the EM by itself !).
# Stops the scheduler by cancelling its periodic timer.
#
# opts - :stop_em, when truthy, also stops EventMachine, but only if
#        @em_thread is set (i.e. #start spawned the reactor thread itself).
#
# Safe to call on a scheduler that was never started: there is no timer to
# cancel then, so the cancellation is skipped instead of failing on nil.
def stop(opts={})
  @timer.cancel if @timer
  EM.stop if opts[:stop_em] && @em_thread
end

def trigger_job(params, &block)


# If 'blocking' is set to true, the block will get called at the
# 'next_tick'. Else the block will get called via 'defer' (own thread).
# Hands a job's block over to EventMachine for execution.
#
# params - hash of job parameters; the keys read here are:
#          :blocking - when truthy, the block is scheduled via EM.next_tick
#                      (which, per the EventMachine docs, runs it on the
#                      reactor itself and so holds the reactor up);
#          :mutex    - a Mutex, or a name for one; the block then runs
#                      inside that mutex, via EM.defer.
# block  - the job code to run.
#
# Without either option the block simply goes through EM.defer (which, per
# the EventMachine docs, runs it in another thread).
#
# Returns whatever the EM.next_tick / EM.defer call returns.
def trigger_job(params, &block)
  return EM.next_tick { block.call } if params[:blocking]

  lock = params[:mutex]
  return EM.defer { block.call } unless lock

  # Named mutexes are created on first use and shared through @mutexes,
  # keyed by the string form of the name.
  lock = (@mutexes[lock.to_s] ||= Mutex.new) unless lock.is_a?(Mutex)

  EM.defer { lock.synchronize { block.call } }
end