lib/msgpack/factory.rb



module MessagePack
  class Factory
    # see ext for other methods

    def register_type(type, klass, options = { packer: :to_msgpack_ext, unpacker: :from_msgpack_ext })
      raise FrozenError, "can't modify frozen MessagePack::Factory" if frozen?

      if options
        options = options.dup
        case packer = options[:packer]
        when nil, Proc
          # all good
        when String, Symbol
          options[:packer] = packer.to_sym.to_proc
        when Method
          options[:packer] = packer.to_proc
        when packer.respond_to?(:call)
          options[:packer] = packer.method(:call).to_proc
        else
          raise ::TypeError, "expected :packer argument to be a callable object, got: #{packer.inspect}"
        end

        case unpacker = options[:unpacker]
        when nil, Proc
          # all good
        when String, Symbol
          options[:unpacker] = klass.method(unpacker).to_proc
        when Method
          options[:unpacker] = unpacker.to_proc
        when packer.respond_to?(:call)
          options[:unpacker] = unpacker.method(:call).to_proc
        else
          raise ::TypeError, "expected :unpacker argument to be a callable object, got: #{unpacker.inspect}"
        end
      end

      register_type_internal(type, klass, options)
    end

    # [ {type: id, class: Class(or nil), packer: arg, unpacker: arg}, ... ]
    def registered_types(selector=:both)
      packer, unpacker = registered_types_internal
      # packer: Class -> [tid, proc, _flags]
      # unpacker: tid -> [klass, proc, _flags]

      list = []

      case selector
      when :both
        packer.each_pair do |klass, ary|
          type = ary[0]
          packer_proc = ary[1]
          unpacker_proc = nil
          if unpacker.has_key?(type)
            unpacker_proc = unpacker.delete(type)[1]
          end
          list << {type: type, class: klass, packer: packer_proc, unpacker: unpacker_proc}
        end

        # unpacker definition only
        unpacker.each_pair do |type, ary|
          list << {type: type, class: ary[0], packer: nil, unpacker: ary[1]}
        end

      when :packer
        packer.each_pair do |klass, ary|
          if ary[1]
            list << {type: ary[0], class: klass, packer: ary[1]}
          end
        end

      when :unpacker
        unpacker.each_pair do |type, ary|
          if ary[1]
            list << {type: type, class: ary[0], unpacker: ary[1]}
          end
        end

      else
        raise ArgumentError, "invalid selector #{selector}"
      end

      list.sort{|a, b| a[:type] <=> b[:type] }
    end

    def type_registered?(klass_or_type, selector=:both)
      case klass_or_type
      when Class
        klass = klass_or_type
        registered_types(selector).any?{|entry| klass <= entry[:class] }
      when Integer
        type = klass_or_type
        registered_types(selector).any?{|entry| type == entry[:type] }
      else
        raise ArgumentError, "class or type id"
      end
    end

    def load(src, param = nil)
      unpacker = nil

      if src.is_a? String
        unpacker = unpacker(param)
        unpacker.feed(src)
      else
        unpacker = unpacker(src, param)
      end

      unpacker.full_unpack
    end
    alias :unpack :load

    def dump(v, *rest)
      packer = packer(*rest)
      packer.write(v)
      packer.full_pack
    end
    alias :pack :dump

    def pool(size = 1, **options)
      Pool.new(
        frozen? ? self : dup.freeze,
        size,
        options.empty? ? nil : options,
      )
    end

    class Pool
      if RUBY_ENGINE == "ruby"
        class MemberPool
          def initialize(size, &block)
            @size = size
            @new_member = block
            @members = []
          end

          def with
            member = @members.pop || @new_member.call
            begin
              yield member
            ensure
              # If the pool is already full, we simply drop the extra member.
              # This is because contrary to a connection pool, creating an extra instance
              # is extremely unlikely to cause some kind of resource exhaustion.
              #
              # We could cycle the members (keep the newer one) but first It's more work and second
              # the older member might have been created pre-fork, so it might be at least partially
              # in shared memory.
              if member && @members.size < @size
                member.reset
                @members << member
              end
            end
          end
        end
      else
        class MemberPool
          def initialize(size, &block)
            @size = size
            @new_member = block
            @members = []
            @mutex = Mutex.new
          end

          def with
            member = @mutex.synchronize { @members.pop } || @new_member.call
            begin
              yield member
            ensure
              member.reset
              @mutex.synchronize do
                if member && @members.size < @size
                  @members << member
                end
              end
            end
          end
        end
      end

      def initialize(factory, size, options = nil)
        options = nil if !options || options.empty?
        @factory = factory
        @packers = MemberPool.new(size) { factory.packer(options).freeze }
        @unpackers = MemberPool.new(size) { factory.unpacker(options).freeze }
      end

      def load(data)
        @unpackers.with do |unpacker|
          unpacker.feed(data)
          unpacker.full_unpack
        end
      end

      def dump(object)
        @packers.with do |packer|
          packer.write(object)
          packer.full_pack
        end
      end

      def unpacker(&block)
        @unpackers.with(&block)
      end

      def packer(&block)
        @packers.with(&block)
      end
    end
  end
end