class Rage::Cable::Channel

def __has_action?(action_name)

Other tags:
    Private: -
def __has_action?(action_name)
  !INTERNAL_ACTIONS.include?(action_name) && self.class.__prepared_actions.has_key?(action_name)
end

def __prepare_id_method(method_name)

Other tags:
    Private: -
def __prepare_id_method(method_name)
  define_method(method_name) do
    @__identified_by[method_name]
  end
end

def __register_action_proc(action_name)

Other tags:
    Private: -
def __register_action_proc(action_name)
  if action_name == :subscribed && @__hooks
    before_subscribe_chunk = if @__hooks[:before_subscribe]
      lines = @__hooks[:before_subscribe].map do |h|
        condition = if h[:if] && h[:unless]
          "if #{h[:if]} && !#{h[:unless]}"
        elsif h[:if]
          "if #{h[:if]}"
        elsif h[:unless]
          "unless #{h[:unless]}"
        end
        <<~RUBY
          #{h[:name]} #{condition}
          return if @__subscription_rejected
        RUBY
      end
      lines.join("\n")
    end
    after_subscribe_chunk = if @__hooks[:after_subscribe]
      lines = @__hooks[:after_subscribe].map do |h|
        condition = if h[:if] && h[:unless]
          "if #{h[:if]} && !#{h[:unless]}"
        elsif h[:if]
          "if #{h[:if]}"
        elsif h[:unless]
          "unless #{h[:unless]}"
        end
        <<~RUBY
          #{h[:name]} #{condition}
        RUBY
      end
      lines.join("\n")
    end
  end
  if action_name == :unsubscribed && @__hooks
    before_unsubscribe_chunk = if @__hooks[:before_unsubscribe]
      lines = @__hooks[:before_unsubscribe].map do |h|
        condition = if h[:if] && h[:unless]
          "if #{h[:if]} && !#{h[:unless]}"
        elsif h[:if]
          "if #{h[:if]}"
        elsif h[:unless]
          "unless #{h[:unless]}"
        end
        <<~RUBY
          #{h[:name]} #{condition}
        RUBY
      end
      lines.join("\n")
    end
    after_unsubscribe_chunk = if @__hooks[:after_unsubscribe]
      lines = @__hooks[:after_unsubscribe].map do |h|
        condition = if h[:if] && h[:unless]
          "if #{h[:if]} && !#{h[:unless]}"
        elsif h[:if]
          "if #{h[:if]}"
        elsif h[:unless]
          "unless #{h[:unless]}"
        end
        <<~RUBY
          #{h[:name]} #{condition}
        RUBY
      end
      lines.join("\n")
    end
  end
  rescue_handlers_chunk = if @__rescue_handlers
    lines = @__rescue_handlers.map do |klasses, handler|
      <<~RUBY
      rescue #{klasses.join(", ")} => __e
        #{instance_method(handler).arity == 0 ? handler : "#{handler}(__e)"}
      RUBY
    end
    lines.join("\n")
  else
    ""
  end
  periodic_timers_chunk = if @__periodic_timers
    set_up_periodic_timers
    if action_name == :subscribed
      <<~RUBY
        self.class.__channels << self unless subscription_rejected?
      RUBY
    elsif action_name == :unsubscribed
      <<~RUBY
        self.class.__channels.delete(self)
      RUBY
    end
  else
    ""
  end
  is_subscribing = action_name == :subscribed
  should_release_connections = Rage.config.internal.should_manually_release_ar_connections?
  method_name = class_eval <<~RUBY, __FILE__, __LINE__ + 1
    def __run_#{action_name}(data)
      #{if is_subscribing
        <<~RUBY
          @__is_subscribing = true
        RUBY
      end}
      #{before_subscribe_chunk}
      #{before_unsubscribe_chunk}
      #{if instance_method(action_name).arity == 0
        <<~RUBY
          #{action_name}
        RUBY
      else
        <<~RUBY
          #{action_name}(data)
        RUBY
      end}
      #{after_subscribe_chunk}
      #{after_unsubscribe_chunk}
      #{periodic_timers_chunk}
      #{rescue_handlers_chunk}
      #{if should_release_connections
        <<~RUBY
        ensure
          ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
        RUBY
      end}
    end
  RUBY
  eval("->(channel, data) { channel.#{method_name}(data) }")
end

def __register_actions

Other tags:
    Private: -
