# frozen_string_literal: truerequire"action_dispatch/http/response"require"delegate"require"active_support/json"moduleActionController# = Action Controller \Live## Mix this module into your controller, and all actions in that controller# will be able to stream data to the client as it's written.## class MyController < ActionController::Base# include ActionController::Live## def stream# response.headers['Content-Type'] = 'text/event-stream'# 100.times {# response.stream.write "hello world\n"# sleep 1# }# ensure# response.stream.close# end# end## There are a few caveats with this module. You *cannot* write headers after the# response has been committed (Response#committed? will return truthy).# Calling +write+ or +close+ on the response stream will cause the response# object to be committed. Make sure all headers are set before calling write# or close on your stream.## You *must* call close on your stream when you're finished, otherwise the# socket may be left open forever.## The final caveat is that your actions are executed in a separate thread than# the main thread. Make sure your actions are thread safe, and this shouldn't# be a problem (don't share state across threads, etc).## Note that \Rails includes +Rack::ETag+ by default, which will buffer your# response. As a result, streaming responses may not work properly with Rack# 2.2.x, and you may need to implement workarounds in your application.# You can either set the +ETag+ or +Last-Modified+ response headers or remove# +Rack::ETag+ from the middleware stack to address this issue.## Here's an example of how you can set the +Last-Modified+ header if your Rack# version is 2.2.x:## def stream# response.headers["Content-Type"] = "text/event-stream"# response.headers["Last-Modified"] = Time.now.httpdate # Add this line if your Rack version is 2.2.x# ...# endmoduleLiveextendActiveSupport::ConcernmoduleClassMethodsdefmake_response!(request)ifrequest.get_header("HTTP_VERSION")=="HTTP/1.0"superelseLive::Response.new.tapdo|res|res.request=requestendendendend# = Action Controller \Live Server Sent Events## This class provides the ability to write an SSE (Server Sent Event)# to an IO stream. The class is initialized with a stream and can be used# to either write a JSON string or an object which can be converted to JSON.## Writing an object will convert it into standard SSE format with whatever# options you have configured. You may choose to set the following options:## 1) Event. If specified, an event with this name will be dispatched on# the browser.# 2) Retry. The reconnection time in milliseconds used when attempting# to send the event.# 3) Id. If the connection dies while sending an SSE to the browser, then# the server will receive a +Last-Event-ID+ header with value equal to +id+.## After setting an option in the constructor of the SSE object, all future# SSEs sent across the stream will use those options unless overridden.## Example Usage:## class MyController < ActionController::Base# include ActionController::Live## def index# response.headers['Content-Type'] = 'text/event-stream'# sse = SSE.new(response.stream, retry: 300, event: "event-name")# sse.write({ name: 'John'})# sse.write({ name: 'John'}, id: 10)# sse.write({ name: 'John'}, id: 10, event: "other-event")# sse.write({ name: 'John'}, id: 10, event: "other-event", retry: 500)# ensure# sse.close# end# end## Note: SSEs are not currently supported by IE. However, they are supported# by Chrome, Firefox, Opera, and Safari.classSSEPERMITTED_OPTIONS=%w( retry event id )definitialize(stream,options={})@stream=stream@options=optionsenddefclose@stream.closeenddefwrite(object,options={})caseobjectwhenStringperform_write(object,options)elseperform_write(ActiveSupport::JSON.encode(object),options)endendprivatedefperform_write(json,options)current_options=@options.merge(options).stringify_keysPERMITTED_OPTIONS.eachdo|option_name|if(option_value=current_options[option_name])@stream.write"#{option_name}: #{option_value}\n"endendmessage=json.gsub("\n","\ndata: ")@stream.write"data: #{message}\n\n"endendclassClientDisconnected<RuntimeErrorendclassBuffer<ActionDispatch::Response::Buffer# :nodoc:includeMonitorMixinclass<<selfattr_accessor:queue_sizeend@queue_size=10# Ignore that the client has disconnected.## If this value is `true`, calling `write` after the client# disconnects will result in the written content being silently# discarded. If this value is `false` (the default), a# ClientDisconnected exception will be raised.attr_accessor:ignore_disconnectdefinitialize(response)super(response,build_queue(self.class.queue_size))@error_callback=lambda{true}@cv=new_cond@aborted=false@ignore_disconnect=falseend# ActionDispatch::Response delegates #to_ary to the internal ActionDispatch::Response::Buffer,# defining #to_ary is an indicator that the response body can be buffered and/or cached by# Rack middlewares, this is not the case for Live responses so we undefine it for this Buffer subclass.undef_method:to_arydefwrite(string)unless@response.committed?@response.headers["Cache-Control"]||="no-cache"@response.delete_header"Content-Length"endsuperunlessconnected?@buf.clearunless@ignore_disconnect# Raise ClientDisconnected, which is a RuntimeError (not an# IOError), because that's more appropriate for something beyond# the developer's control.raiseClientDisconnected,"client disconnected"endendend# Same as +write+ but automatically include a newline at the end of the string.defwriteln(string)writestring.end_with?("\n")?string:"#{string}\n"end# Write a 'close' event to the buffer; the producer/writing thread# uses this to notify us that it's finished supplying content.## See also #abort.defclosesynchronizedosuper@buf.pushnil@cv.broadcastendend# Inform the producer/writing thread that the client has# disconnected; the reading thread is no longer interested in# anything that's being written.## See also #close.defabortsynchronizedo@aborted=true@buf.clearendend# Is the client still connected and waiting for content?## The result of calling `write` when this is `false` is determined# by `ignore_disconnect`.defconnected?!@abortedenddefon_error(&block)@error_callback=blockenddefcall_on_error@error_callback.callendprivatedefeach_chunk(&block)loopdostr=nilActiveSupport::Dependencies.interlock.permit_concurrent_loadsdostr=@buf.popendbreakunlessstryieldstrendenddefbuild_queue(queue_size)queue_size?SizedQueue.new(queue_size):Queue.newendendclassResponse<ActionDispatch::Response# :nodoc: allprivatedefbefore_committedsuperjar=request.cookie_jar# The response can be committed multiple timesjar.writeselfunlesscommitted?enddefbuild_buffer(response,body)buf=Live::Buffer.newresponsebody.each{|part|buf.writepart}bufendenddefprocess(name)t1=Thread.currentlocals=t1.keys.map{|key|[key,t1[key]]}error=nil# This processes the action in a child thread. It lets us return the# response code and headers back up the Rack stack, and still process# the body in parallel with sending data to the client.new_controller_thread{ActiveSupport::Dependencies.interlock.runningdot2=Thread.current# Since we're processing the view in a different thread, copy the# thread locals from the main thread to the child thread. :'(locals.each{|k,v|t2[k]=v}ActiveSupport::IsolatedExecutionState.share_with(t1)beginsuper(name)rescue=>eif@_response.committed?begin@_response.stream.write(ActionView::Base.streaming_completion_on_exception)ifrequest.format==:html@_response.stream.call_on_errorrescue=>exceptionlog_error(exception)ensurelog_error(e)@_response.stream.closeendelseerror=eendensure@_response.commit!endend}ActiveSupport::Dependencies.interlock.permit_concurrent_loadsdo@_response.await_commitendraiseerroriferrorenddefresponse_body=(body)superresponse.closeifresponseend# Sends a stream to the browser, which is helpful when you're generating exports or other running data where you# don't want the entire file buffered in memory first. Similar to send_data, but where the data is generated live.## Options:# * <tt>:filename</tt> - suggests a filename for the browser to use.# * <tt>:type</tt> - specifies an HTTP content type.# You can specify either a string or a symbol for a registered type with <tt>Mime::Type.register</tt>, for example :json.# If omitted, type will be inferred from the file extension specified in <tt>:filename</tt>.# If no content type is registered for the extension, the default type 'application/octet-stream' will be used.# * <tt>:disposition</tt> - specifies whether the file will be shown inline or downloaded.# Valid values are 'inline' and 'attachment' (default).## Example of generating a csv export:## send_stream(filename: "subscribers.csv") do |stream|# stream.write "email_address,updated_at\n"## @subscribers.find_each do |subscriber|# stream.write "#{subscriber.email_address},#{subscriber.updated_at}\n"# end# enddefsend_stream(filename:,disposition: "attachment",type: nil)response.headers["Content-Type"]=(type.is_a?(Symbol)?Mime[type].to_s:type)||Mime::Type.lookup_by_extension(File.extname(filename).downcase.delete("."))&.to_s||"application/octet-stream"response.headers["Content-Disposition"]=ActionDispatch::Http::ContentDisposition.format(disposition: disposition,filename: filename)yieldresponse.streamensureresponse.stream.closeendprivate# Spawn a new thread to serve up the controller in. This is to get# around the fact that Rack isn't based around IOs and we need to use# a thread to stream data from the response bodies. Nobody should call# this method except in Rails internals. Seriously!defnew_controller_thread# :nodoc:Thread.new{t2=Thread.currentt2.abort_on_exception=trueyield}enddeflog_error(exception)logger=ActionController::Base.loggerreturnunlessloggerlogger.fataldomessage=+"\n#{exception.class} (#{exception.message}):\n"message<<exception.annotated_source_code.to_sifexception.respond_to?(:annotated_source_code)message<<" "<<exception.backtrace.join("\n ")"#{message}\n\n"endendendend