class Clacky::Server::HttpServer::WebSocketConnection
Wraps a raw TCP socket, providing thread-safe WebSocket frame sending.

IMPORTANT: send_raw is called from the Agent thread via broadcast() →
send_json(). A blocking socket write with no deadline can pin the Agent
thread indefinitely when the client’s receive buffer fills up (silent
disconnects such as Wi-Fi handoff or NAT timeout, where TCP keepalive
defaults are measured in hours). Thread#raise on blocking native socket
writes is best-effort and unreliable, so instead we bound every write
with an explicit deadline using IO.select + write_nonblock and declare
the connection dead on timeout.
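The stall described above can be reproduced locally. The following is a minimal sketch, not part of Clacky: `bytes_until_stall` is a hypothetical helper, and a `UNIXSocket.pair` stands in for the client connection. It writes to a peer that never reads until `write_nonblock` reports `:wait_writable`, which is the point where a plain blocking write would hang.

```ruby
require "socket"

# Hypothetical helper (not part of Clacky): keeps writing to a socket whose
# peer never reads and returns how many bytes the kernel accepted before the
# socket stopped being writable. Returns nil if no stall occurred within limit.
def bytes_until_stall(sock, chunk: "x" * 65_536, limit: 64 * 1024 * 1024)
  total = 0
  while total < limit
    result = sock.write_nonblock(chunk, exception: false)
    return total if result == :wait_writable

    total += result
  end
  nil
end

# A connected socket pair stands in for the server/client connection.
writer, reader = UNIXSocket.pair
accepted = bytes_until_stall(writer)
puts "socket stopped accepting data after #{accepted} bytes"

writer.close
reader.close
```

The exact byte count depends on the platform's socket buffer sizes, so it is printed rather than assumed.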
def self.apply_keepalive(socket)
Enable TCP keepalive on the underlying socket so silently dead
peers are detected in minutes instead of the OS default of hours.
def self.apply_keepalive(socket)
  return unless socket.respond_to?(:setsockopt)

  socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)

  # TCP-level keepalive tuning: constants vary by platform and are
  # only set when available. Values chosen to detect dead peers in
  # roughly 60-90 seconds total.
  if defined?(Socket::IPPROTO_TCP)
    # Idle time before first probe (Linux: TCP_KEEPIDLE, macOS: TCP_KEEPALIVE)
    idle_const =
      if Socket.const_defined?(:TCP_KEEPIDLE)
        Socket::TCP_KEEPIDLE
      elsif Socket.const_defined?(:TCP_KEEPALIVE)
        Socket::TCP_KEEPALIVE
      end
    socket.setsockopt(Socket::IPPROTO_TCP, idle_const, 60) if idle_const

    if Socket.const_defined?(:TCP_KEEPINTVL)
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, 10)
    end
    if Socket.const_defined?(:TCP_KEEPCNT)
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, 3)
    end
  end
rescue StandardError => e
  Clacky::Logger.debug("[WS] failed to set keepalive: #{e.class}: #{e.message}")
end
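To check that such options actually take effect on a given platform, the values can be read back with `getsockopt`. This is a sketch under assumptions: `keepalive_settings` is a hypothetical inspection helper, not part of Clacky, and it is exercised here on an unconnected TCP socket so that no network access is needed.

```ruby
require "socket"

# Hypothetical inspection helper: reads back whichever keepalive options are
# defined on this platform. Returns a Hash of option name => integer value.
def keepalive_settings(socket)
  settings = {
    SO_KEEPALIVE: socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE).int
  }
  %i[TCP_KEEPIDLE TCP_KEEPALIVE TCP_KEEPINTVL TCP_KEEPCNT].each do |name|
    next unless Socket.const_defined?(name)

    option = socket.getsockopt(Socket::IPPROTO_TCP, Socket.const_get(name))
    settings[name] = option.int
  end
  settings
end

# An unconnected TCP socket is enough to set and read the options.
sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
if Socket.const_defined?(:TCP_KEEPINTVL)
  sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, 10)
end

settings = keepalive_settings(sock)
p settings
sock.close
```

With the values used in apply_keepalive (60 s idle, 10 s interval, 3 probes), a silent peer would be declared dead after about 60 + 3 × 10 = 90 seconds of inactivity, assuming the platform honours all three options.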
def closed?
def closed?
  @closed
end
def force_close!
Force-close the connection (used by the interrupt watchdog when an
def force_close!
  @closed = true
  @socket.close
rescue StandardError
  # best effort
end
def initialize(socket, version)
def initialize(socket, version)
  @socket = socket
  @version = version
  @send_mutex = Mutex.new
  @closed = false
  WebSocketConnection.apply_keepalive(socket)
end
def send_json(data)
Send a JSON-serializable object over the WebSocket.
def send_json(data)
  send_raw(:text, JSON.generate(data))
rescue => e
  Clacky::Logger.debug("WS send error (connection dead): #{e.message}")
  false
end
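Because send_json returns false for a dead connection instead of raising, a caller can use the return value to prune its connection list. The sketch below illustrates that contract only: `FakeConnection` and this `broadcast` helper are hypothetical stand-ins, and the real broadcast() in Clacky may be structured differently.

```ruby
# Hypothetical stand-in for WebSocketConnection; only the boolean
# send_json contract (true = delivered, false = dead) is modelled.
FakeConnection = Struct.new(:name, :alive) do
  def send_json(_data)
    alive
  end
end

# Assumed shape of a broadcast helper: try every connection and keep only
# those whose send_json reported success.
def broadcast(connections, payload)
  connections.select { |conn| conn.send_json(payload) }
end

conns = [FakeConnection.new("a", true), FakeConnection.new("b", false)]
survivors = broadcast(conns, { type: "ping" })
puts survivors.map(&:name).inspect
```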
def send_raw(type, data)
Send a raw WebSocket frame.
Returns true on success, false on broken/closed/sluggish socket.
Uses write_nonblock with an overall deadline so the caller (typically
the Agent thread) never blocks longer than SEND_DEADLINE, even if the
def send_raw(type, data)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  @send_mutex.synchronize do
    return false if @closed

    outgoing = WebSocket::Frame::Outgoing::Server.new(
      version: @version,
      data: data,
      type: type
    )
    bytes = outgoing.to_s

    unless write_with_deadline(bytes, SEND_DEADLINE)
      # Deadline exceeded: treat as a dead connection so broadcast
      # purges it and the Agent thread is freed immediately.
      @closed = true
      begin
        @socket.close
      rescue StandardError
        # ignore
      end
      Clacky::Logger.warn(
        "[WS] send_raw deadline exceeded — closing sluggish connection " \
        "(bytes=#{bytes.bytesize}, deadline=#{SEND_DEADLINE}s)"
      )
      return false
    end
  end

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  if elapsed > SEND_SLOW_WARN
    Clacky::Logger.warn(
      "[WS] send_raw slow: #{elapsed.round(2)}s (type=#{type})"
    )
  end

  true
rescue Errno::EPIPE, Errno::ECONNRESET, IOError, Errno::EBADF => e
  @closed = true
  Clacky::Logger.debug("WS send_raw error (client disconnected): #{e.message}")
  false
rescue => e
  @closed = true
  Clacky::Logger.debug("WS send_raw unexpected error: #{e.message}")
  false
end
def write_with_deadline(data, deadline)
Write `data` to the underlying socket, bounded by `deadline` seconds
of *total* wall time across partial writes. Returns true on full
def write_with_deadline(data, deadline)
  remaining = data
  deadline_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + deadline

  until remaining.empty?
    time_left = deadline_at - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if time_left <= 0

    begin
      written = @socket.write_nonblock(remaining, exception: false)
    rescue Errno::EPIPE, Errno::ECONNRESET, IOError, Errno::EBADF
      raise
    end

    case written
    when :wait_writable
      ready = IO.select(nil, [@socket], nil, [time_left, 0.25].min)
      # Not ready → loop and re-check the overall deadline.
      next unless ready
    when Integer
      remaining = remaining.byteslice(written, remaining.bytesize - written)
    else
      # Nil or unexpected: treat as dead.
      return false
    end
  end

  true
end
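The deadline behaviour can be tried outside the class. The following sketch is a standalone variant, not the class method itself: it takes the socket as an explicit argument (an assumption made for testability) and uses a `UNIXSocket.pair` whose reading end is never drained to simulate a stalled client.

```ruby
require "socket"

# Standalone variant of the deadline-bounded write. The socket is passed in
# explicitly so the function can be exercised without a WebSocketConnection.
def write_with_deadline(socket, data, deadline)
  remaining = data
  deadline_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + deadline

  until remaining.empty?
    time_left = deadline_at - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if time_left <= 0

    case (written = socket.write_nonblock(remaining, exception: false))
    when :wait_writable
      # Wait in short slices so the overall deadline is re-checked regularly.
      IO.select(nil, [socket], nil, [time_left, 0.25].min)
    when Integer
      remaining = remaining.byteslice(written, remaining.bytesize - written)
    else
      return false
    end
  end

  true
end

writer, reader = UNIXSocket.pair

# A small payload fits in the socket buffer and completes immediately.
small_ok = write_with_deadline(writer, "hello", 0.5)

# The peer never reads, so a payload far larger than the buffer cannot finish.
big_ok = write_with_deadline(writer, "x" * (16 * 1024 * 1024), 0.5)

puts "small=#{small_ok} big=#{big_ok}"
writer.close
reader.close
```

Capping each IO.select wait at 0.25 s keeps the loop responsive: even if the socket never becomes writable, the caller gets control back shortly after the deadline instead of blocking indefinitely.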