class Rage::Cable::Channel
def __has_action?(action_name)
- Private: -
# @private
# Tells whether `action_name` can be invoked on this channel: it must not be
# one of INTERNAL_ACTIONS and it must be present in the prepared actions of
# the channel class.
#
# @param action_name [Symbol] the action to look up
# @return [Boolean]
def __has_action?(action_name)
  return false if INTERNAL_ACTIONS.include?(action_name)

  self.class.__prepared_actions.has_key?(action_name)
end
def __prepare_id_method(method_name)
- Private: -
# @private
# Defines an instance-level reader named `method_name` that returns the value
# stored under the same key in `@__identified_by`.
#
# @param method_name [Symbol] name of the reader to define
def __prepare_id_method(method_name)
  define_method(method_name) { @__identified_by[method_name] }
end
def __register_action_proc(action_name)
- Private: -
# @private
# Compiles a `__run_<action_name>` instance method for the given action and
# returns a lambda `(channel, data)` that calls it.
#
# The generated method, in order:
# * sets `@__is_subscribing` when the action is `:subscribed`;
# * runs the `before_*` hooks registered for the action;
# * calls the action itself, passing `data` only if the action accepts arguments;
# * runs the `after_*` hooks;
# * adds the channel to / removes it from the class-level `__channels` set when
#   periodic timers are registered;
# * rescues the exceptions registered with `rescue_from`;
# * releases ActiveRecord connections in an `ensure` clause when the
#   configuration asks for it.
#
# @param action_name [Symbol] name of the instance method implementing the action
# @return [Proc] lambda taking the channel instance and the incoming data
def __register_action_proc(action_name)
  before_hooks_chunk, after_hooks_chunk =
    case action_name
    when :subscribed
      [
        __hooks_chunk(:before_subscribe, halt_on_reject: true),
        __hooks_chunk(:after_subscribe)
      ]
    when :unsubscribed
      [__hooks_chunk(:before_unsubscribe), __hooks_chunk(:after_unsubscribe)]
    end

  rescue_handlers_chunk = __rescue_handlers_chunk
  periodic_timers_chunk = __periodic_timers_chunk(action_name)

  should_release_connections = Rage.config.internal.should_manually_release_ar_connections?
  ensure_chunk = if should_release_connections
    "ensure\n  ActiveRecord::Base.connection_handler.clear_active_connections!(:all)\n"
  else
    ""
  end

  # only pass the payload to actions that declare a parameter
  action_call = instance_method(action_name).arity == 0 ? action_name.to_s : "#{action_name}(data)"

  method_name = class_eval <<~RUBY, __FILE__, __LINE__ + 1
    def __run_#{action_name}(data)
      #{"@__is_subscribing = true" if action_name == :subscribed}
      #{before_hooks_chunk}
      #{action_call}
      #{after_hooks_chunk}
      #{periodic_timers_chunk}
      #{rescue_handlers_chunk}
      #{ensure_chunk}
    end
  RUBY

  # `class_eval` of a `def` returns the method name; the lambda is built from
  # source so the call is a plain method call
  eval("->(channel, data) { channel.#{method_name}(data) }")
end

# Renders the source calling every hook stored under `hook_type`; returns nil
# when no such hooks exist. With `halt_on_reject`, each hook is followed by an
# early return if the subscription has been rejected.
def __hooks_chunk(hook_type, halt_on_reject: false)
  hooks = @__hooks && @__hooks[hook_type]
  return unless hooks

  lines = hooks.map do |hook|
    call = "#{hook[:name]} #{__hook_condition(hook)}\n"
    halt_on_reject ? "#{call}return if @__subscription_rejected\n" : call
  end

  lines.join("\n")
end

# Renders the `if`/`unless` modifier guarding a hook call; nil when the hook is unconditional.
def __hook_condition(hook)
  if hook[:if] && hook[:unless]
    "if #{hook[:if]} && !#{hook[:unless]}"
  elsif hook[:if]
    "if #{hook[:if]}"
  elsif hook[:unless]
    "unless #{hook[:unless]}"
  end
end

