lib/protobuf/rpc/service_directory.rb



require 'delegate'
require 'singleton'
require 'socket'
require 'set'
require 'thread'
require 'timeout'

require 'protobuf/rpc/dynamic_discovery.pb'

module Protobuf
  module Rpc
    class ServiceDirectory
      include ::Singleton
      include ::Protobuf::Logging

      DEFAULT_ADDRESS = "0.0.0.0"
      DEFAULT_PORT = 53000
      DEFAULT_TIMEOUT = 1

      class Listing < SimpleDelegator
        attr_reader :expires_at

        def initialize(server)
          update(server)
        end

        def current?
          !expired?
        end

        def eql?(other)
          uuid.eql?(other.uuid)
        end

        def expired?
          Time.now.to_i >= @expires_at
        end

        def hash
          uuid.hash
        end

        def ttl
          [super.to_i, 1].max
        end

        def update(server)
          __setobj__(server)
          @expires_at = Time.now.to_i + ttl
        end
      end

      # Class Methods
      #
      class << self
        attr_writer :address, :port
      end

      def self.address
        @address ||= DEFAULT_ADDRESS
      end

      def self.port
        @port ||= DEFAULT_PORT
      end

      def self.start
        yield(self) if block_given?
        instance.start
      end

      def self.stop
        instance.stop
      end

      #
      # Instance Methods
      #
      def initialize
        reset
      end

      def all_listings_for(service)
        if running? && @listings_by_service.key?(service.to_s)
          start_listener_thread if listener_dead?
          @listings_by_service[service.to_s].entries.shuffle
        else
          []
        end
      end

      def each_listing(&block)
        start_listener_thread if listener_dead?
        @listings_by_uuid.each_value(&block)
      end

      def lookup(service)
        return unless running?
        start_listener_thread if listener_dead?
        return unless @listings_by_service.key?(service.to_s)
        @listings_by_service[service.to_s].entries.sample
      end

      def listener_dead?
        @thread.nil? || !@thread.alive?
      end

      def restart
        stop
        start
      end

      def running?
        !!@running
      end

      def start
        unless running?
          init_socket
          logger.info { sign_message("listening to udp://#{self.class.address}:#{self.class.port}") }
          @running = true
        end

        start_listener_thread if listener_dead?
        self
      end

      def start_listener_thread
        return if @thread.try(:alive?)
        @thread = Thread.new { send(:run) }
      end

      def stop
        logger.info { sign_message("Stopping directory") }

        @running = false
        @thread.try(:kill).try(:join)
        @socket.try(:close)

        reset
      end

      private

      def add_or_update_listing(uuid, server)
        listing = @listings_by_uuid[uuid]

        if listing
          action = :updated
          listing.update(server)
        else
          action = :added
          listing = Listing.new(server)
          @listings_by_uuid[uuid] = listing
        end

        listing.services.each do |service|
          @listings_by_service[service] << listing
        end

        trigger(action, listing)
        logger.debug { sign_message("#{action} server: #{server.inspect}") }
      end

      def init_socket
        @socket = UDPSocket.new
        @socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, true)

        if defined?(::Socket::SO_REUSEPORT)
          @socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEPORT, true)
        end

        @socket.bind(self.class.address, self.class.port.to_i)
      end

      def process_beacon(beacon)
        server = beacon.server
        uuid = server.try(:uuid)

        if server && uuid
          case beacon.beacon_type
          when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT
            add_or_update_listing(uuid, server)
          when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE
            remove_listing(uuid)
          end
        else
          logger.info { sign_message("Ignoring incomplete beacon: #{beacon.inspect}") }
        end
      end

      def read_beacon
        data, addr = @socket.recvfrom(2048)

        beacon = ::Protobuf::Rpc::DynamicDiscovery::Beacon.decode(data)

        # Favor the address captured by the socket
        beacon.try(:server).try(:address=, addr[3])

        beacon
      end

      def remove_expired_listings
        logger.debug { sign_message("Removing expired listings") }
        @listings_by_uuid.each do |uuid, listing|
          remove_listing(uuid) if listing.expired?
        end
      end

      def remove_listing(uuid)
        listing = @listings_by_uuid[uuid] || return

        logger.debug { sign_message("Removing listing: #{listing.inspect}") }

        @listings_by_service.each_value do |listings|
          listings.delete(listing)
        end

        trigger(:removed, @listings_by_uuid.delete(uuid))
      end

      def reset
        @thread = nil
        @socket = nil
        @listings_by_uuid = {}
        @listings_by_service = Hash.new { |h, k| h[k] = Set.new }
      end

      def run
        sweep_interval = 5 # sweep expired listings every 5 seconds
        next_sweep = Time.now.to_i + sweep_interval

        loop do
          timeout = [next_sweep - Time.now.to_i, 0.1].max
          readable = IO.select([@socket], nil, nil, timeout)
          process_beacon(read_beacon) if readable

          if Time.now.to_i >= next_sweep
            remove_expired_listings
            next_sweep = Time.now.to_i + sweep_interval
          end
        end
      rescue => e
        logger.debug { sign_message("ERROR: (#{e.class}) #{e.message}\n#{e.backtrace.join("\n")}") }
        retry
      end

      def trigger(action, listing)
        ::ActiveSupport::Notifications.instrument("directory.listing.#{action}", :listing => listing)
      end
    end
  end
end