lib/httpx/plugins/grpc/message.rb



# frozen_string_literal: true

module HTTPX
  module Plugins
    module GRPC
      # Encoding module for GRPC messages.
      #
      # Frames and unframes length-prefixed grpc messages, and interprets the
      # grpc status carried by a response. Every message on the wire is made of
      # a 5-byte prefix (1 byte compressed flag, 4 bytes big-endian unsigned
      # payload length) followed by the payload itself.
      #
      # All methods are exposed as module functions.
      module Message
        module_function

        # Decodes a unary grpc response.
        #
        # The status is verified first, so an unsuccessful call raises before
        # any payload is decoded.
        #
        # @param response the response; must answer +to_s+, +headers+ and +encoders+
        # @return [String, nil] payload of the first message in the body, or
        #   +nil+ when the body is empty
        # @raise [GRPCError] when the "grpc-status" header is not zero
        def unary(response)
          verify_status(response)
          decode(response.to_s, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders)
        end

        # Lazily decodes a grpc stream response.
        #
        # Each chunk yielded by +response.each+ is decoded, and every message
        # found in it is passed to the block. The status is verified only after
        # all chunks were consumed.
        #
        # @param response the response; must answer +each+, +headers+ and +encoders+
        # @yieldparam data [String] payload of one decoded message
        # @return [Enumerator] when no block is given
        # @raise [GRPCError] when the "grpc-status" header is not zero
        def stream(response, &block)
          return enum_for(__method__, response) unless block

          response.each do |frame|
            # NOTE(review): headers are deliberately read per chunk, inside the
            # iteration; whether they are available before +each+ starts cannot
            # be told from here, so the lookup is not hoisted.
            decode(frame, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders, &block)
          end

          verify_status(response)
        end

        # Encodes a single grpc message.
        #
        # @param bytes [String] the serialized message
        # @param deflater compressor answering +deflate(io)+; when falsy the
        #   payload is sent as is and the compressed flag is 0
        # @return [String] binary string with the 5-byte prefix followed by the payload
        def encode(bytes, deflater:)
          compressed_flag = 0

          if deflater
            compressed_flag = 1
            bytes = deflater.deflate(StringIO.new(bytes))
          end

          # "C" is the 1-byte flag, "L>" the 32-bit big-endian payload length.
          "".b << [compressed_flag, bytes.bytesize].pack("CL>") << bytes.to_s
        end

        # Decodes the grpc messages contained in +message+.
        #
        # Without a block, only the payload of the first message is returned.
        # With a block, every message in the buffer is yielded in order.
        #
        # NOTE(review): a buffer shorter than the 5-byte prefix, or shorter
        # than the length declared in the prefix, is not handled here.
        #
        # @param message [String] buffer holding zero or more length-prefixed messages
        # @param encodings [Array<String>] names of the encodings applied to
        #   compressed payloads; they are undone in reverse order
        # @param encoders registry answering +key?(name)+ and +[name]+; names
        #   it does not know are skipped
        # @yieldparam data [String] payload of one decoded message
        # @return [String, nil] first payload when no block is given (+nil+ for
        #   an empty buffer); +nil+ when a block is given
        def decode(message, encodings:, encoders:)
          until message.empty?
            compressed, size = message.unpack("CL>")

            # Offset of the first byte after this message in the wire buffer.
            # It must be derived from the length in the prefix, never from the
            # size of the decompressed payload.
            frame_end = size + 5
            data = message.byteslice(5...frame_end)

            if compressed == 1
              # each inflater gets the byte size of the input it is about to receive
              input_size = size

              encodings.reverse_each do |algo|
                next unless encoders.key?(algo)

                data = encoders[algo].inflater(input_size).inflate(data)
                input_size = data.bytesize
              end
            end

            return data unless block_given?

            yield data

            message = message.byteslice(frame_end..-1)
          end
        end

        # Cancels an in-flight grpc call by emitting +:refuse+ with the
        # +:client_cancellation+ reason on the request.
        def cancel(request)
          request.emit(:refuse, :client_cancellation)
        end

        # Interprets the grpc call status found in the response headers, and
        # raises an exception in case of error code.
        #
        # @param response the response; must answer +raise_for_status+,
        #   +headers+, +close+ and +trailing_metadata+
        # @return [nil] when the "grpc-status" header is zero
        # @raise [GRPCError] when the "grpc-status" header is not zero; the
        #   response is closed before raising
        def verify_status(response)
          # return standard errors if need be
          response.raise_for_status

          # Integer() is strict: a missing or non-numeric status raises here.
          status = Integer(response.headers["grpc-status"])
          message = response.headers["grpc-message"]

          return if status.zero?

          response.close
          raise GRPCError.new(status, message, response.trailing_metadata)
        end
      end
    end
  end
end