# shareable_constant_value: literal

# Ruby 2.0+ backport of `Ractor` class
# Extra private methods and instance variables all start with `ractor_`
module Backports
  class Ractor
    require_relative '../tools/arguments'

    require_relative 'cloner'
    require_relative 'errors'
    require_relative 'queues'
    require_relative 'sharing'

    RactorThreadGroups = ::ObjectSpace::WeakMap.new # ThreadGroup => Ractor
    private_constant :RactorThreadGroups
    # Implementation notes
    #
    # Uses one `Thread` for each `Ractor`, as well as queues for communication
    #
    # The incoming queue is strict: contrary to standard queue, you can't pop from an empty closed queue.
    # Since standard queues return `nil` in those conditions, we wrap/unwrap `nil` values and consider
    # all `nil` values to be results of closed queues. `ClosedQueueError` are re-raised as `Ractor::ClosedError`
    #
    # The outgoing queue is strict and blocking. Same wrapping / raising as incoming,
    # with an extra queue to acknowledge when a value has been read (or if the port is closed while waiting).
    #
    # The last result is a bit tricky as it needs to be pushed on the outgoing queue but can not be blocking.
    # For this, we "soft close" the outgoing port.

    # Creates a Ractor running `block` in its own thread.
    #
    # A trailing `{name: ...}` hash (exactly one key, `:name`) is removed from
    # `args` and used as the Ractor's name; all remaining `args` are isolated
    # with `Ractor.ractor_isolate(arg, false)` and passed to the block.
    #
    # When `Ractor.main` is still `nil`, the instance being built is the main
    # Ractor: it is bound to the current thread and no new thread is started.
    #
    # @raise [ArgumentError] if no block is given
    def initialize(*args, &block)
      @ractor_incoming_queue = IncomingQueue.new
      @ractor_outgoing_queue = OutgoingQueue.new
      raise ::ArgumentError, 'must be called with a block' unless block

      kw = args.last
      if kw.is_a?(::Hash) && kw.size == 1 && kw.key?(:name)
        args.pop
        name = kw[:name]
      end
      @ractor_name = name && Backports.coerce_to_str(name)

      @id = Ractor.ractor_next_id
      if Ractor.main.nil? # then initializing main Ractor
        @ractor_thread = ::Thread.current
        @ractor_origin = nil
        @ractor_thread.thread_variable_set(:backports_ractor, self)
      else
        # Build "path:line" from the structured location instead of splitting
        # the `caller` string on its quote character (backtick before Ruby 3.4,
        # single quote since), so the result does not depend on that format.
        origin = caller_locations(1, 1).first
        @ractor_origin = "#{origin.path}:#{origin.lineno}"

        args.map! { |a| Ractor.ractor_isolate(a, false) }
        ractor_thread_start(args, block)
      end
    end

    # Starts the thread backing this Ractor.
    #
    # The thread is placed in a fresh `ThreadGroup` registered in
    # `RactorThreadGroups`, then runs `block` with `instance_exec`. Any
    # exception is converted to a `RemoteError` (raised and rescued so that
    # `cause` is set) wrapped in `OutgoingQueue::WrappedException`. The result
    # is always handed to `ractor_thread_terminate`.
    private def ractor_thread_start(args, block)
      ::Thread.new do
        @ractor_thread = ::Thread.current
        @ractor_thread_group = ::ThreadGroup.new
        RactorThreadGroups[@ractor_thread_group] = self
        @ractor_thread_group.add(@ractor_thread)
        ::Thread.current.thread_variable_set(:backports_ractor, self)
        result = nil
        begin
          result = instance_exec(*args, &block)
        rescue ::Exception => err # rubocop:disable Lint/RescueException
          begin
            raise RemoteError, "thrown by remote Ractor: #{err.message}"
          rescue RemoteError => e # Hack to create exception with `cause`
            result = OutgoingQueue::WrappedException.new(e)
          end
        ensure
          ractor_thread_terminate(result)
        end
      end
    end

    # Publishes the final `result` (without waiting for an acknowledgement)
    # unless the outgoing queue is already closed, then closes the incoming
    # queue and soft-closes the outgoing one. In every case, the other threads
    # of this Ractor's thread group are killed.
    private def ractor_thread_terminate(result)
      begin
        ractor_outgoing_queue.push(result, ack: false) unless ractor_outgoing_queue.closed?
      rescue ::ClosedQueueError
        return # ignore
      end
      ractor_incoming_queue.close
      ractor_outgoing_queue.close(:soft)
    ensure
      # TODO: synchronize?
      @ractor_thread_group.list.each do |thread|
        thread.kill unless thread == ::Thread.current
      end
    end

    # Pushes `obj` (isolated via `Ractor.ractor_isolate(obj, move)`) on the
    # incoming queue.
    #
    # @return [Ractor] self
    # @raise [ClosedError] if the incoming queue raises `ClosedQueueError`
    def send(obj, move: false)
      ractor_incoming_queue << Ractor.ractor_isolate(obj, move)
      self
    rescue ::ClosedQueueError
      raise ClosedError, 'The incoming-port is already closed'
    end
    alias_method :<<, :send

    # Pops the next value from the outgoing queue, acknowledging the read.
    def take
      ractor_outgoing_queue.pop(ack: true)
    end

    # @return [String, nil] the name given at creation, if any
    def name
      @ractor_name
    end

    # Maps `Thread#status` values to the state shown by `inspect`.
    RACTOR_STATE = {
      'sleep' => 'blocking',
      'run' => 'running',
      'aborting' => 'aborting',
      false => 'terminated',
      nil => 'terminated',
    }.freeze
    private_constant :RACTOR_STATE

    # @return [String] `#<Ractor:#ID [name] [origin] state>`; missing parts are omitted
    def inspect
      # The thread may not be assigned yet; it is then reported as running.
      state = RACTOR_STATE[@ractor_thread ? @ractor_thread.status : 'run']
      info = [
        "Ractor:##{@id}",
        name,
        @ractor_origin,
        state,
      ].compact.join(' ')

      "#<#{info}>"
    end

    # Closes the incoming queue.
    #
    # @return whatever `closed?` reported before closing
    def close_incoming
      r = ractor_incoming_queue.closed?
      ractor_incoming_queue.close
      r
    end

    # Closes the outgoing queue.
    #
    # @return whatever `closed?` reported before closing
    def close_outgoing
      r = ractor_outgoing_queue.closed?
      ractor_outgoing_queue.close
      r
    end

    # Pops the next value from the incoming queue.
    private def receive
      ractor_incoming_queue.pop
    end

    # Pops from the incoming queue, forwarding `block` to the queue's `pop`.
    #
    # @raise [ArgumentError] if no block is given
    private def receive_if(&block)
      raise ::ArgumentError, 'no block given' unless block
      ractor_incoming_queue.pop(&block)
    end

    # Reads `key` from the locals of the *current* Ractor (not the receiver).
    def [](key)
      Ractor.current.ractor_locals[key]
    end

    # Stores `value` under `key` in the locals of the *current* Ractor (not the receiver).
    def []=(key, value)
      Ractor.current.ractor_locals[key] = value
    end

    # @api private
    def ractor_locals
      @ractor_locals ||= {}.compare_by_identity
    end

    class << self
      # Pushes `value` (isolated via `ractor_isolate(value, move)`) on the
      # current Ractor's outgoing queue, requesting an acknowledgement.
      #
      # @raise [ClosedError] if the outgoing queue raises `ClosedQueueError`
      def yield(value, move: false)
        value = ractor_isolate(value, move)
        current.ractor_outgoing_queue.push(value, ack: true)
      rescue ::ClosedQueueError
        raise ClosedError, 'The outgoing-port is already closed'
      end

      # Receives the next message sent to the current Ractor.
      def receive
        current.__send__(:receive)
      end
      alias_method :recv, :receive

      # Same as `receive`, forwarding `block` to the current Ractor's `receive_if`.
      def receive_if(&block)
        current.__send__(:receive_if, &block)
      end

      # Polls the given ractors until one has a value available, or, when
      # `yield_value` is passed, until a taker is waiting on the current
      # Ractor's outgoing queue.
      #
      # The current Ractor in `ractors` is polled on its incoming queue and
      # reported as `:receive`; any other ractor is polled on its outgoing queue.
      #
      # `not_given` is assigned only when the default of `yield_value` is
      # evaluated, i.e. when the caller did not pass `yield_value`; this
      # distinguishes an omitted argument from an explicit `nil`.
      #
      # @return [Array] `[ractor_or_:receive, value]` or `[:yield, nil]`
      # @raise [ArgumentError] if neither a ractor nor `yield_value` is given
      def select(*ractors, yield_value: not_given = true, move: false)
        cur = Ractor.current
        queues = ractors.map do |r|
          r == cur ? r.ractor_incoming_queue : r.ractor_outgoing_queue
        end
        # `out` stays nil unless a value to yield was supplied.
        if !not_given
          out = cur.ractor_outgoing_queue
          yield_value = ractor_isolate(yield_value, move)
        elsif ractors.empty?
          raise ::ArgumentError, 'specify at least one ractor or `yield_value`'
        end

        while true # rubocop:disable Style/InfiniteLoop
          # Don't `loop`, in case of `ClosedError` (not that there should be any)
          queues.each_with_index do |q, i|
            q.pop_non_blocking do |val|
              r = ractors[i]
              return [r == cur ? :receive : r, val]
            end
          end

          if out && out.num_waiting > 0
            # Not quite atomic...
            out.push(yield_value, ack: true)
            return [:yield, nil]
          end

          sleep(0.001)
        end
      end

      # @return `obj` when `ractor_check_shareability?(obj, true)` is truthy
      # @raise [Ractor::Error] otherwise
      def make_shareable(obj)
        return obj if ractor_check_shareability?(obj, true)

        raise Ractor::Error, '#freeze does not freeze object correctly'
      end

      # @return the result of `ractor_check_shareability?(obj, false)`
      def shareable?(obj)
        ractor_check_shareability?(obj, false)
      end

      # Returns the Ractor of the current thread, caching it in the
      # `:backports_ractor` thread variable once looked up.
      def current
        ::Thread.current.thread_variable_get(:backports_ractor) ||
          ::Thread.current.thread_variable_set(:backports_ractor, ractor_find_current)
      end

      # @return [Integer] number of Ractor instances for which `ractor_live?` is truthy
      def count
        ::ObjectSpace.each_object(Ractor).count(&:ractor_live?)
      end

      # Kills and joins the thread of every Ractor other than the current one,
      # then clears the current Ractor's incoming queue.
      #
      # @api private
      def ractor_reset
        ::ObjectSpace.each_object(Ractor) do |r|
          next if r == Ractor.current
          next unless (th = r.ractor_thread)

          th.kill
          th.join
        end
        Ractor.current.ractor_incoming_queue.clear
      end

      # Returns the next Ractor id (1, 2, ...).
      #
      # @api private
      def ractor_next_id
        @id ||= 0
        @id += 1
      end

      attr_reader :main

      # Creates the main Ractor and maps the default thread group to it.
      private def ractor_init
        @ractor_shareable = ::ObjectSpace::WeakMap.new
        @main = Ractor.new { nil }
        RactorThreadGroups[::ThreadGroup::Default] = @main
      end

      # Looks up the Ractor registered for the current thread's group.
      private def ractor_find_current
        RactorThreadGroups[::Thread.current.group]
      end
    end

    # @api private
    def ractor_live?
      !defined?(@ractor_thread) || # May happen if `count` is called from another thread before `initialize` has completed
        @ractor_thread.status
    end

    # @api private
    attr_reader :ractor_outgoing_queue, :ractor_incoming_queue, :ractor_thread

    ractor_init
  end
end