class EventMachine::Synchrony::TCPSocket
# Closes the connection and forgets any pending read/write requests.
#
# Delegates to +close_connection(true)+ (the flag is presumably
# EventMachine's "after writing" option -- confirm against the superclass,
# which is not visible here) and then clears both request slots, which is
# the state +closed?+ tests for.
#
# @return [nil]
def close
  close_connection(true)
  @in_req = nil
  @out_req = nil
end
# Reports whether the socket has no outstanding read or write request.
#
# NOTE: +post_init+ also leaves both slots nil, so a socket that has never
# called +sync+ reports true here as well.
#
# @return [Boolean] true when both @in_req and @out_req are nil
def closed?
  @in_req.nil? && @out_req.nil?
end
# Signals the pending inbound request that the connection is established.
#
# The request is the deferrable stored by +sync(:in)+; +new+ waits on it
# right after connecting. It is completed with the socket itself.
# When no request is pending (the socket was not created through the
# blocking +new+ path, or +close+ already cleared the slot) there is nobody
# to notify, so this is a no-op -- the same guard +unbind+ applies.
#
# @return [Object, nil] whatever +succeed+ returns, or nil without a request
def connection_completed
  @in_req.succeed(self) if @in_req
end
# Builds a socket, blocking the caller until the connection is established.
#
# With exactly one argument the call is forwarded to +_old_new+ (defined
# outside this view -- presumably the original constructor, confirm).
# Otherwise the first two arguments (presumably host and port) are handed to
# EventMachine.connect with +self+ as the handler; any further arguments are
# ignored. The caller then waits on +sync(:in)+ until the connection
# completes.
#
# @param args [Array] constructor arguments, see above
# @return [Object] the result of +_old_new+, or the connected socket
# @raise [SocketError] when waiting for the connection yields a falsy value
def new(*args)
  return _old_new(*args) if args.size == 1

  socket = EventMachine.connect(*args[0..1], self)
  raise SocketError unless socket.sync(:in) # wait for connection

  socket
end
# Resets per-connection state: two separate empty buffers for inbound and
# outbound data, and no pending read or write request.
#
# The name matches EventMachine's post-construction hook; the superclass is
# not visible here, so confirm who invokes it.
#
# @return [nil]
def post_init
  @in_buff = ''
  @out_buff = ''
  @in_req = nil
  @out_req = nil
end
# Reads up to +num_bytes+ bytes, waiting for data when none is buffered.
#
# Buffered data is returned immediately via +read_data+. Otherwise the
# caller waits on +sync(:in)+, which +receive_data+ completes with the next
# chunk and +unbind+ fails with nil.
#
# @param num_bytes [Integer] maximum amount to read (default 16 KiB)
# @param dest [String, nil] optional string whose content is replaced with
#   the data that was read
# @return [String] the data that was read
# @raise [IOError] when waiting for data yields a falsy value
def read(num_bytes = 16 * 1024, dest = nil)
  read_data(num_bytes, dest) || sync(:in) || raise(IOError)
end
# Takes data out of the inbound buffer for the current read request.
#
# The requested size and destination are remembered in @read_bytes and
# @read_dest, so a later call without arguments (as +receive_data+ makes)
# can finish a read that found the buffer empty.
#
# @param num_bytes [Integer, nil] new maximum size; nil keeps the stored one
# @param dest [String, nil] string to overwrite with the data; nil keeps the
#   stored destination
# @return [String, nil] the extracted data, or nil when nothing can be read
def read_data(num_bytes = nil, dest = nil)
  @read_bytes = num_bytes if num_bytes
  @read_dest = dest if dest

  # A nil @read_bytes means no read was ever requested (e.g. data arrived
  # straight after connecting): keep it buffered for the first real read
  # instead of passing nil to slice!, which raises TypeError.
  return nil if @in_buff.empty? || @read_bytes.nil?

  data = @in_buff.slice!(0, @read_bytes)
  @read_bytes = 0 # the request is consumed

  if @read_dest
    @read_dest.replace(data)
    @read_dest = nil
  end

  data
