# frozen_string_literal: true

# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'socket'
require_relative 'stream'

require 'fcntl'

module Async
  module IO
    # Asynchronous TCP socket wrapper.
    #
    # Every instance owns a {Stream} built on top of itself; `gets` and `puts`
    # are delegated to that stream and `sysread` reads through it.
    class TCPSocket < IPSocket
      wraps ::TCPSocket

      # Wrap an existing `::TCPSocket`, or connect to a remote endpoint.
      #
      # @param remote_host [::TCPSocket, String] an already-open socket to
      #   wrap, or the host to connect to.
      # @param remote_port [Integer, String, nil] the port to connect to
      #   (only used when connecting).
      # @param local_host [String, nil] optional local host to bind to; a
      #   local address is only resolved when this is given.
      # @param local_port [Integer, String, nil] optional local port to bind to.
      def initialize(remote_host, remote_port = nil, local_host = nil, local_port = nil)
        if remote_host.is_a? ::TCPSocket
          super(remote_host)
        else
          remote_address = Addrinfo.tcp(remote_host, remote_port)
          local_address = Addrinfo.tcp(local_host, local_port) if local_host

          # We do this unusual dance to avoid leaking an "open" socket instance.
          socket = Socket.connect(remote_address, local_address: local_address)

          begin
            fd = socket.fcntl(Fcntl::F_DUPFD)
            Console.logger.debug(self) {"Connected to #{remote_address.inspect}: #{fd}"}
          ensure
            # Close the temporary socket even if duplicating the descriptor
            # (or logging) fails, so that it is never left open.
            socket.close
          end

          super(::TCPSocket.for_fd(fd))

          # The equivalent blocking operation. Unfortunately there is no trivial way to make this non-blocking.
          # super(::TCPSocket.new(remote_host, remote_port, local_host, local_port))
        end

        @stream = Stream.new(self)
      end

      class << self
        # `TCPSocket.open` behaves exactly like `TCPSocket.new`.
        alias open new
      end

      # Flush the stream and then close the socket.
      #
      # The socket is closed even when the flush raises; in that case the
      # flush error still propagates to the caller.
      #
      # @return whatever the inherited `close` returns.
      def close
        begin
          @stream.flush
        ensure
          # Always release the underlying socket, even if the final flush failed.
          result = super
        end

        result
      end

      include Peer

      # @return [Stream] the stream created for this socket in `initialize`.
      attr_reader :stream

      # The way this buffering works is pretty atrocious.
      def_delegators :@stream, :gets, :puts

      # Read up to `size` bytes through the stream.
      #
      # @param size [Integer] passed to `Stream#read_partial`.
      # @param buffer [String, nil] when given, its contents are replaced
      #   with the data that was read.
      # @return the data returned by `Stream#read_partial`.
      def sysread(size, buffer = nil)
        data = @stream.read_partial(size)

        # NOTE(review): if `read_partial` can return nil (e.g. at end of
        # stream), `buffer.replace(nil)` raises TypeError - confirm against Stream.
        buffer.replace(data) if buffer

        data
      end
    end

    # Asynchronous TCP server wrapper.
    class TCPServer < TCPSocket
      wraps ::TCPServer, :listen

      # Wrap an existing `::TCPServer`, or create one from the given arguments.
      #
      # @param args [Array] either a single `::TCPServer` to wrap, or the
      #   arguments forwarded verbatim to `::TCPServer.new`.
      def initialize(*args)
        if args.first.is_a? ::TCPServer
          super(args.first)
        else
          # We assume this operation doesn't block (for long):
          super(::TCPServer.new(*args))
        end
      end

      # Accept an incoming connection and wrap it in a {TCPSocket}.
      #
      # The accepted wrapper gets this server's `timeout` assigned to it.
      #
      # @param timeout [Numeric, nil] forwarded to `async_send`.
      # @param task [Task] accepted for interface compatibility; not used
      #   by this method's body.
      # @yield [wrapper, address] when a block is given, the wrapper is
      #   closed once the block finishes (normally or by raising).
      # @return [Array, Object] `[wrapper, address]` without a block,
      #   otherwise the value of the block.
      def accept(timeout: nil, task: Task.current)
        peer, address = async_send(:accept_nonblock, timeout: timeout)

        wrapper = TCPSocket.new(peer)
        wrapper.timeout = self.timeout

        return wrapper, address unless block_given?

        begin
          yield wrapper, address
        ensure
          wrapper.close
        end
      end

      alias accept_nonblock accept
      alias sysaccept accept

      include Server
    end
  end
end