require'mutex_m'require'concurrent/map'moduleActiveSupportmoduleNotifications# This is a default queue implementation that ships with Notifications.# It just pushes events to all registered log subscribers.## This class is thread safe. All methods are reentrant.classFanoutincludeMutex_mdefinitialize@subscribers=[]@listeners_for=Concurrent::Map.newsuperenddefsubscribe(pattern=nil,block=Proc.new)subscriber=Subscribers.newpattern,blocksynchronizedo@subscribers<<subscriber@listeners_for.clearendsubscriberenddefunsubscribe(subscriber_or_name)synchronizedocasesubscriber_or_namewhenString@subscribers.reject!{|s|s.matches?(subscriber_or_name)}else@subscribers.delete(subscriber_or_name)end@listeners_for.clearendenddefstart(name,id,payload)listeners_for(name).each{|s|s.start(name,id,payload)}enddeffinish(name,id,payload,listeners=listeners_for(name))listeners.each{|s|s.finish(name,id,payload)}enddefpublish(name,*args)listeners_for(name).each{|s|s.publish(name,*args)}enddeflisteners_for(name)# this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)@listeners_for[name]||synchronizedo# use synchronisation when accessing @subscribers@listeners_for[name]||=@subscribers.select{|s|s.subscribed_to?(name)}endenddeflistening?(name)listeners_for(name).any?end# This is a sync queue, so there is no waiting.defwaitendmoduleSubscribers# :nodoc:defself.new(pattern,listener)iflistener.respond_to?(:start)andlistener.respond_to?(:finish)subscriber=Evented.newpattern,listenerelsesubscriber=Timed.newpattern,listenerendunlesspatternAllMessages.new(subscriber)elsesubscriberendendclassEvented#:nodoc:definitialize(pattern,delegate)@pattern=pattern@delegate=delegate@can_publish=delegate.respond_to?(:publish)enddefpublish(name,*args)if@can_publish@delegate.publishname,*argsendenddefstart(name,id,payload)@delegate.startname,id,payloadenddeffinish(name,id,payload)@delegate.finishname,id,payloadenddefsubscribed_to?(name)@pattern===nameenddefmatches?(name)@pattern&&@pattern===nameendendclassTimed<Evented# :nodoc:defpublish(name,*args)@delegate.callname,*argsenddefstart(name,id,payload)timestack=Thread.current[:_timestack]||=[]timestack.pushTime.nowenddeffinish(name,id,payload)timestack=Thread.current[:_timestack]started=timestack.pop@delegate.call(name,started,Time.now,id,payload)endendclassAllMessages# :nodoc:definitialize(delegate)@delegate=delegateenddefstart(name,id,payload)@delegate.startname,id,payloadenddeffinish(name,id,payload)@delegate.finishname,id,payloadenddefpublish(name,*args)@delegate.publishname,*argsenddefsubscribed_to?(name)trueendalias:matches?:===endendendendend