lib/action_controller/metal/live.rb
# frozen_string_literal: true require "action_dispatch/http/response" require "delegate" require "active_support/json" module ActionController # Mix this module into your controller, and all actions in that controller # will be able to stream data to the client as it's written. # # class MyController < ActionController::Base # include ActionController::Live # # def stream # response.headers['Content-Type'] = 'text/event-stream' # 100.times { # response.stream.write "hello world\n" # sleep 1 # } # ensure # response.stream.close # end # end # # There are a few caveats with this module. You *cannot* write headers after the # response has been committed (Response#committed? will return truthy). # Calling +write+ or +close+ on the response stream will cause the response # object to be committed. Make sure all headers are set before calling write # or close on your stream. # # You *must* call close on your stream when you're finished, otherwise the # socket may be left open forever. # # The final caveat is that your actions are executed in a separate thread than # the main thread. Make sure your actions are thread safe, and this shouldn't # be a problem (don't share state across threads, etc). module Live extend ActiveSupport::Concern module ClassMethods def make_response!(request) if request.get_header("HTTP_VERSION") == "HTTP/1.0" super else Live::Response.new.tap do |res| res.request = request end end end end # This class provides the ability to write an SSE (Server Sent Event) # to an IO stream. The class is initialized with a stream and can be used # to either write a JSON string or an object which can be converted to JSON. # # Writing an object will convert it into standard SSE format with whatever # options you have configured. You may choose to set the following options: # # 1) Event. If specified, an event with this name will be dispatched on # the browser. # 2) Retry. The reconnection time in milliseconds used when attempting # to send the event. # 3) Id. If the connection dies while sending an SSE to the browser, then # the server will receive a +Last-Event-ID+ header with value equal to +id+. # # After setting an option in the constructor of the SSE object, all future # SSEs sent across the stream will use those options unless overridden. # # Example Usage: # # class MyController < ActionController::Base # include ActionController::Live # # def index # response.headers['Content-Type'] = 'text/event-stream' # sse = SSE.new(response.stream, retry: 300, event: "event-name") # sse.write({ name: 'John'}) # sse.write({ name: 'John'}, id: 10) # sse.write({ name: 'John'}, id: 10, event: "other-event") # sse.write({ name: 'John'}, id: 10, event: "other-event", retry: 500) # ensure # sse.close # end # end # # Note: SSEs are not currently supported by IE. However, they are supported # by Chrome, Firefox, Opera, and Safari. class SSE PERMITTED_OPTIONS = %w( retry event id ) def initialize(stream, options = {}) @stream = stream @options = options end def close @stream.close end def write(object, options = {}) case object when String perform_write(object, options) else perform_write(ActiveSupport::JSON.encode(object), options) end end private def perform_write(json, options) current_options = @options.merge(options).stringify_keys PERMITTED_OPTIONS.each do |option_name| if (option_value = current_options[option_name]) @stream.write "#{option_name}: #{option_value}\n" end end message = json.gsub("\n", "\ndata: ") @stream.write "data: #{message}\n\n" end end class ClientDisconnected < RuntimeError end class Buffer < ActionDispatch::Response::Buffer #:nodoc: include MonitorMixin class << self attr_accessor :queue_size end @queue_size = 10 # Ignore that the client has disconnected. # # If this value is `true`, calling `write` after the client # disconnects will result in the written content being silently # discarded. If this value is `false` (the default), a # ClientDisconnected exception will be raised. attr_accessor :ignore_disconnect def initialize(response) super(response, build_queue(self.class.queue_size)) @error_callback = lambda { true } @cv = new_cond @aborted = false @ignore_disconnect = false end def write(string) unless @response.committed? @response.headers["Cache-Control"] ||= "no-cache" @response.delete_header "Content-Length" end super unless connected? @buf.clear unless @ignore_disconnect # Raise ClientDisconnected, which is a RuntimeError (not an # IOError), because that's more appropriate for something beyond # the developer's control. raise ClientDisconnected, "client disconnected" end end end # Write a 'close' event to the buffer; the producer/writing thread # uses this to notify us that it's finished supplying content. # # See also #abort. def close synchronize do super @buf.push nil @cv.broadcast end end # Inform the producer/writing thread that the client has # disconnected; the reading thread is no longer interested in # anything that's being written. # # See also #close. def abort synchronize do @aborted = true @buf.clear end end # Is the client still connected and waiting for content? # # The result of calling `write` when this is `false` is determined # by `ignore_disconnect`. def connected? !@aborted end def on_error(&block) @error_callback = block end def call_on_error @error_callback.call end private def each_chunk(&block) loop do str = nil ActiveSupport::Dependencies.interlock.permit_concurrent_loads do str = @buf.pop end break unless str yield str end end def build_queue(queue_size) queue_size ? SizedQueue.new(queue_size) : Queue.new end end class Response < ActionDispatch::Response #:nodoc: all private def before_committed super jar = request.cookie_jar # The response can be committed multiple times jar.write self unless committed? end def build_buffer(response, body) buf = Live::Buffer.new response body.each { |part| buf.write part } buf end end def process(name) t1 = Thread.current locals = t1.keys.map { |key| [key, t1[key]] } error = nil # This processes the action in a child thread. It lets us return the # response code and headers back up the Rack stack, and still process # the body in parallel with sending data to the client. new_controller_thread { ActiveSupport::Dependencies.interlock.running do t2 = Thread.current # Since we're processing the view in a different thread, copy the # thread locals from the main thread to the child thread. :'( locals.each { |k, v| t2[k] = v } begin super(name) rescue => e if @_response.committed? begin @_response.stream.write(ActionView::Base.streaming_completion_on_exception) if request.format == :html @_response.stream.call_on_error rescue => exception log_error(exception) ensure log_error(e) @_response.stream.close end else error = e end ensure @_response.commit! end end } ActiveSupport::Dependencies.interlock.permit_concurrent_loads do @_response.await_commit end raise error if error end def response_body=(body) super response.close if response end private # Spawn a new thread to serve up the controller in. This is to get # around the fact that Rack isn't based around IOs and we need to use # a thread to stream data from the response bodies. Nobody should call # this method except in Rails internals. Seriously! def new_controller_thread # :nodoc: Thread.new { t2 = Thread.current t2.abort_on_exception = true yield } end def log_error(exception) logger = ActionController::Base.logger return unless logger logger.fatal do message = +"\n#{exception.class} (#{exception.message}):\n" message << exception.annotated_source_code.to_s if exception.respond_to?(:annotated_source_code) message << " " << exception.backtrace.join("\n ") "#{message}\n\n" end end end end