class Redis::SubscribedClient
# Forwards a command to the wrapped client while holding the write monitor,
# so only one thread at a time writes through this object.
#
# @param command [Array] the command and its arguments, passed through as-is
# @return whatever the wrapped client's #call_v returns
def call_v(command)
  @write_monitor.synchronize { @client.call_v(command) }
end
# Closes the wrapped client.
#
# @return whatever the wrapped client's #close returns
def close
  @client.close
end
# Wraps +client+ and creates the monitor that #call_v uses to serialize writes.
#
# @param client the underlying connection; the methods of this class call
#   #call_v, #next_event and #close on it
def initialize(client)
  @client = client
  @write_monitor = Monitor.new
end
# Runs a subscription loop started with "psubscribe" and ended by
# "punsubscribe", using the default timeout of #subscription.
#
# @param channels [Array] the arguments sent with the "psubscribe" command
# @param block handed to #subscription, which uses it to build the
#   Subscription
# @return whatever #subscription returns
def psubscribe(*channels, &block)
  subscription("psubscribe", "punsubscribe", channels, block)
end
# Same as #psubscribe, but forwards an explicit +timeout+ to #subscription.
#
# @param timeout passed to #subscription, which hands it to the client's
#   #next_event
# @param channels [Array] the arguments sent with the "psubscribe" command
# @param block handed to #subscription, which uses it to build the
#   Subscription
# @return whatever #subscription returns
def psubscribe_with_timeout(timeout, *channels, &block)
  subscription("psubscribe", "punsubscribe", channels, block, timeout)
end
# Sends a :punsubscribe command for the given arguments through #call_v.
#
# @param channels [Array] arguments for the command; may be empty
# @return whatever #call_v returns
def punsubscribe(*channels)
  call_v([:punsubscribe, *channels])
end
# Runs a subscription loop started with "ssubscribe" and ended by
# "sunsubscribe", using the default timeout of #subscription.
#
# @param channels [Array] the channels; #subscription sends one "ssubscribe"
#   command per channel
# @param block handed to #subscription, which uses it to build the
#   Subscription
# @return whatever #subscription returns
def ssubscribe(*channels, &block)
  subscription("ssubscribe", "sunsubscribe", channels, block)
end
# Same as #ssubscribe, but forwards an explicit +timeout+ to #subscription.
#
# @param timeout passed to #subscription, which hands it to the client's
#   #next_event
# @param channels [Array] the channels; #subscription sends one "ssubscribe"
#   command per channel
# @param block handed to #subscription, which uses it to build the
#   Subscription
# @return whatever #subscription returns
def ssubscribe_with_timeout(timeout, *channels, &block)
  subscription("ssubscribe", "sunsubscribe", channels, block, timeout)
end
# Runs a subscription loop started with "subscribe" and ended by
# "unsubscribe", using the default timeout of #subscription.
#
# @param channels [Array] the arguments sent with the "subscribe" command
# @param block handed to #subscription, which uses it to build the
#   Subscription
# @return whatever #subscription returns
def subscribe(*channels, &block)
  subscription("subscribe", "unsubscribe", channels, block)
end
# Same as #subscribe, but forwards an explicit +timeout+ to #subscription.
#
# @param timeout passed to #subscription, which hands it to the client's
#   #next_event
# @param channels [Array] the arguments sent with the "subscribe" command
# @param block handed to #subscription, which uses it to build the
#   Subscription
# @return whatever #subscription returns
def subscribe_with_timeout(timeout, *channels, &block)
  subscription("subscribe", "unsubscribe", channels, block, timeout)
end
# Issues the subscribe command(s), then reads events from the client and
# dispatches each one to the callback registered for its type, until the
# matching +stop+ event reports a trailing value of 0 or the client stops
# returning events.
#
# @param start [String] subscribe command name; "ssubscribe" is sent once
#   per channel, any other command is sent once with all channels
# @param stop [String] event type that may terminate the loop
# @param channels [Array] channels or patterns to subscribe to
# @param block [Proc] passed to Subscription.new to register callbacks
# @param timeout passed unchanged to the client's #next_event
#   (NOTE(review): 0 presumably means "wait indefinitely" -- confirm against
#   the client)
# @return [nil]
# @raise the class that Client::ERROR_MAPPING maps the event's class to, when
#   an event is a ::RedisClient::CommandError
def subscription(start, stop, channels, block, timeout = 0)
  sub = Subscription.new(&block)

  case start
  when "ssubscribe" then channels.each { |c| call_v([start, c]) } # avoid cross-slot keys
  else call_v([start, *channels])
  end

  # A falsy event (nil/false) from the client ends the loop.
  while (event = @client.next_event(timeout))
    if event.is_a?(::RedisClient::CommandError)
      raise Client::ERROR_MAPPING.fetch(event.class), event.message
    end

    type, *rest = event
    callback = sub.callbacks[type]
    callback.call(*rest) if callback

    # The last element of a stop event is compared against 0
    # (presumably the count of remaining subscriptions -- confirm).
    break if type == stop && rest.last == 0
  end
  # No need to unsubscribe here. The real client closes the connection
  # whenever an exception is raised (see #ensure_connected).
end
# Sends a :sunsubscribe command for the given arguments through #call_v.
#
# @param channels [Array] arguments for the command; may be empty
# @return whatever #call_v returns
def sunsubscribe(*channels)
  call_v([:sunsubscribe, *channels])
end
# Sends an :unsubscribe command for the given arguments through #call_v.
#
# @param channels [Array] arguments for the command; may be empty
# @return whatever #call_v returns
def unsubscribe(*channels)
  call_v([:unsubscribe, *channels])
end