# Renders one `rescue` clause per registered handler; the exception is passed
# to the handler only if the handler method accepts arguments.
def __rescue_handlers_chunk
  return "" unless @__rescue_handlers

  lines = @__rescue_handlers.map do |klasses, handler|
    handler_call = instance_method(handler).arity == 0 ? handler : "#{handler}(__e)"
    "rescue #{klasses.join(", ")} => __e\n  #{handler_call}\n"
  end

  lines.join("\n")
end

# Makes sure periodic timers are running and renders the source that tracks
# the channel in the class-level `__channels` set for subscribe/unsubscribe.
def __periodic_timers_chunk(action_name)
  return "" unless @__periodic_timers

  set_up_periodic_timers

  case action_name
  when :subscribed
    "self.class.__channels << self unless subscription_rejected?\n"
  when :unsubscribed
    "self.class.__channels.delete(self)\n"
  end
end
def __register_actions
- Private: -
# @private
# Collects the actions exposed by the channel class and prepares a callable
# for each of them (plus INTERNAL_ACTIONS) in `@__prepared_actions`.
#
# Actions are the public instance methods not defined by
# `Rage::Cable::Channel` itself, excluding generated `__rage_tmp*` and `__run*`
# methods. When the protocol does not support RPC, only `:receive` is kept.
#
# @return [Array<Symbol>] the registered actions without INTERNAL_ACTIONS
def __register_actions
  own_methods = public_instance_methods(true) - Rage::Cable::Channel.public_instance_methods(true)
  actions = own_methods.reject { |m| m.start_with?("__rage_tmp", "__run") }

  actions.select! { |m| m == :receive } unless Rage.cable.__protocol.supports_rpc?

  @__prepared_actions = (INTERNAL_ACTIONS + actions).each_with_object({}) do |action_name, memo|
    memo[action_name] = __register_action_proc(action_name)
  end

  actions - INTERNAL_ACTIONS
end
def __run_action(action_name, data = nil)
- Private: -
# @private
# Runs the prepared callable for `action_name` against this channel, wrapped
# in a telemetry span.
#
# @param action_name [Symbol] key of the prepared action
# @param data [Object, nil] payload passed to the action
def __run_action(action_name, data = nil)
  tracer = Rage::Telemetry.tracer

  tracer.span_cable_action_process(channel: self, action: action_name, data: data) do
    handler = self.class.__prepared_actions[action_name]
    handler.call(self, data)
  end
end
def __stream_name_for(streamables)
- Private: -
# @private
# Builds a stream name from one or several streamables, prefixed with the
# name of the channel class.
#
# Objects responding to `id` are rendered as `ClassName:id`; strings, symbols
# and numbers are used as is.
#
# @param streamables [#id, String, Symbol, Numeric, Array] a single streamable or a list of them
# @return [String]
# @raise [ArgumentError] if a streamable does not satisfy the type requirements
def __stream_name_for(streamables)
  # Only unwrap real array-likes. `Array()` would also explode any object
  # responding to `to_a` (e.g. a Struct with an `id`) and would turn `nil` into
  # an empty list, silently producing the shared stream name "<Channel>:".
  list = streamables.respond_to?(:to_ary) ? streamables.to_ary : [streamables]

  stream_name = list.map do |streamable|
    if streamable.respond_to?(:id)
      "#{streamable.class.name}:#{streamable.id}"
    elsif streamable.is_a?(String) || streamable.is_a?(Symbol) || streamable.is_a?(Numeric)
      streamable
    else
      raise ArgumentError, "Unable to generate stream name. Expected an object that responds to `id`, got: #{streamable.class}"
    end
  end

  "#{name}:#{stream_name.join(":")}"