end
# Appends incoming data to the inbound buffer and, when a read request is
# pending, completes it with whatever +read_data+ can extract.
#
# The name matches EventMachine's incoming-data callback; the superclass is
# not visible here, so confirm who invokes it.
#
# @param data [String] the newly received bytes
# @return [Object, nil] the result of +succeed+, or nil when nothing was
#   delivered
def receive_data(data)
  @in_buff << data
  return unless @in_req

  chunk = read_data
  @in_req.succeed(chunk) if chunk
end
# Writes +msg+ to the connection, waiting until it has been flushed.
#
# When +write_data+ cannot flush everything at once, the caller waits on
# +sync(:out)+, which +write_data+ completes once the buffer drains and
# +unbind+ fails with nil.
#
# @param msg [String] the data to send
# @param flags [Integer] must be 0; no flags are supported
# @return [Integer] the size of +msg+ in bytes
# @raise [RuntimeError] when +flags+ is non-zero
# @raise [IOError] when waiting for the flush yields a falsy value
def send(msg, flags = 0)
  raise "Unknown flags in send(): #{flags}" if flags.nonzero?

  len = msg.bytesize
  write_data(msg) || sync(:out) || raise(IOError)
  len
end
# Accepts socket options and ignores them.
#
# Presumably present so callers written against Ruby's socket API can call
# it without error; nothing is configured.
#
# @param level [Object] ignored
# @param name [Object] ignored
# @param value [Object] ignored
# @return [nil]
def setsockopt(level, name, value)
  # Intentionally empty.
end
# Registers a fresh deferrable as the pending request for +direction+ and
# hands it to EventMachine::Synchrony.sync, returning that call's result.
#
# The deferrable is stored in "@<direction>_req". The other methods here
# only ever complete or fail @in_req and @out_req, so +direction+ is
# expected to be :in or :out.
#
# @param direction [Symbol, String] :in or :out
# @return [Object] whatever EventMachine::Synchrony.sync returns
def sync(direction)
  req = EventMachine::DefaultDeferrable.new
  instance_variable_set("@#{direction}_req", req)
  EventMachine::Synchrony.sync(req)
end
# Fails any pending read and write request with nil.
#
# A caller waiting in +read+ or +send+ then gets a falsy value back from
# +sync+ and raises IOError. The name matches EventMachine's
# connection-closed callback; the superclass is not visible here, so
# confirm who invokes it.
#
# @return [Object, nil] the result of the last +fail+ call, or nil
def unbind
  @in_req.fail(nil) if @in_req
  @out_req.fail(nil) if @out_req
end
# Queues +data+ (if any) and pushes as much of the outbound buffer to the
# connection as backpressure allows.
#
# The buffer is sent in pieces of at most EventMachine::FileStreamer::ChunkSize
# bytes. When the connection already holds more than
# EventMachine::FileStreamer::BackpressureLevel bytes of unsent data, the
# remainder is retried on the next reactor tick.
#
# @param data [String, nil] bytes to append; nil only resumes flushing
# @return [Boolean] true once the buffer is empty (the pending write request,
#   if any, is completed with true); false when flushing was deferred
def write_data(data = nil)
  # Queue raw bytes: chunk sizes below are byte counts while slice! counts
  # characters, and mixing a multibyte string into a buffer that already
  # holds binary data would raise Encoding::CompatibilityError.
  @out_buff += data.dup.force_encoding(Encoding::BINARY) if data

  loop do
    if @out_buff.empty?
      @out_req.succeed(true) if @out_req
      return true
    end

    if get_outbound_data_size > EventMachine::FileStreamer::BackpressureLevel
      EventMachine.next_tick { write_data }
      return false
    end

    len = [@out_buff.bytesize, EventMachine::FileStreamer::ChunkSize].min
    send_data(@out_buff.slice!(0, len))
  end
end