lib/roda/plugins/streaming.rb



# frozen-string-literal: true

#
class Roda
  module RodaPlugins
    # The streaming plugin adds support for streaming responses
    # from roda using the +stream+ method:
    #
    #   plugin :streaming
    #
    #   route do |r|
    #     stream do |out|
    #       ['a', 'b', 'c'].each{|v| out << v; sleep 1}
    #     end
    #   end
    #
    # In order for streaming to work, any webservers used in
    # front of the roda app must not buffer responses.
    #
    # The stream method takes the following options:
    #
    # :callback :: A callback proc to call when the connection is closed.
    # :loop :: Whether to call the stream block continuously until the connection is closed.
    # :async :: Whether to call the stream block in a separate thread (default: false). Only supported on Ruby 2.3+.
    # :queue :: A queue object to use for asynchronous streaming (default: `SizedQueue.new(10)`).
    #
    # If the :loop option is used, you can override the
    # handle_stream_error method to change how exceptions
    # are handled during streaming. This method is passed the
    # exception and the stream.  By default, this method
    # just reraises the exception, but you can choose to output
    # the an error message to the stream, before raising:
    #
    #   def handle_stream_error(e, out)
    #     out << 'ERROR!'
    #     raise e
    #   end
    #
    # Ignore errors completely while streaming:
    #
    #   def handle_stream_error(e, out)
    #   end
    #
    # or handle the errors in some other way.
    module Streaming
      # Class of the response body in case you use #stream.
      class Stream
        include Enumerable

        # Handle streaming options, see Streaming for details.
        def initialize(opts=OPTS, &block)
          @block = block
          @out = nil
          @callback = opts[:callback]
          @closed = false
        end

        # Add output to the streaming response body. Returns number of bytes written.
        def write(data)
          data = data.to_s
          @out.call(data)
          data.bytesize
        end

        # Add output to the streaming response body. Returns self.
        def <<(data)
          write(data)
          self
        end

        # If not already closed, close the connection, and call
        # any callbacks.
        def close
          return if closed?
          @closed = true
          @callback.call if @callback
        end

        # Whether the connection has already been closed.
        def closed?
          @closed
        end

        # Yield values to the block as they are passed in via #<<.
        def each(&out)
          @out = out
          @block.call(self)
        ensure
          close
        end
      end

      # Class of the response body if you use #stream with :async set to true.
      # Uses a separate thread that pushes streaming results to a queue, so that
      # data can be streamed to clients while it is being prepared by the application.
      class AsyncStream
        include Enumerable

        # Handle streaming options, see Streaming for details.
        def initialize(opts=OPTS, &block)
          @stream = Stream.new(opts, &block)
          @queue = opts[:queue] || SizedQueue.new(10) # have some default backpressure
          @thread = Thread.new { enqueue_chunks }
        end

        # Continue streaming data until the stream is finished.
        def each(&out)
          dequeue_chunks(&out)
          @thread.join
        end

        # Stop streaming.
        def close
          @queue.close # terminate the producer thread
          @stream.close
        end

        private

        # Push each streaming chunk onto the queue.
        def enqueue_chunks
          @stream.each do |chunk|
            @queue.push(chunk)
          end
        rescue ClosedQueueError
          # connection was closed
        ensure
          @queue.close
        end

        # Pop each streaming chunk from the queue and yield it.
        def dequeue_chunks
          while chunk = @queue.pop
            yield chunk
          end
        end
      end

      module InstanceMethods
        # Immediately return a streaming response using the current response
        # status and headers, calling the block to get the streaming response.
        # See Streaming for details.
        def stream(opts=OPTS, &block)
          if opts[:loop]
            block = proc do |out|
              until out.closed?
                begin
                  yield(out)
                rescue => e
                  handle_stream_error(e, out)
                end
              end
            end
          end

          stream_class = (opts[:async] && RUBY_VERSION >= '2.3') ? AsyncStream : Stream

          throw :halt, @_response.finish_with_body(stream_class.new(opts, &block))
        end

        # Handle exceptions raised while streaming when using :loop
        def handle_stream_error(e, out)
          raise e
        end
      end
    end

    register_plugin(:streaming, Streaming)
  end
end