lib/pwn/plugins/sock.rb



# frozen_string_literal: true

require 'socket'
require 'openssl'

module PWN
  module Plugins
    # This plugin was created to support fuzzing various networking protocols
    module Sock
      # Object returned by PWN::Plugins::PWNLogger.create, stored in a class
      # variable when this module body is evaluated.
      # NOTE(review): no method in this module reads @@logger - confirm whether
      # it is still needed.
      @@logger = PWN::Plugins::PWNLogger.create

      # Supported Method Parameters::
      # sock_obj = PWN::Plugins::Sock.connect(
      #   target: 'required - target host or ip',
      #   port: 'required - target port',
      #   protocol: 'optional - :tcp || :udp (defaults to tcp)',
      #   tls: 'optional - boolean connect to target socket using TLS (defaults to false)'
      # )

      public_class_method def self.connect(opts = {})
        # Opens a client socket to opts[:target] / opts[:port] and returns it:
        # a TCPSocket, a connected UDPSocket, or (tls: true) a connected
        # OpenSSL::SSL::SSLSocket whose close also closes the TCP socket.
        #
        # Raises RuntimeError for an unsupported protocol, or once the TLS
        # handshake has failed at every minimum version from TLS 1.0 to 1.3.
        # Any other StandardError is re-raised after open sockets are closed.
        target = opts[:target].to_s.scrub
        port = opts[:port].to_i

        # Accept 'tcp' / 'UDP' style values as well as symbols, like #listen does
        protocol = (opts[:protocol] || :tcp).to_s.downcase.to_sym

        # TODO: Add proxy support

        tls = opts[:tls] ? true : false

        # Locals keep their value across `retry`, so this only seeds the first
        # attempt; the SSLError handler below raises the floor for each retry.
        tls_min_version ||= OpenSSL::SSL::TLS1_VERSION

        case protocol
        when :tcp
          if tls
            sock = TCPSocket.open(target, port)
            tls_context = OpenSSL::SSL::SSLContext.new
            # Peer certificates are not verified (VERIFY_NONE)
            tls_context.set_params(verify_mode: OpenSSL::SSL::VERIFY_NONE)
            tls_context.min_version = tls_min_version
            # tls_context.ciphers = tls_context.ciphers.select do |cipher|
            #   cipher[1] == cipher_tls
            # end
            tls_sock = OpenSSL::SSL::SSLSocket.new(sock, tls_context)
            tls_sock.hostname = target
            sock_obj = tls_sock.connect
            sock_obj.sync_close = true
          else
            sock_obj = TCPSocket.open(target, port)
          end
        when :udp
          sock_obj = UDPSocket.new
          sock_obj.connect(target, port)
        else
          raise "Unsupported protocol: #{protocol}"
        end

        sock_obj
      rescue OpenSSL::SSL::SSLError => e
        # The failed attempt never reached sync_close, so its raw TCP socket is
        # still open; close it here or every retry leaks a descriptor.
        sock.close unless sock.nil? || sock.closed?

        case tls_min_version
        when OpenSSL::SSL::TLS1_VERSION
          puts 'Attempting OpenSSL::SSL::TLS1_1_VERSION...'
          # cipher_tls = 'TLSv1.0'
          tls_min_version = OpenSSL::SSL::TLS1_1_VERSION
        when OpenSSL::SSL::TLS1_1_VERSION
          puts 'Attempting OpenSSL::SSL::TLS1_2_VERSION...'
          # cipher_tls = 'TLSv1.2'
          tls_min_version = OpenSSL::SSL::TLS1_2_VERSION
        when OpenSSL::SSL::TLS1_2_VERSION
          puts 'Attempting OpenSSL::SSL::TLS1_3_VERSION...'
          # cipher_tls = 'TLSv1.3'
          tls_min_version = OpenSSL::SSL::TLS1_3_VERSION
        else
          # TLS 1.3 was already the floor: nothing higher left to try
          tls_min_version = :abort
        end

        retry unless tls_min_version == :abort

        raise "\n#{e.inspect}"
      rescue StandardError => e
        sock_obj = disconnect(sock_obj: sock_obj) unless sock_obj.nil?
        # Covers failures that happen after the TCP connect but before
        # sock_obj was assigned (sock_obj is nil, sock is still open).
        sock.close unless sock.nil? || sock.closed?
        raise e
      end

      # Supported Method Parameters::
      # PWN::Plugins::Sock.get_random_unused_port(
      #   server_ip: 'optional - target host or ip to check (Defaults to 127.0.0.1)',
      #   protocol: 'optional - :tcp || :udp (defaults to tcp)'
      # )

      public_class_method def self.get_random_unused_port(opts = {})
        # Draws random ports from 1024..65_535 until check_port_in_use reports
        # one as not in use, then returns that port number.
        # Returns false if a connection-refused, host-unreachable or timed-out
        # error is raised while probing.
        probe_opts = {
          server_ip: opts[:server_ip] || '127.0.0.1',
          protocol: opts[:protocol] || :tcp
        }

        loop do
          candidate = Random.rand(1024..65_535)
          return candidate unless check_port_in_use(probe_opts.merge(port: candidate))
        end
      rescue Errno::ECONNREFUSED,
             Errno::EHOSTUNREACH,
             Errno::ETIMEDOUT
        false
      end

      # Supported Method Parameters::
      # PWN::Plugins::Sock.check_port_in_use(
      #   server_ip: 'optional - target host or ip to check (Defaults to 127.0.0.1)',
      #   port: 'required - target port',
      #   protocol: 'optional - :tcp || :udp (defaults to tcp)'
      # )

      public_class_method def self.check_port_in_use(opts = {})
        # Returns true when opts[:port] on opts[:server_ip] appears to be in
        # use, false otherwise.
        #
        # :tcp - attempts a connection with a 1 second connect timeout; a
        #        completed connection means in use, while refused, unreachable
        #        or timed-out attempts mean not in use.
        # :udp - UDP has no handshake to observe, so the port is probed by
        #        binding it: Errno::EADDRINUSE means in use. server_ip must
        #        therefore be an address of this host; any other bind error
        #        (e.g. Errno::EADDRNOTAVAIL, Errno::EACCES) propagates.
        #
        # Raises RuntimeError for any other protocol.
        server_ip = opts[:server_ip] || '127.0.0.1'
        port = opts[:port]
        protocol = (opts[:protocol] || :tcp).to_s.downcase.to_sym

        # TODO: Add proxy support

        ct = 1
        case protocol
        when :tcp
          Socket.tcp(server_ip, port, connect_timeout: ct).close
          true
        when :udp
          # Socket has no .udp constructor, so use a UDPSocket bind probe
          probe = UDPSocket.new
          begin
            probe.bind(server_ip, port)
            false
          rescue Errno::EADDRINUSE
            true
          ensure
            probe.close
          end
        else
          raise "Unsupported protocol: #{protocol}"
        end
      rescue Errno::ECONNREFUSED,
             Errno::EHOSTUNREACH,
             Errno::ETIMEDOUT
        false
      end

      # Supported Method Parameters::
      # PWN::Plugins::Sock.listen(
      #   server_ip: 'required - target host or ip to listen',
      #   port: 'required - target port',
      #   protocol: 'optional - :tcp || :udp (defaults to tcp)',
      #   tls: 'optional - boolean listen on TLS-enabled socket (defaults to false)'
      # )

      public_class_method def self.listen(opts = {})
        # Binds a listener on opts[:server_ip] / opts[:port] and prints whatever
        # clients send to stdout.
        #
        # :tcp (plain) - loops forever, one thread per accepted client, each
        #                line received is printed.
        # :tcp + tls   - only builds the OpenSSL::SSL::SSLServer and returns; no
        #                certificate/key is configured and the accept loop is
        #                commented out, so this path does not serve clients yet.
        # :udp         - single-threaded receive loop printing each datagram.
        #
        # Raises RuntimeError for any other protocol. The listening socket is
        # closed on the way out, whether by return or by exception.
        server_ip = opts[:server_ip].to_s.scrub
        port = opts[:port].to_i
        protocol = opts[:protocol].nil? ? :tcp : opts[:protocol].to_s.downcase.to_sym
        tls = opts[:tls] ? true : false

        case protocol
        when :tcp
          if tls
            # Multi-threaded - Not working
            sock = TCPServer.open(server_ip, port)
            tls_context = OpenSSL::SSL::SSLContext.new
            tls_context.set_params(verify_mode: OpenSSL::SSL::VERIFY_NONE)
            listen_obj = OpenSSL::SSL::SSLServer.new(sock, tls_context)
            # loop do
            #   Thread.start(listen_obj.accept) do |client_thread|
            #     while (client_input = client_thread.gets)
            #       puts client_input
            #     end
            #     client_thread.close
            #   end
            # end
          else
            # Multi-threaded
            listen_obj = TCPServer.open(server_ip, port)
            loop do
              Thread.start(listen_obj.accept) do |client|
                while (client_input = client.gets)
                  puts client_input
                end
              ensure
                # Close the client even when reading raised (e.g. peer reset)
                client.close
              end
            end
          end
        when :udp
          # Single Threaded
          listen_obj = UDPSocket.new
          listen_obj.bind(server_ip, port)
          while (client_input = listen_obj.recvmsg)
            puts client_input[0]
          end
        else
          raise "Unsupported protocol: #{protocol}"
        end
      ensure
        disconnect(sock_obj: listen_obj) unless listen_obj.nil?
        # If TLS setup raised before the SSLServer wrapped it, the TCPServer is
        # still open and nothing else holds a reference to it.
        sock.close unless sock.nil? || sock.closed?
      end

      # Supported Method Parameters::
      # cert_obj = PWN::Plugins::Sock.get_tls_cert(
      #   target: 'required - target host or ip',
      #   port: 'optional - target port (defaults to 443)'
      # )

      public_class_method def self.get_tls_cert(opts = {})
        # Connects to opts[:target] over TLS (port defaults to 443) and returns
        # the value of #peer_cert on the resulting socket. The socket is closed
        # before returning; errors from connect propagate to the caller.
        tls_sock_obj = connect(
          target: opts[:target].to_s.scrub,
          port: opts[:port] || 443,
          protocol: :tcp,
          tls: true
        )
        tls_sock_obj.peer_cert
      ensure
        disconnect(sock_obj: tls_sock_obj) unless tls_sock_obj.nil?
      end

      # Supported Method Parameters::
      # sock_obj = PWN::Plugins::Sock.disconnect(
      #   sock_obj: 'required - sock_obj returned from #connect method'
      # )

      public_class_method def self.disconnect(opts = {})
        # Closes opts[:sock_obj] and returns nil, so callers can write
        # `sock_obj = disconnect(sock_obj: sock_obj)` to drop their reference.
        # A nil sock_obj is ignored; errors raised by #close propagate.
        opts[:sock_obj]&.close
        nil
      end

      # Author(s):: 0day Inc. <request.pentest@0dayinc.com>

      public_class_method def self.authors
        # Returns the author banner as a single multi-line string
        "AUTHOR(S):\n" \
          "          0day Inc. <request.pentest@0dayinc.com>\n" \
          '        '
      end

      # Display Usage for this Module

      public_class_method def self.help
        # Builds the usage text (each example is prefixed with this module's
        # name via #{self}) and prints it to stdout.
        usage_text = "USAGE:
          sock_obj = #{self}.connect(
            target: 'required - target host or ip',
            port: 'required - target port',
            protocol: 'optional - :tcp || :udp (defaults to tcp)',
            tls: 'optional - boolean connect to target socket using TLS (defaults to false)'
          )

          port = #{self}.get_random_unused_port(
            server_ip: 'optional - target host or ip to check (Defaults to 127.0.0.1)',
            protocol: 'optional - :tcp || :udp (defaults to tcp)'
          )

          #{self}.check_port_in_use(
            server_ip: 'optional - target host or ip to check (Defaults to 127.0.0.1)',
            port: 'required - target port',
            protocol: 'optional - :tcp || :udp (defaults to tcp)'
          )

          #{self}.listen(
            server_ip: 'required - target host or ip to listen',
            port: 'required - target port',
            protocol: 'optional - :tcp || :udp (defaults to tcp)',
            tls: 'optional - boolean listen on TLS-enabled socket (defaults to false)'
          )

          cert_obj = #{self}.get_tls_cert(
            target: 'required - target host or ip',
            port: 'optional - target port (defaults to 443)'
          )

          sock_obj = #{self}.disconnect(
            sock_obj: 'required - sock_obj returned from #connect method'
          )

          #{self}.authors
        "

        puts usage_text
      end
    end
  end
end