lib/redis/pipeline.rb
class Redis unless defined?(::BasicObject) class BasicObject instance_methods.each { |meth| undef_method(meth) unless meth =~ /\A(__|instance_eval)/ } end end class Pipeline attr :futures def initialize @without_reconnect = false @shutdown = false @futures = [] end def without_reconnect? @without_reconnect end def shutdown? @shutdown end def call(command, &block) # A pipeline that contains a shutdown should not raise ECONNRESET when # the connection is gone. @shutdown = true if command.first == :shutdown future = Future.new(command, block) @futures << future future end def call_pipeline(pipeline) @shutdown = true if pipeline.shutdown? @futures.concat(pipeline.futures) nil end def commands @futures.map { |f| f._command } end def without_reconnect(&block) @without_reconnect = true yield end def finish(replies) futures.each_with_index.map do |future, i| future._set(replies[i]) end end class Multi < self def finish(replies) return if replies.last.nil? # The transaction failed because of WATCH. if replies.last.size < futures.size - 2 # Some command wasn't recognized by Redis. raise replies.detect { |r| r.kind_of?(::Exception) } end super(replies.last) end def commands [[:multi]] + super + [[:exec]] end end end class FutureNotReady < RuntimeError def initialize super("Value will be available once the pipeline executes.") end end class Future < BasicObject FutureNotReady = ::Redis::FutureNotReady.new def initialize(command, transformation) @command = command @transformation = transformation @object = FutureNotReady end def inspect "<Redis::Future #{@command.inspect}>" end def _set(object) @object = @transformation ? @transformation.call(object) : object value end def _command @command end def value ::Kernel.raise(@object) if @object.kind_of?(::Exception) @object end end end