module NATS::JetStream::PullSubscription

def consumer_info(params = {})

Returns:
  • (JetStream::API::ConsumerInfo) - The latest ConsumerInfo of the consumer.

Options Hash: (**params)
  • :timeout (Float) -- Time to wait for response.

Parameters:
  • params (Hash) -- Options to customize API request.
# Looks up the consumer info for this subscription by delegating to the
# JetStream context held in @jsi.
#
# @param params [Hash] Options forwarded unchanged to the API request.
# @return [Object] Whatever the underlying +consumer_info+ call returns.
def consumer_info(params = {})
  jsi = @jsi
  jsi.js.consumer_info(jsi.stream, jsi.consumer, params)
end

def fetch(batch = 1, params = {})

Returns:
  • (Array) - The messages received; may contain fewer than `batch` messages.

Options Hash: (**params)
  • :timeout (Float) -- Duration of the fetch request before it expires.

Parameters:
  • params (Hash) -- Options to customize the fetch request.
  • batch (Integer) -- Number of messages to pull from the stream.
# Pulls up to +batch+ messages for this pull subscription.
#
# Messages already sitting in the pending queue are consumed first. When
# that is not enough, a next-message request is published to @jsi.nms with
# @subject as the reply subject, and the call waits on the condition
# returned by +wait_for_msgs_cond+ for replies to arrive.
#
# @param batch [Integer] Number of messages to pull from the stream.
# @param params [Hash] Options to customize the fetch request. The hash is
#   only read, never modified.
# @option params [Numeric] :timeout Duration of the fetch request before it
#   expires (defaults to 5).
# @return [Array] The messages received; may hold fewer than +batch+.
# @raise [NATS::JetStream::Error] When +batch+ is less than 1. Unexpected
#   status replies are raised as whatever +JS.from_msg+ builds.
# @raise [NATS::Timeout] When nothing arrives within the timeout, or a
#   request-timeout status is the reply to the request.
def fetch(batch = 1, params = {})
  if batch < 1
    raise ::NATS::JetStream::Error.new("nats: invalid batch size")
  end

  t = MonotonicTime.now
  # Read the timeout without writing the default back into the caller's hash.
  timeout = params[:timeout] || 5
  # Expiry sent with the request: the timeout scaled by 1e9 (presumably
  # seconds to nanoseconds), minus a small margin.
  expires = (timeout * 1_000_000_000) - 100_000
  next_req = {
    batch: batch
  }
  msgs = []

  if batch == 1
    ####################################################
    # Fetch (1)                                        #
    ####################################################
    # Check if there is any pending message in the queue that is
    # ready to be consumed.
    synchronize do
      unless @pending_queue.empty?
        msg = @pending_queue.pop
        @pending_size -= msg.data.size
        # Check for a no msgs response status.
        if JS.is_status_msg(msg)
          case msg.header[JS::Header::Status]
          when JS::Status::NoMsgs, JS::Status::RequestTimeout
            # Skip these
          else
            raise JS.from_msg(msg)
          end
        else
          msgs << msg
        end
      end
    end

    # Make lingering request with expiration.
    next_req[:expires] = expires
    if msgs.empty?
      # Make publish request and wait for response.
      @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject)

      # Wait for result of fetch or timeout. The queue is drained inside
      # the same synchronize block, matching every other queue access here.
      synchronize do
        wait_for_msgs_cond.wait(timeout)
        unless @pending_queue.empty?
          msg = @pending_queue.pop
          @pending_size -= msg.data.size
          msgs << msg
        end
      end

      duration = MonotonicTime.since(t)
      if duration > timeout
        raise ::NATS::Timeout.new("nats: fetch timeout")
      end

      # The wait can end without a delivery, so only inspect the reply
      # when there is one.
      first_msg = msgs.first
      if !first_msg.nil? && JS.is_status_msg(first_msg)
        case first_msg.header[JS::Header::Status]
        when JS::Status::RequestTimeout
          raise NATS::Timeout.new("nats: fetch request timeout")
        else
          raise JS.from_msg(first_msg)
        end
      end
    end
  else
    ####################################################
    # Fetch (n)                                        #
    ####################################################
    # Check if there already enough in the pending buffer.
    synchronize do
      if batch <= @pending_queue.size
        batch.times do
          msg = @pending_queue.pop
          @pending_size -= msg.data.size
          # Check for a no msgs response status.
          if JS.is_status_msg(msg)
            case msg.header[JS::Header::Status]
            when JS::Status::NoMsgs, JS::Status::RequestTimeout
              # Skip these
              next
            else
              raise JS.from_msg(msg)
            end
          else
            msgs << msg
          end
        end
        return msgs
      end
    end

    # Make publish request and wait any response.
    next_req[:no_wait] = true
    @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject)

    # Not receiving even one is a timeout.
    start_time = MonotonicTime.now
    msg = nil
    synchronize do
      wait_for_msgs_cond.wait(timeout)
      unless @pending_queue.empty?
        msg = @pending_queue.pop
        @pending_size -= msg.data.size
      end
    end

    # Check if the first message was a response saying that
    # there are no messages.
    if !msg.nil? && JS.is_status_msg(msg)
      case msg.header[JS::Header::Status]
      when JS::Status::NoMsgs
        # Make another request that does wait.
        next_req[:expires] = expires
        next_req.delete(:no_wait)
        @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject)
      when JS::Status::RequestTimeout
        raise NATS::Timeout.new("nats: fetch request timeout")
      else
        raise JS.from_msg(msg)
      end
    else
      msgs << msg unless msg.nil?
    end

    # Check if have not received yet a single message.
    duration = MonotonicTime.since(start_time)
    if msgs.empty? && (duration > timeout)
      raise NATS::Timeout.new("nats: fetch timeout")
    end

    needed = batch - msgs.count
    while (needed > 0) && (MonotonicTime.since(start_time) < timeout)
      duration = MonotonicTime.since(start_time)

      # Wait for the rest of the messages.
      synchronize do
        # Wait until there is a message delivered.
        if @pending_queue.empty?
          # Only wait for whatever is left of the overall timeout.
          deadline = timeout - duration
          wait_for_msgs_cond.wait(deadline) if deadline > 0

          duration = MonotonicTime.since(start_time)
          if msgs.empty? && @pending_queue.empty? && (duration > timeout)
            raise NATS::Timeout.new("nats: fetch timeout")
          end
        end

        unless @pending_queue.empty?
          msg = @pending_queue.pop
          @pending_size -= msg.data.size

          if JS.is_status_msg(msg)
            case msg.header[JS::Header::Status]
            when JS::Status::NoMsgs, JS::Status::RequestTimeout
              duration = MonotonicTime.since(start_time)

              if duration > timeout
                # Only received a subset of the messages.
                if !msgs.empty?
                  return msgs
                else
                  raise NATS::Timeout.new("nats: fetch timeout")
                end
              end
            else
              raise JS.from_msg(msg)
            end
          else
            # Add to the set of messages that will be returned.
            msgs << msg
            needed -= 1
          end
        end
      end # :end: synchronize
    end
  end

  # Check if timed out waiting for messages. Measured from +t+, which is set
  # on every path (start_time only exists in the multi-message branch).
  if msgs.empty? && (MonotonicTime.since(t) > timeout)
    raise NATS::Timeout.new("nats: fetch timeout")
  end

  msgs
end

def next_msg(params = {})

Raises:
  • (NATS::JetStream::Error) - Always raised; a pull subscription cannot use next_msg.
# Not supported on pull subscriptions: every call raises.
#
# @param params [Hash] Accepted for interface compatibility; never read.
# @raise [NATS::JetStream::Error] Always.
def next_msg(params = {})
  raise ::NATS::JetStream::Error, "nats: pull subscription cannot use next_msg"
end