module ActionCable::Channel::Streams
def default_stream_handler(broadcasting, coder:)
so we can no-op when pubsub and connection are both JSON-encoded.
TODO: Room for optimization. Update transmit API to be coder-aware
TODO: Tests demonstrating this.
which decodes JSON and transmits to the client.
May be overridden to change the default stream handling behavior
def default_stream_handler(broadcasting, coder:) coder ||= ActiveSupport::JSON stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting end
def identity_handler
def identity_handler -> message { message } end
def stop_all_streams
def stop_all_streams streams.each do |broadcasting, callback| pubsub.unsubscribe broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end.clear end
def stream_decoder(handler = identity_handler, coder:)
def stream_decoder(handler = identity_handler, coder:) if coder -> message { handler.(coder.decode(message)) } else handler end end
def stream_for(model, callback = nil, coder: nil, &block)
Pass coder: ActiveSupport::JSON to decode messages as JSON before passing to the callback.
to the subscriber.
callback that'll be used instead of the default of just transmitting the updates straight
Start streaming the pubsub queue for the model in this channel. Optionally, you can pass a
def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end
def stream_from(broadcasting, callback = nil, coder: nil, &block)
Pass coder: ActiveSupport::JSON to decode messages as JSON before passing to the callback.
instead of the default of just transmitting the updates straight to the subscriber.
Start streaming from the named broadcasting pubsub queue. Optionally, you can pass a callback that'll be used
def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! # Build a stream handler by wrapping the user-provided callback with # a decoder or defaulting to a JSON-decoding retransmitter. handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams << [ broadcasting, handler ] connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end end
def stream_handler(broadcasting, user_handler, coder: nil)
handling, or other forms of handler decoration.
May be overridden to add instrumentation, logging, specialized error
def stream_handler(broadcasting, user_handler, coder: nil) if user_handler stream_decoder user_handler, coder: coder else default_stream_handler broadcasting, coder: coder end end
def stream_transmitter(handler = identity_handler, broadcasting:)
def stream_transmitter(handler = identity_handler, broadcasting:) via = "streamed from #{broadcasting}" -> (message) do transmit handler.(message), via: via end end
def streams
def streams @_streams ||= [] end
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
Always wrap the outermost handler to invoke the user handler on the
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) handler = stream_handler(broadcasting, user_handler, coder: coder) -> message do connection.worker_pool.async_invoke handler, :call, message, connection: connection end end