class GraphQL::Subscriptions::ActionCableSubscriptions
end
end
end
raise ArgumentError, “Unexpected parameter: #{ambiguous_param}”
else
{}
when nil
ambiguous_param
when Hash, ActionController::Parameters
end
{}
else
ensure_hash(JSON.parse(ambiguous_param))
if ambiguous_param.present?
when String
case ambiguous_param
def ensure_hash(ambiguous_param)
private
end
}
MySchema.subscriptions.delete_subscription(sid)
@subscription_ids.each { |sid|
def unsubscribed
end
transmit(payload)
end
@subscription_ids << result.context[:subscription_id]
if result.context[:subscription_id]
# on unsubscribe.
# Track the subscription here so we can remove it
}
more: result.subscription?,
result: result.to_h,
payload = {
)
operation_name: operation_name
variables: variables,
context: context,
query,
result = MySchema.execute(
}
channel: self,
# Make sure the channel is in the context
# current_user: current_user,
# in this channel or ApplicationCable::Channel
# Re-implement whatever context methods you need
context = {
operation_name = data[“operationName”]
variables = ensure_hash(data)
query = data[“query”]
def execute(data)
end
@subscription_ids = []
def subscribed
class GraphqlChannel < ApplicationCable::Channel
@example Implementing a channel for GraphQL Subscriptions
end
use GraphQL::Subscriptions::ActionCableSubscriptions
# …
class MySchema < GraphQL::Schema
@example Adding ActionCableSubscriptions to your schema
- Avoid the async ActionCable adapter and use the redis or PostgreSQL adapters instead. Otherwise calling #trigger won’t work from background jobs or the Rails console.
- Take care to reload context when re-delivering the subscription. (see {Query#subscription_update?})
- No queueing system; ActiveJob should be added
Some things to keep in mind:
as ActionCable broadcastings.
A subscriptions implementation that sends data
def delete_subscription(subscription_id)
def delete_subscription(subscription_id) query = @subscriptions.delete(subscription_id) # In case this came from the server, tell the client to unsubscribe: @action_cable.server.broadcast(stream_subscription_name(subscription_id), { more: false }) # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel, # see https://github.com/rmosolgo/graphql-ruby/issues/2478 if query events = query.context.namespace(:subscriptions)[:events] events.each do |event| ev_by_fingerprint = @events[event.topic] ev_for_fingerprint = ev_by_fingerprint[event.fingerprint] ev_for_fingerprint.delete(event) if ev_for_fingerprint.empty? ev_by_fingerprint.delete(event.fingerprint) end end end end
def deliver(subscription_id, result)
This subscription was re-evaluated.
def deliver(subscription_id, result) has_more = !result.context.namespace(:subscriptions)[:final_update] payload = { result: result.to_h, more: has_more } @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload) end
def execute_all(event, object)
An event was triggered; Push the data over ActionCable.
def execute_all(event, object) stream = stream_event_name(event) message = @serializer.dump(object) @action_cable.server.broadcast(stream, message) end
def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest)
-
namespace
(string
) -- Used to namespace events and subscriptions (default: '') -
serializer
(<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
) -- erializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) # A per-process map of subscriptions to deliver. # This is provided by Rails, so let's use it @subscriptions = Concurrent::Map.new @events = Concurrent::Map.new do |h, k| h.compute_if_absent(k) do Concurrent::Map.new do |h2, k2| h2.compute_if_absent(k2) { Concurrent::Array.new } end end end @action_cable = action_cable @action_cable_coder = action_cable_coder @serializer = serializer @serialize_with_context = case @serializer.method(:load).arity when 1 false when 2 true else raise ArgumentError, "#{@serializer} must repond to `.load` accepting one or two arguments" end @transmit_ns = namespace super end
def load_action_cable_message(message, context)
-
context
(GraphQL::Query::Context
) -- the context of the first event for a given subscription fingerprint -
message
(String
) -- n ActionCable-broadcasted string (JSON)
def load_action_cable_message(message, context) if @serialize_with_context @serializer.load(message, context) else @serializer.load(message) end end
def read_subscription(subscription_id)
def read_subscription(subscription_id) query = @subscriptions[subscription_id] if query.nil? # This can happen when a subscription is triggered from an unsubscribed channel, # see https://github.com/rmosolgo/graphql-ruby/issues/2478. # (This `nil` is handled by `#execute_update`) nil else { query_string: query.query_string, variables: query.provided_variables, context: query.context.to_h, operation_name: query.operation_name, } end end
def setup_stream(channel, initial_event)
the one to build and publish payloads.
let the listener belonging to the first event on the list be
To make sure there's always one-and-only-one channel building payloads,
be ready to take over the primary position.
But the problem is, any channel could close at any time, so each channel has to
all subscribers.
This is so we can reuse payloads when possible, and make one payload to send to
Every subscribing channel is listening here, but only one of them takes any action.
def setup_stream(channel, initial_event) topic = initial_event.topic channel.stream_from(stream_event_name(initial_event), coder: @action_cable_coder) do |message| events_by_fingerprint = @events[topic] object = nil events_by_fingerprint.each do |_fingerprint, events| if events.any? && events.first == initial_event # The fingerprint has told us that this response should be shared by all subscribers, # so just run it once, then deliver the result to every subscriber first_event = events.first first_subscription_id = first_event.context.fetch(:subscription_id) object ||= load_action_cable_message(message, first_event.context) result = execute_update(first_subscription_id, first_event, object) if !result.nil? # Having calculated the result _once_, send the same payload to all subscribers events.each do |event| subscription_id = event.context.fetch(:subscription_id) deliver(subscription_id, result) end end end end nil end end
def stream_event_name(event)
def stream_event_name(event) [EVENT_PREFIX, @transmit_ns, event.topic].join end
def stream_subscription_name(subscription_id)
def stream_subscription_name(subscription_id) [SUBSCRIPTION_PREFIX, @transmit_ns, subscription_id].join end
def write_subscription(query, events)
It will receive notifications when events come in
Store them in memory in _this_ ActionCable frontend.
A query was run where these events were subscribed to.
def write_subscription(query, events) unless (channel = query.context[:channel]) raise GraphQL::Error, "This GraphQL Subscription client does not support the transport protocol expected"\ "by the backend Subscription Server implementation (graphql-ruby ActionCableSubscriptions in this case)."\ "Some official client implementation including Apollo (https://graphql-ruby.org/javascript_client/apollo_subscriptions.html), "\ "Relay Modern (https://graphql-ruby.org/javascript_client/relay_subscriptions.html#actioncable)."\ "GraphiQL via `graphiql-rails` may not work out of box (#1051)." end subscription_id = query.context[:subscription_id] ||= build_id stream = stream_subscription_name(subscription_id) channel.stream_from(stream) @subscriptions[subscription_id] = query events.each do |event| # Setup a new listener to run all events with this topic in this process setup_stream(channel, event) # Add this event to the list of events to be updated @events[event.topic][event.fingerprint] << event end end