end
def add_action(action_type, action_name = nil, **opts, &block)
# Registers a hook of the given type on the channel class. A hook with the
# same name as an existing one of that type replaces it.
#
# @param action_type [Symbol] the key the hook is stored under in `@__hooks`
# @param action_name [Symbol, nil] the method to call; replaced by a generated method when a block is given
# @param opts [Hash] supports `:if` and `:unless`, each a method name or a Proc
# @raise [ArgumentError] if neither `action_name` nor a block is provided
def add_action(action_type, action_name = nil, **opts, &block)
  if block_given?
    action_name = define_tmp_method(block)
  elsif action_name.nil?
    raise ArgumentError, "No handler provided. Pass the `action_name` parameter or provide a block."
  end

  _if, _unless = opts.values_at(:if, :unless)

  action = { name: action_name, if: _if, unless: _unless }

  # Proc conditions are turned into methods so they can be referenced by name
  action[:if] = define_tmp_method(action[:if]) if action[:if].is_a?(Proc)
  action[:unless] = define_tmp_method(action[:unless]) if action[:unless].is_a?(Proc)

  if @__hooks.nil?
    @__hooks = {}
  elsif @__hooks.frozen?
    # A frozen hash is shared through inheritance. Copy the hash together with
    # every hook list: writing a new hook type must not hit the frozen hash,
    # and later additions must not mutate lists owned by another class.
    @__hooks = @__hooks.transform_values(&:dup)
  end

  if @__hooks[action_type].nil?
    @__hooks[action_type] = [action]
  elsif (i = @__hooks[action_type].find_index { |a| a[:name] == action_name })
    @__hooks[action_type][i] = action
  else
    @__hooks[action_type] << action
  end
end
def after_subscribe(action_name = nil, **opts, &block)
Note: This callback will be triggered even if the subscription was rejected with the {reject} method.
# Register a new `after_subscribe` hook.
#
# @param action_name [Symbol, nil] the name of the method to call; may be omitted when a block is given
# @param opts [Hash] options forwarded to the hook registration (e.g. `:if`, `:unless`)
def after_subscribe(action_name = nil, **opts, &block)
  hook_type = :after_subscribe
  add_action(hook_type, action_name, **opts, &block)
end
def after_unsubscribe(action_name = nil, **opts, &block)
# Register a new `after_unsubscribe` hook.
#
# @param action_name [Symbol, nil] the name of the method to call; may be omitted when a block is given
# @param opts [Hash] options forwarded to the hook registration (e.g. `:if`, `:unless`)
def after_unsubscribe(action_name = nil, **opts, &block)
  hook_type = :after_unsubscribe
  add_action(hook_type, action_name, **opts, &block)
end
def before_subscribe(action_name = nil, **opts, &block)
@example
end
...
before_subscribe do
@example
before_subscribe :my_method
@example
Register a new `before_subscribe` hook that will be called before the {subscribed} method.
# Register a new `before_subscribe` hook.
#
# @param action_name [Symbol, nil] the name of the method to call; may be omitted when a block is given
# @param opts [Hash] options forwarded to the hook registration (e.g. `:if`, `:unless`)
def before_subscribe(action_name = nil, **opts, &block)
  hook_type = :before_subscribe
  add_action(hook_type, action_name, **opts, &block)
end
def before_unsubscribe(action_name = nil, **opts, &block)
# Register a new `before_unsubscribe` hook.
#
# @param action_name [Symbol, nil] the name of the method to call; may be omitted when a block is given
# @param opts [Hash] options forwarded to the hook registration (e.g. `:if`, `:unless`)
def before_unsubscribe(action_name = nil, **opts, &block)
  hook_type = :before_unsubscribe
  add_action(hook_type, action_name, **opts, &block)
end
def broadcast(stream, data)
-
data(Object) -- the data to send to the clients -
stream(String) -- the name of the stream
# Broadcast data to all the clients subscribed to a stream. Delegates to
# `Rage.cable.broadcast`.
#
# @param stream [String] the name of the stream
# @param data [Object] the data to send to the clients
def broadcast(stream, data)
  cable = Rage.cable
  cable.broadcast(stream, data)