def __register_actions
  actions = (
    public_instance_methods(true) - Rage::Cable::Channel.public_instance_methods(true)
  ).reject { |m| m.start_with?("__rage_tmp") || m.start_with?("__run") }
  actions.reject! { |m| m != :receive } unless Rage.cable.__protocol.supports_rpc?
  @__prepared_actions = (INTERNAL_ACTIONS + actions).each_with_object({}) do |action_name, memo|
    memo[action_name] = __register_action_proc(action_name)
  end
  actions - INTERNAL_ACTIONS
end

def __run_action(action_name, data = nil)

Other tags:
    Private: -
def __run_action(action_name, data = nil)
  Rage::Telemetry.tracer.span_cable_action_process(channel: self, action: action_name, data:) do
    self.class.__prepared_actions[action_name].call(self, data)
  end
end

def __stream_name_for(streamables)

Other tags:
    Private: -
def __stream_name_for(streamables)
  stream_name = Array(streamables).map do |streamable|
    if streamable.respond_to?(:id)
      "#{streamable.class.name}:#{streamable.id}"
    elsif streamable.is_a?(String) || streamable.is_a?(Symbol) || streamable.is_a?(Numeric)
      streamable
    else
      raise ArgumentError, "Unable to generate stream name. Expected an object that responds to `id`, got: #{streamable.class}"
    end
  end
  "#{name}:#{stream_name.join(":")}"
end

def add_action(action_type, action_name = nil, **opts, &block)

def add_action(action_type, action_name = nil, **opts, &block)
  if block_given?
    action_name = define_tmp_method(block)
  elsif action_name.nil?
    raise ArgumentError, "No handler provided. Pass the `action_name` parameter or provide a block."
  end
  _if, _unless = opts.values_at(:if, :unless)
  action = {
    name: action_name,
    if: _if,
    unless: _unless
  }
  action[:if] = define_tmp_method(action[:if]) if action[:if].is_a?(Proc)
  action[:unless] = define_tmp_method(action[:unless]) if action[:unless].is_a?(Proc)
  if @__hooks.nil?
    @__hooks = {}
  elsif @__hooks[action_type] && @__hooks.frozen?
    @__hooks = @__hooks.dup
    @__hooks[action_type] = @__hooks[action_type].dup
  end
  if @__hooks[action_type].nil?
    @__hooks[action_type] = [action]
  elsif (i = @__hooks[action_type].find_index { |a| a[:name] == action_name })
    @__hooks[action_type][i] = action
  else
    @__hooks[action_type] << action
  end
end

def after_subscribe(action_name = nil, **opts, &block)

Other tags:
    Note: - This callback will be triggered even if the subscription was rejected with the {reject} method.
def after_subscribe(action_name = nil, **opts, &block)
  add_action(:after_subscribe, action_name, **opts, &block)
end

def after_unsubscribe(action_name = nil, **opts, &block)

Register a new `after_unsubscribe` hook that will be called after the {unsubscribed} method.
def after_unsubscribe(action_name = nil, **opts, &block)
  add_action(:after_unsubscribe, action_name, **opts, &block)
end

def before_subscribe(action_name = nil, **opts, &block)

before_subscribe :my_method, if: -> { ... }
@example
end
...
before_subscribe do
@example
before_subscribe :my_method
@example

Register a new `before_subscribe` hook that will be called before the {subscribed} method.
def before_subscribe(action_name = nil, **opts, &block)
  add_action(:before_subscribe, action_name, **opts, &block)
end

def before_unsubscribe(action_name = nil, **opts, &block)

Register a new `before_unsubscribe` hook that will be called before the {unsubscribed} method.
def before_unsubscribe(action_name = nil, **opts, &block)
  add_action(:before_unsubscribe, action_name, **opts, &block)
end

def broadcast(stream, data)

Parameters:
  • data (Object) -- the data to send to the clients
  • stream (String) -- the name of the stream
def broadcast(stream, data)
  Rage.cable.broadcast(stream, data)
end

def broadcast_to(streamable, data)

Raises:
  • (ArgumentError) - if the streamable object does not satisfy the type requirements

