module ActionCable::Channel::Streams

def default_stream_handler(broadcasting, coder:)

Then we can skip decode+encode if we're just proxying messages.
so we can no-op when pubsub and connection are both JSON-encoded.
TODO: Room for optimization. Update transmit API to be coder-aware

TODO: Tests demonstrating this.

which decodes JSON and transmits to the client.
May be overridden to change the default stream handling behavior
def default_stream_handler(broadcasting, coder:)
  coder ||= ActiveSupport::JSON
  stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
end

def identity_handler

def identity_handler
  -> message { message }
end

def stop_all_streams

Unsubscribes all streams associated with this channel from the pubsub queue.
def stop_all_streams
  streams.each do |broadcasting, callback|
    pubsub.unsubscribe broadcasting, callback
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end.clear
end

def stream_decoder(handler = identity_handler, coder:)

def stream_decoder(handler = identity_handler, coder:)
  if coder
    -> message { handler.(coder.decode(message)) }
  else
    handler
  end
end

def stream_for(model, callback = nil, coder: nil, &block)

Defaults to coder: nil which does no decoding, passes raw messages.
Pass coder: ActiveSupport::JSON to decode messages as JSON before passing to the callback.

to the subscriber.
callback that'll be used instead of the default of just transmitting the updates straight
Start streaming the pubsub queue for the model in this channel. Optionally, you can pass a
def stream_for(model, callback = nil, coder: nil, &block)
  stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end

def stream_from(broadcasting, callback = nil, coder: nil, &block)

Defaults to coder: nil which does no decoding, passes raw messages.
Pass coder: ActiveSupport::JSON to decode messages as JSON before passing to the callback.
instead of the default of just transmitting the updates straight to the subscriber.
Start streaming from the named broadcasting pubsub queue. Optionally, you can pass a callback that'll be used
def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)
  # Don't send the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!
  # Build a stream handler by wrapping the user-provided callback with
  # a decoder or defaulting to a JSON-decoding retransmitter.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams << [ broadcasting, handler ]
  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end

def stream_handler(broadcasting, user_handler, coder: nil)

TODO: Tests demonstrating this.

handling, or other forms of handler decoration.
May be overridden to add instrumentation, logging, specialized error
def stream_handler(broadcasting, user_handler, coder: nil)
  if user_handler
    stream_decoder user_handler, coder: coder
  else
    default_stream_handler broadcasting, coder: coder
  end
end

def stream_transmitter(handler = identity_handler, broadcasting:)

def stream_transmitter(handler = identity_handler, broadcasting:)
  via = "streamed from #{broadcasting}"
  -> (message) do
    transmit handler.(message), via: via
  end
end

def streams

def streams
  @_streams ||= []
end

def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)

worker pool rather than blocking the event loop.
Always wrap the outermost handler to invoke the user handler on the
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
  handler = stream_handler(broadcasting, user_handler, coder: coder)
  -> message do
    connection.worker_pool.async_invoke handler, :call, message, connection: connection
  end
end