end
def broadcast_to(streamable, data)
-
(ArgumentError)- if the streamable object does not satisfy the type requirements
Parameters:
-
data(Object) -- the data to send to the clients -
streamable(#id, String, Symbol, Numeric, Array) -- an object that will be used to generate the stream name
# Broadcast data to the stream whose name is generated from `streamable`.
#
# @param streamable [#id, String, Symbol, Numeric, Array] an object that will be used to generate the stream name
# @param data [Object] the data to send to the clients
# @raise [ArgumentError] if the streamable object does not satisfy the type requirements
def broadcast_to(streamable, data)
  cable = Rage.cable
  cable.broadcast(__stream_name_for(streamable), data)
end
def define_tmp_method(block)
# Defines an instance method named `__rage_tmp_<suffix>` whose body is `block`,
# so that blocks can be referenced by name in generated code.
#
# The suffix is taken from the next element of `@@__tmp_name_seed`
# (presumably an enumerator of character lists defined elsewhere in the class).
#
# @param block [Proc] the body of the generated method
# @return [Symbol] the name of the generated method
def define_tmp_method(block)
  name = @@__tmp_name_seed.next.join
  define_method("__rage_tmp_#{name}", block)
end
def inherited(klass)
# Passes the hooks, rescue handlers and periodic timers of this class down to
# a subclass. The collections are frozen before being shared, so whichever
# class modifies them next has to work on its own copy.
#
# @param klass [Class] the class inheriting from this one
def inherited(klass)
  klass.__hooks = @__hooks.freeze
  klass.__rescue_handlers = @__rescue_handlers.freeze
  klass.__periodic_timers = @__periodic_timers.freeze
end
def initialize(connection, params, identified_by)
- Private: -
# @private
# Stores the collaborators of the channel instance.
#
# @param connection [Object] the connection written to by {transmit}
# @param params [Hash] the subscription parameters returned by {params}
# @param identified_by [Hash] values returned by the readers created with `__prepare_id_method`
def initialize(connection, params, identified_by)
  @__connection = connection
  @__params = params
  @__identified_by = identified_by
end
def params
-
(Hash{Symbol=>String,Array,Hash,Numeric,NilClass,TrueClass,FalseClass})-
# Get the params the channel instance was created with.
#
# @return [Hash{Symbol=>String,Array,Hash,Numeric,NilClass,TrueClass,FalseClass}]
def params
  @__params
end
def periodically(method_name = nil, every:, &block)
-
every(Integer) -- the calling period in seconds -
method_name(Symbol, nil) -- the name of the method to call
# Register a callback to be called periodically for the channel.
#
# @param method_name [Symbol, nil] the name of the method to call; omit when passing a block
# @param every [Numeric] the calling period in seconds; must be positive
# @raise [ArgumentError] if both a method name and a block are given, if the
#   method name is not a Symbol, or if `every` is not a positive number
def periodically(method_name = nil, every:, &block)
  # Validate everything up front so that invalid input does not leave a
  # generated method behind on the class.
  if block_given?
    raise ArgumentError, "Pass the `method_name` argument or provide a block, not both" if method_name
  elsif !method_name.is_a?(Symbol)
    raise ArgumentError, "Expected a Symbol method name, got #{method_name.inspect}"
  end

  unless every.is_a?(Numeric) && every > 0
    raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
  end

  # `method_name` is a Symbol at this point; the generated lambda simply calls
  # that method on the channel
  callback_name = define_tmp_method(block || eval("-> { #{method_name} }"))
  callback = eval("->(channel) { channel.#{callback_name} }")

  if @__periodic_timers.nil?
    @__periodic_timers = []
  elsif @__periodic_timers.frozen?
    # a frozen list is shared through inheritance - work on a copy
    @__periodic_timers = @__periodic_timers.dup
  end

  @__periodic_timers << [callback, every]
end
def reject
Reject the subscription request. The method should only be called during the subscription process, i.e. from the {subscribed} method or a `before_subscribe` hook.
# Reject the subscription request by flagging the channel as rejected; the
# flag is reported by {subscription_rejected?}.
def reject
  @__subscription_rejected = true
end
def rescue_from(*klasses, with: nil, &block)
-
with(Symbol) -- the name of a handler method. The method can take one argument, which is the raised exception. Alternatively, you can pass a block, which can also take one argument. -
klasses(Class, Array) -- exception classes to watch on
# Register an exception handler. Handlers registered later take precedence
# over the ones registered earlier.
#
# @param klasses [Class, Array<Class>] exception classes to watch on
# @param with [Symbol] the name of a handler method. The method can take one
#   argument, which is the raised exception. Alternatively, you can pass a
#   block, which can also take one argument.
# @raise [ArgumentError] if neither `with` nor a block is provided
def rescue_from(*klasses, with: nil, &block)
  unless with
    if block_given?
      with = define_tmp_method(block)
    else
      raise ArgumentError, "No handler provided. Pass the `with` keyword argument or provide a block."
    end
  end

  if @__rescue_handlers.nil?
    @__rescue_handlers = []
  elsif @__rescue_handlers.frozen?
    # a frozen list is shared through inheritance - work on a copy
    @__rescue_handlers = @__rescue_handlers.dup
  end

  @__rescue_handlers.unshift([klasses, with])
end
def set_up_periodic_timers
# Starts one Iodine timer per registered periodic callback. Does nothing if
# the timers have already been set up for this class.
#
# On every tick the tracked channels are split into slices (at most 20 of
# them), and each slice is processed in its own fiber. Exceptions raised by a
# callback are logged and do not propagate.
def set_up_periodic_timers
  return if @__periodic_timers_set_up

  @__channels = Set.new

  @__periodic_timers.each do |callback, every|
    # `every` is in seconds, Iodine expects milliseconds
    ::Iodine.run_every((every * 1000).to_i) do
      slice_length = (@__channels.length / 20.0).ceil

      # a zero slice length means there are no channels to process
      if slice_length != 0
        @__channels.each_slice(slice_length) do |slice|
          Fiber.schedule do
            slice.each { |channel| callback.call(channel) }
          rescue => e
            Rage.logger.error("Unhandled exception has occured - #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}")
          end
        end
      end
    end
  end

  @__periodic_timers_set_up = true
end
def stream_for(streamable)
- Example: Broadcast to the stream -
Example: Subscribe to a stream -
Raises:
-
(ArgumentError)- if the streamable object does not satisfy the type requirements
Parameters:
-
streamable(#id, String, Symbol, Numeric, Array) -- an object that will be used to generate the stream name
# Subscribe to a stream whose name is generated from `streamable`.
#
# @param streamable [#id, String, Symbol, Numeric, Array] an object that will be used to generate the stream name
# @raise [ArgumentError] if the streamable object does not satisfy the type requirements
def stream_for(streamable)
  stream_name = self.class.__stream_name_for(streamable)
  stream_from(stream_name)
end
def stream_from(stream)
- Example: Broadcast to the stream -
Example: Subscribe to a stream -
Raises:
-
(ArgumentError)- if the stream name is not a String
Parameters:
-
stream(String) -- the name of the stream
# Subscribe to a stream.
#
# @param stream [String] the name of the stream
# @raise [ArgumentError] if the stream name is not a String
def stream_from(stream)
  raise ArgumentError, "Stream name must be a String" unless stream.is_a?(String)

  Rage.cable.__protocol.subscribe(@__connection, stream, @__params)
end
def subscribed
# Called once a client has subscribed to the channel. Does nothing by
# default; meant to be overridden in subclasses.
def subscribed
end
def subscription_rejected?
-
(Boolean)-
# Checks whether the subscription has been rejected.
#
# @return [Boolean]
def subscription_rejected?
  !!@__subscription_rejected
end
def transmit(data)
-
data(Object) -- the data to send to the client
# Send data to the current client. The data is serialized by the active
# protocol together with the channel params.
#
# @param data [Object] the data to send to the client
def transmit(data)
  message = Rage.cable.__protocol.serialize(@__params, data)

  if @__is_subscribing
    # we expect a confirmation message to be sent as a result of a successful subscribe call;
    # this will make sure `transmit` calls send data after the confirmation;
    ::Iodine.defer { @__connection.write(message) }
  else
    @__connection.write(message)
  end
end
def unsubscribed
# Called once a client has unsubscribed from the channel. Does nothing by
# default; meant to be overridden in subclasses.
def unsubscribed
end