Parameters:
  • data (Object) -- the data to send to the clients
  • streamable (#id, String, Symbol, Numeric, Array) -- an object that will be used to generate the stream name
def broadcast_to(streamable, data)
  Rage.cable.broadcast(__stream_name_for(streamable), data)
end

def define_tmp_method(block)

def define_tmp_method(block)
  name = @@__tmp_name_seed.next.join
  define_method("__rage_tmp_#{name}", block)
end

def inherited(klass)

def inherited(klass)
  klass.__hooks = @__hooks.freeze
  klass.__rescue_handlers = @__rescue_handlers.freeze
  klass.__periodic_timers = @__periodic_timers.freeze
end

def initialize(connection, params, identified_by)

Other tags:
    Private: -
def initialize(connection, params, identified_by)
  @__connection = connection
  @__params = params
  @__identified_by = identified_by
end

def params

Returns:
  • (Hash{Symbol=>String,Array,Hash,Numeric,NilClass,TrueClass,FalseClass}) -
def params
  @__params
end

def periodically(method_name = nil, every:, &block)

Parameters:
  • every (Integer) -- the calling period in seconds
  • method_name (Symbol, nil) -- the name of the method to call
def periodically(method_name = nil, every:, &block)
  callback_name = if block_given?
    raise ArgumentError, "Pass the `method_name` argument or provide a block, not both" if method_name
    define_tmp_method(block)
  elsif method_name.is_a?(Symbol)
    define_tmp_method(eval("-> { #{method_name} }"))
  else
    raise ArgumentError, "Expected a Symbol method name, got #{method_name.inspect}"
  end
  unless every.is_a?(Numeric) && every > 0
    raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
  end
  callback = eval("->(channel) { channel.#{callback_name} }")
  if @__periodic_timers.nil?
    @__periodic_timers = []
  elsif @__periodic_timers.frozen?
    @__periodic_timers = @__periodic_timers.dup
  end
  @__periodic_timers << [callback, every]
end

def reject

process (i.e. inside the {subscribed} method or {before_subscribe}/{after_subscribe} hooks).
Reject the subscription request. The method should only be called during the subscription
def reject
  @__subscription_rejected = true
end

def rescue_from(*klasses, with: nil, &block)

Parameters:
  • with (Symbol) -- the name of a handler method. The method can take one argument, which is the raised exception. Alternatively, you can pass a block, which can also take one argument.
  • klasses (Class, Array) -- exception classes to watch on
def rescue_from(*klasses, with: nil, &block)
  unless with
    if block_given?
      with = define_tmp_method(block)
    else
      raise ArgumentError, "No handler provided. Pass the `with` keyword argument or provide a block."
    end
  end
  if @__rescue_handlers.nil?
    @__rescue_handlers = []
  elsif @__rescue_handlers.frozen?
    @__rescue_handlers = @__rescue_handlers.dup
  end
  @__rescue_handlers.unshift([klasses, with])
end

def set_up_periodic_timers

def set_up_periodic_timers
  return if @__periodic_timers_set_up
  @__channels = Set.new
  @__periodic_timers.each do |callback, every|
    ::Iodine.run_every((every * 1000).to_i) do
      slice_length = (@__channels.length / 20.0).ceil
      if slice_length != 0
        @__channels.each_slice(slice_length) do |slice|
          Fiber.schedule do
            slice.each { |channel| callback.call(channel) }
          rescue => e
            Rage.logger.error("Unhandled exception has occured - #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}")
          end
        end
      end
    end
  end
  @__periodic_timers_set_up = true
end

def stream_for(streamable)

Other tags:
    Example: Broadcast to the stream -
    Example: Subscribe to a stream -

Raises:
  • (ArgumentError) - if the streamable object does not satisfy the type requirements

Parameters:
  • streamable (#id, String, Symbol, Numeric, Array) -- an object that will be used to generate the stream name
def stream_for(streamable)
  stream_from(self.class.__stream_name_for(streamable))
end

def stream_from(stream)

Other tags:
    Example: Broadcast to the stream -
    Example: Subscribe to a stream -

Raises:
  • (ArgumentError) - if the stream name is not a String

Parameters:
  • stream (String) -- the name of the stream
def stream_from(stream)
  raise ArgumentError, "Stream name must be a String" unless stream.is_a?(String)
  Rage.cable.__protocol.subscribe(@__connection, stream, @__params)
end

def subscribed

Called once a client has become a subscriber of the channel.
def subscribed
end

def subscription_rejected?

Returns:
  • (Boolean) -
def subscription_rejected?
  !!@__subscription_rejected
end

def transmit(data)

Parameters:
  • data (Object) -- the data to send to the client
def transmit(data)
  message = Rage.cable.__protocol.serialize(@__params, data)
  if @__is_subscribing
    # we expect a confirmation message to be sent as a result of a successful subscribe call;
    # this will make sure `transmit` calls send data after the confirmation;
    ::Iodine.defer { @__connection.write(message) }
  else
    @__connection.write(message)
  end
end

def unsubscribed

Called once a client unsubscribes from the channel.
def unsubscribed
end