# frozen_string_literal: true

class Redis
  # Wraps a pub/sub capable client and drives the subscribe / event-loop /
  # unsubscribe cycle on its behalf.
  #
  # The wrapped client is only required to respond to +call_v+, +next_event+
  # and +close+. Outgoing commands are funnelled through a Monitor so that
  # they are written one at a time.
  class SubscribedClient
    # @param client the underlying client that commands and event reads are
    #   delegated to
    def initialize(client)
      @client = client
      @write_monitor = Monitor.new
    end

    # Sends +command+ to the underlying client while holding the write monitor.
    #
    # @param command [Array] the command name followed by its arguments
    # @return whatever the underlying client's +call_v+ returns
    def call_v(command)
      @write_monitor.synchronize do
        @client.call_v(command)
      end
    end

    # Subscribes to +channels+ and processes events until the final
    # "unsubscribe" event arrives. The block receives a {Subscription} on
    # which the callbacks are registered.
    def subscribe(*channels, &block)
      subscription("subscribe", "unsubscribe", channels, block)
    end

    # Same as {#subscribe}, passing +timeout+ to every event read.
    def subscribe_with_timeout(timeout, *channels, &block)
      subscription("subscribe", "unsubscribe", channels, block, timeout)
    end

    # Pattern variant of {#subscribe}; ends on the final "punsubscribe" event.
    def psubscribe(*channels, &block)
      subscription("psubscribe", "punsubscribe", channels, block)
    end

    # Same as {#psubscribe}, passing +timeout+ to every event read.
    def psubscribe_with_timeout(timeout, *channels, &block)
      subscription("psubscribe", "punsubscribe", channels, block, timeout)
    end

    # Sharded variant of {#subscribe}; ends on the final "sunsubscribe" event.
    def ssubscribe(*channels, &block)
      subscription("ssubscribe", "sunsubscribe", channels, block)
    end

    # Same as {#ssubscribe}, passing +timeout+ to every event read.
    def ssubscribe_with_timeout(timeout, *channels, &block)
      subscription("ssubscribe", "sunsubscribe", channels, block, timeout)
    end

    # Sends an +unsubscribe+ command for +channels+.
    def unsubscribe(*channels)
      call_v([:unsubscribe, *channels])
    end

    # Sends a +punsubscribe+ command for +channels+.
    def punsubscribe(*channels)
      call_v([:punsubscribe, *channels])
    end

    # Sends a +sunsubscribe+ command for +channels+.
    def sunsubscribe(*channels)
      call_v([:sunsubscribe, *channels])
    end

    # Closes the underlying client.
    def close
      @client.close
    end

    protected

    # Issues the +start+ command, then reads events and dispatches each one to
    # the callback registered under its type. The loop ends when the client
    # returns a falsy event, or when a +stop+ event whose last element is 0
    # arrives (presumably the remaining subscription count).
    #
    # @param start [String] command that opens the subscription
    # @param stop [String] event type that may terminate the loop
    # @param channels [Array] channels or patterns to subscribe to
    # @param block [Proc] receives the {Subscription} to register callbacks on
    # @param timeout [Numeric] forwarded to the client's +next_event+
    # @return [nil]
    # @raise the mapped Redis error when the client yields a
    #   +RedisClient::CommandError+ as an event
    def subscription(start, stop, channels, block, timeout = 0)
      sub = Subscription.new(&block)

      if start == "ssubscribe"
        # One command per channel: avoid cross-slot keys.
        channels.each { |channel| call_v([start, channel]) }
      else
        call_v([start, *channels])
      end

      while (event = @client.next_event(timeout))
        if event.is_a?(::RedisClient::CommandError)
          raise mapped_error_class(event.class), event.message
        end

        type, *rest = event
        sub.callbacks[type]&.call(*rest)
        break if type == stop && rest.last == 0
      end
      # No need to unsubscribe here. The real client closes the connection
      # whenever an exception is raised (see #ensure_connected).
    end

    private

    # Looks up the Redis error class for +error_class+, falling back to the
    # nearest mapped ancestor so that a subclass without its own entry does
    # not surface as an unrelated KeyError.
    #
    # Assumes Client::ERROR_MAPPING is Hash-like (responds to +key?+ and
    # +fetch+); only +fetch+ is used by the original code.
    #
    # @raise [KeyError] when neither the class nor any ancestor is mapped
    def mapped_error_class(error_class)
      mapping = Client::ERROR_MAPPING
      # `ancestors` starts with the class itself, so an exact entry still wins.
      known = error_class.ancestors.find { |ancestor| mapping.key?(ancestor) }
      mapping.fetch(known || error_class)
    end
  end

  # Registry of pub/sub callbacks, keyed by event type name.
  #
  # An instance is yielded to the caller's block on construction; each method
  # below stores the given block under the matching string key.
  class Subscription
    attr_reader :callbacks

    # Creates an empty registry and yields it so callbacks can be registered.
    def initialize
      @callbacks = {}
      yield(self)
    end

    # Registers the callback for "subscribe" events.
    def subscribe(&block)
      @callbacks["subscribe"] = block
    end

    # Registers the callback for "unsubscribe" events.
    def unsubscribe(&block)
      @callbacks["unsubscribe"] = block
    end

    # Registers the callback for "message" events.
    def message(&block)
      @callbacks["message"] = block
    end

    # Registers the callback for "psubscribe" events.
    def psubscribe(&block)
      @callbacks["psubscribe"] = block
    end

    # Registers the callback for "punsubscribe" events.
    def punsubscribe(&block)
      @callbacks["punsubscribe"] = block
    end

    # Registers the callback for "pmessage" events.
    def pmessage(&block)
      @callbacks["pmessage"] = block
    end

    # Registers the callback for "ssubscribe" events.
    def ssubscribe(&block)
      @callbacks["ssubscribe"] = block
    end

    # Registers the callback for "sunsubscribe" events.
    def sunsubscribe(&block)
      @callbacks["sunsubscribe"] = block
    end

    # Registers the callback for "smessage" events.
    def smessage(&block)
      @callbacks["smessage"] = block
    end
  end
end