class GraphQL::Subscriptions::ActionCableSubscriptions


end
end
end
raise ArgumentError, “Unexpected parameter: #{ambiguous_param}”
else
{}
when nil
ambiguous_param
when Hash, ActionController::Parameters
end
{}
else
ensure_hash(JSON.parse(ambiguous_param))
if ambiguous_param.present?
when String
case ambiguous_param
def ensure_hash(ambiguous_param)
private
end
}
MySchema.subscriptions.delete_subscription(sid)
@subscription_ids.each { |sid|
def unsubscribed
end
transmit(payload)
end
@subscription_ids << result.context[:subscription_id]
if result.context[:subscription_id]
# on unsubscribe.
# Track the subscription here so we can remove it
}
more: result.subscription?,
result: result.subscription? ? { data: nil } : result.to_h,
payload = {
})
operation_name: operation_name
variables: variables,
context: context,
query: query,
result = MySchema.execute({
}
channel: self,
# Make sure the channel is in the context
# current_user: current_user,
# in this channel or ApplicationCable::Channel
# Re-implement whatever context methods you need
context = {
operation_name = data[“operationName”]
variables = ensure_hash(data)
query = data[“query”]
def execute(data)
end
@subscription_ids = []
def subscribed
class GraphqlChannel < ApplicationCable::Channel
@example Implementing a channel for GraphQL Subscriptions
end
use GraphQL::Subscriptions::ActionCableSubscriptions
# …
MySchema = GraphQL::Schema.define do
@example Adding ActionCableSubscriptions to your schema
- Take care to reload context when re-delivering the subscription. (see {Query#subscription_update?})
- No queueing system; ActiveJob should be added
Experimental, some things to keep in mind:
as ActionCable broadcastings.
A subscriptions implementation that sends data

def delete_subscription(subscription_id)

The channel was closed, forget about it.
def delete_subscription(subscription_id)
  @subscriptions.delete(subscription_id)
end

def deliver(subscription_id, result)

Send it to the specific stream where this client was waiting.
This subscription was re-evaluated.
def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  ActionCable.server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload)
end

def execute_all(event, object)

Subscribers will re-evaluate locally.
An event was triggered; Push the data over ActionCable.
def execute_all(event, object)
  stream = EVENT_PREFIX + event.topic
  message = @serializer.dump(object)
  ActionCable.server.broadcast(stream, message)
end

def initialize(serializer: Serialize, **rest)

Parameters:
  • serializer (<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`) -- erializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @serializer = serializer
  super
end

def read_subscription(subscription_id)

Return the query from "storage" (in memory)
def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: query.context.to_h,
    operation_name: query.operation_name,
  }
end

def write_subscription(query, events)

and re-evaluate the query locally.
It will receive notifications when events come in
Store them in memory in _this_ ActionCable frontend.
A query was run where these events were subscribed to.
def write_subscription(query, events)
  channel = query.context.fetch(:channel)
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = query.context[:action_cable_stream] ||= SUBSCRIPTION_PREFIX + subscription_id
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    channel.stream_from(EVENT_PREFIX + event.topic, coder: ActiveSupport::JSON) do |message|
      execute(subscription_id, event, @serializer.load(message))
      nil
    end
  end
end