# lib/semian/grpc.rb



# frozen_string_literal: true

require "semian/adapter"
require "grpc"

module GRPC
  # Mark these gRPC status errors as Semian adapter errors by mixing in
  # ::Semian::AdapterError.
  [Unavailable, Unknown, ResourceExhausted].each do |error_class|
    error_class.include(::Semian::AdapterError)
  end

  # Base class for the errors Semian raises on behalf of a gRPC client.
  # It subclasses GRPC::Unavailable, so code rescuing that class also
  # rescues Semian's own errors.
  class SemianError < GRPC::Unavailable
    attr_reader :details

    # @param semian_identifier [Object] identifier of the Semian resource
    # @param args [Array] forwarded unchanged to GRPC::Unavailable#initialize
    def initialize(semian_identifier, *args)
      super(*args)
      # NOTE(review): `message` is read before @semian_identifier is stored.
      # Keep this order in case the mixed-in adapter-error module formats the
      # message using the identifier -- confirm against Semian::AdapterError.
      @details = message
      @semian_identifier = semian_identifier
    end
  end

  ResourceBusyError = Class.new(SemianError)
  CircuitOpenError = Class.new(SemianError)
end

module Semian
  module GRPC
    # Mix in the shared adapter behaviour (presumably the source of
    # acquire_semian_resource used below -- confirm in semian/adapter).
    include Semian::Adapter

    # Aliases of the error classes defined under ::GRPC, exposed in this
    # namespace as well. They are the very same class objects.
    ResourceBusyError = ::GRPC::ResourceBusyError
    CircuitOpenError = ::GRPC::CircuitOpenError

    # Raised when semian_configuration is assigned after it has already
    # been set.
    class SemianConfigurationChangedError < RuntimeError
      # @param msg [String] error message; defaults to a fixed explanation
      def initialize(msg = "Cannot re-initialize semian_configuration")
        super(msg)
      end
    end

    class << self
      # The configuration object assigned at boot, or nil when none was set.
      attr_reader :semian_configuration

      # rubocop:disable ThreadSafety/ClassInstanceVariable

      # Stores the configuration. It may only be assigned once (at boot time).
      #
      # @param configuration [#call, Object] object later invoked with a host
      # @raise [Semian::GRPC::SemianConfigurationChangedError] when a
      #   configuration has already been stored
      def semian_configuration=(configuration)
        already_configured = !@semian_configuration.nil?
        raise Semian::GRPC::SemianConfigurationChangedError if already_configured

        @semian_configuration = configuration
      end

      # Looks up the Semian options for +host+.
      #
      # @param host [Object] value handed to the configuration callable
      # @return [Object, nil] whatever the callable returns, or nil when the
      #   stored configuration does not respond to +call+
      def retrieve_semian_configuration(host)
        return unless @semian_configuration.respond_to?(:call)

        @semian_configuration.call(host)
      end
      # rubocop:enable ThreadSafety/ClassInstanceVariable
    end

    # Returns a private copy of the Semian options configured for this stub's
    # host, or nil when no options are configured for it.
    #
    # The lookup key is @host. When @host is nil or empty, the stub may have
    # been initialized with a channel instead, so the channel's target is
    # used as the key.
    #
    # A non-nil result is memoized. A nil result is not, so the configuration
    # is consulted again on the next call.
    #
    # @return [Object, nil] a dup of the configured options, or nil
    def raw_semian_options
      @raw_semian_options ||= begin
        lookup_host = @host
        # Guard against nil as well as "" so a missing host does not raise.
        lookup_host = @ch.target if lookup_host.nil? || lookup_host.empty?

        # Dup so later mutation cannot leak into the shared configuration.
        Semian::GRPC.retrieve_semian_configuration(lookup_host)&.dup
      end
    end

    # Name of the Semian resource guarding this stub, taken from the :name
    # entry of the raw options and cached after the first truthy lookup.
    def semian_identifier
      return @semian_identifier if @semian_identifier

      @semian_identifier = raw_semian_options[:name]
    end

    # gRPC error classes this adapter reports as resource exceptions.
    #
    # @return [Array<Class>] a new array on every call
    def resource_exceptions
      [::GRPC::DeadlineExceeded, ::GRPC::ResourceExhausted, ::GRPC::Unavailable, ::GRPC::Unknown]
    end

    # True when no Semian options are configured for this stub, i.e. the raw
    # options are nil.
    def disabled?
      options = raw_semian_options
      options.nil?
    end

    # Wraps the stub's request_response call in a Semian resource, unless
    # Semian is disabled for this stub; all arguments pass through to super.
    def request_response(*, **)
      if disabled?
        super
      else
        acquire_semian_resource_grpc(scope: :request_response) { super }
      end
    end

    # Wraps the stub's client_streamer call in a Semian resource, unless
    # Semian is disabled for this stub; all arguments pass through to super.
    def client_streamer(*, **)
      if disabled?
        super
      else
        acquire_semian_resource_grpc(scope: :client_streamer) { super }
      end
    end

    # Wraps the stub's server_streamer call in a Semian resource, unless
    # Semian is disabled for this stub; all arguments pass through to super.
    def server_streamer(*, **)
      if disabled?
        super
      else
        acquire_semian_resource_grpc(scope: :server_streamer) { super }
      end
    end

    # Wraps the stub's bidi_streamer call in a Semian resource, unless
    # Semian is disabled for this stub; all arguments pass through to super.
    def bidi_streamer(*, **)
      if disabled?
        super
      else
        acquire_semian_resource_grpc(scope: :bidi_streamer) { super }
      end
    end

    # Runs the given block inside acquire_semian_resource for the :grpc
    # adapter. When the block returns a ::GRPC::ActiveCall::Operation, the
    # operation is passed to handle_operation so its later #execute is
    # wrapped as well.
    #
    # @param scope [Symbol] name of the RPC kind being invoked
    # @return [Object] the block's result, unchanged
    def acquire_semian_resource_grpc(scope:)
      acquire_semian_resource(adapter: :grpc, scope: scope) do
        outcome = yield
        next outcome unless outcome.is_a?(::GRPC::ActiveCall::Operation)

        handle_operation(outcome, scope)
        outcome
      end
    end

    # Replaces the operation's singleton #execute with a version that runs
    # the original inside acquire_semian_resource. This adapter instance is
    # stored on the operation as @semian so the new #execute can reach it.
    #
    # @param operation [Object] object that has a singleton #execute method
    # @param scope [Symbol] scope passed on to acquire_semian_resource
    def handle_operation(operation, scope)
      wrapped_execute = operation.singleton_method(:execute)
      operation.instance_variable_set(:@semian, self)
      operation.define_singleton_method(:execute) do
        # Inside this block self is the operation, so @semian is its ivar.
        @semian.send(:acquire_semian_resource, adapter: :grpc, scope: scope) do
          wrapped_execute.call
        end
      end
    end
  end
end

# Prepend (rather than include) so the RPC methods defined in Semian::GRPC
# run ahead of ClientStub's own and reach them through `super`.
::GRPC::ClientStub.prepend(Semian::GRPC)