# frozen-string-literal: true

#
class Roda
  module RodaPlugins
    # The streaming plugin adds support for streaming responses
    # from roda using the +stream+ method:
    #
    #   plugin :streaming
    #
    #   route do |r|
    #     stream do |out|
    #       ['a', 'b', 'c'].each{|v| out << v; sleep 1}
    #     end
    #   end
    #
    # In order for streaming to work, any webservers used in
    # front of the roda app must not buffer responses.
    #
    # The stream method takes the following options:
    #
    # :callback :: A callback proc to call when the connection is
    #              closed.
    # :keep_open :: Whether to keep the connection open after the
    #               stream block returns, default is false.
    # :loop :: Whether to call the stream block continuously until
    #          the connection is closed.
    # :scheduler :: The scheduler object to use; it must respond to
    #               +defer+ and +schedule+.  If not given, EventMachine
    #               is used when the request env has an 'async.callback'
    #               entry, otherwise a Stream::Scheduler instance is used.
    #
    # The implementation was originally taken from Sinatra,
    # which is also released under the MIT License:
    #
    # Copyright (c) 2007, 2008, 2009 Blake Mizerany
    # Copyright (c) 2010, 2011, 2012, 2013, 2014 Konstantin Haase
    # 
    # Permission is hereby granted, free of charge, to any person
    # obtaining a copy of this software and associated documentation
    # files (the "Software"), to deal in the Software without
    # restriction, including without limitation the rights to use,
    # copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the
    # Software is furnished to do so, subject to the following
    # conditions:
    # 
    # The above copyright notice and this permission notice shall be
    # included in all copies or substantial portions of the Software.
    # 
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
    # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
    # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
    # NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
    # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
    # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
    # OTHER DEALINGS IN THE SOFTWARE.
    module Streaming
      # Shared frozen default for the optional +opts+ arguments, so that
      # calls without options do not allocate a new hash.
      OPTS = {}.freeze

      # Class of the response body in case you use #stream.
      #
      # Three things really matter: The front and back block (back being the
      # block generating content, front the one sending it to the client) and
      # the scheduler, integrating with whatever concurrency feature the Rack
      # handler is using.
      #
      # Scheduler has to respond to defer and schedule.
      class Stream
        include Enumerable

        # The default scheduler to use when streaming, useful for code
        # using ruby's default threading support.
        class Scheduler
          # Store the stream to schedule.
          def initialize(stream)
            @stream = stream
          end

          # Immediately yield.
          def defer(*)
            yield
          end

          # Close the stream if there is an exception when scheduling,
          # and reraise the exception if so.
          def schedule(*)
            yield
          rescue Exception
            # Exception (not just StandardError) is rescued on purpose: the
            # stream is closed for any failure, and the exception is always
            # reraised, so nothing is swallowed.
            @stream.close
            raise
          end
        end

        # Handle streaming options, see Streaming for details.
        #
        # opts :: Options hash; :scheduler, :keep_open and :callback are
        #         read here.
        # back :: The block generating content.  It is called with the
        #         stream itself when #each is called.
        def initialize(opts=OPTS, &back)
          @scheduler = opts[:scheduler] || Scheduler.new(self)
          @back = back.to_proc
          @keep_open = opts[:keep_open]
          @callbacks = []
          @closed = false

          callback(&opts[:callback]) if opts[:callback]
        end

        # Add output to the streaming response body.  The data is converted
        # to a string before being passed to the front block.  Returns self,
        # so that calls can be chained.
        def write(data)
          @scheduler.schedule{@front.call(data.to_s)}
          self
        end

        # Alias for +write+.
        def <<(data)
          write(data)
        end

        # Add the given block as a callback to call when the stream closes.
        # If the stream is already closed, call the block immediately and
        # return its result instead of storing it.
        def callback(&block)
          return yield if closed?
          @callbacks << block
        end

        # Alias to callback for EventMachine compatibility.
        alias errback callback

        # If not already closed, close the connection, and call
        # any callbacks.
        def close
          return if closed?
          # Mark as closed before running the callbacks, so that a nested
          # call to close does not run them a second time.
          @closed = true
          @scheduler.schedule{@callbacks.each(&:call)}
        end

        # Whether the connection has already been closed.
        def closed?
          @closed
        end

        # Yield values to the block as they are passed in via #<<.
        #
        # Stores the given block as the front block, then has the scheduler
        # run the back block with this stream as its argument.  The stream is
        # closed afterward unless the :keep_open option was given.
        def each(&front)
          @front = front
          @scheduler.defer do
            begin
              @back.call(self)
            rescue Exception => e
              # Reraise inside the scheduler so that the scheduler decides
              # how the failure is handled (the default Scheduler closes the
              # stream and reraises).
              @scheduler.schedule{raise e}
            end
            close unless @keep_open
          end
        end
      end

      module InstanceMethods
        # Immediately return a streaming response using the current response
        # status and headers, calling the block to get the streaming response.
        # See Streaming for details.
        #
        # This method does not return normally: it throws :halt with the
        # result of finishing the response using a Stream as the body.
        def stream(opts=OPTS, &block)
          # Default to EventMachine as the scheduler when the env has an
          # 'async.callback' entry and the caller did not choose a scheduler.
          if !opts.key?(:scheduler) && env['async.callback']
            opts = opts.merge(:scheduler=>EventMachine)
          end

          if opts[:loop]
            # +yield+ is used deliberately instead of block.call: the local
            # variable +block+ is rebound to this proc, so block.call would
            # make the proc call itself, while +yield+ still invokes the
            # block originally passed to this method.
            block = proc do |out|
              yield(out) until out.closed?
            end
          end

          throw :halt, @_response.finish_with_body(Stream.new(opts, &block))
        end
      end
    end

    register_plugin(:streaming, Streaming)
  end
end