class Clacky::Channel::Adapters::Feishu::WSClient
method=0 → control (ping/pong/handshake), method=1 → data (event)
payloadType(7), payload(8), LogIDNew(9)
Frame fields: SeqID(1), LogID(2), service(3), method(4), headers(5),
Feishu uses protobuf-encoded binary frames (pbbp2.Frame) over WebSocket.
WebSocket client for Feishu long connection mode.
def connect_and_listen
def connect_and_listen Clacky::Logger.info("[feishu-ws] Fetching WebSocket endpoint...") endpoint = fetch_ws_endpoint Clacky::Logger.info("[feishu-ws] Connecting to #{endpoint.split("?").first}") uri = URI.parse(endpoint) port = uri.port || (uri.scheme == "wss" ? 443 : 80) tcp = TCPSocket.new(uri.host, port) socket = if uri.scheme == "wss" require "openssl" ssl_context = OpenSSL::SSL::SSLContext.new ssl_context.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER) ssl = OpenSSL::SSL::SSLSocket.new(tcp, ssl_context) ssl.sync_close = true ssl.connect ssl else tcp end # WebSocket handshake handshake = WebSocket::Handshake::Client.new(url: endpoint) socket.write(handshake.to_s) # Read until handshake complete until handshake.finished? handshake << socket.readpartial(4096) end raise "WebSocket handshake failed" unless handshake.valid? Clacky::Logger.info("[feishu-ws] WebSocket connected") @ws_version = handshake.version @ws_socket = socket @ws_open = true @incoming = WebSocket::Frame::Incoming::Client.new(version: @ws_version) start_ping_thread # read_timeout is based on the server-provided ping interval so it # automatically adapts if Feishu changes the cadence. read_timeout = (@ping_interval * READ_TIMEOUT_MULTIPLIER).ceil loop do break unless @running # Use IO.select with a timeout to detect silent connection drops # (NAT expiry, firewall idle-kill) that never send a TCP FIN/RST. ready = IO.select([socket], nil, nil, read_timeout) unless ready Clacky::Logger.warn("[feishu-ws] read timeout (#{read_timeout}s), reconnecting...") return end data = socket.read_nonblock(4096) @incoming << data while (frame = @incoming.next) case frame.type when :binary raw = frame.data handle_frame(raw.respond_to?(:b) ? raw.b : raw) when :text handle_frame(frame.data) when :ping send_raw_frame(:pong, frame.data) when :close Clacky::Logger.info("[feishu-ws] WebSocket closed by server, will reconnect") return end end end rescue EOFError, IOError, Errno::ECONNRESET, Errno::EPIPE, Errno::ETIMEDOUT, OpenSSL::SSL::SSLError => e # Let the exception bubble up to start() where it will log and sleep before retry raise ensure @ws_open = false @ws_socket = nil socket&.close rescue nil @ping_thread&.kill end
def fetch_ws_endpoint
def fetch_ws_endpoint uri = URI.parse("#{@domain}/callback/ws/endpoint") http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = uri.scheme == "https" request = Net::HTTP::Post.new(uri.path) request["Content-Type"] = "application/json" request["locale"] = "en" request.body = JSON.generate({ AppID: @app_id, AppSecret: @app_secret }) response = http.request(request) data = JSON.parse(response.body) if data["code"] != 0 Clacky::Logger.warn("[feishu-ws] Failed to get endpoint: code=#{data["code"]} msg=#{data["msg"]}") raise "Failed to get WebSocket endpoint: #{data['msg']}" end client_config = data.dig("data", "ClientConfig") || {} @ping_interval = (client_config["PingInterval"] || 90).to_i url = data.dig("data", "URL") if url.nil? || url.strip.empty? Clacky::Logger.error("[feishu-ws] WebSocket endpoint URL is missing from response. " \ "Please verify your Feishu App ID and App Secret are correct.") raise "Failed to get WebSocket endpoint: URL is missing (check your Feishu App ID / App Secret)" end if url =~ /service_id=(\d+)/ @service_id = $1.to_i end url end
def handle_control_frame(frame, msg_type)
def handle_control_frame(frame, msg_type) case msg_type when "ping" send_frame( seq_id: frame[:seq_id], log_id: frame[:log_id], service: frame[:service], method: 0, headers: frame[:headers].merge("type" => "pong") ) when "handshake" status = frame[:headers]["handshake-status"] if status == "200" Clacky::Logger.info("[feishu-ws] Handshake successful") else Clacky::Logger.warn("[feishu-ws] Handshake failed: #{frame[:headers]['handshake-msg']}") end end end
def handle_data_frame(frame, headers)
def handle_data_frame(frame, headers) return unless headers["type"] == "event" payload_bytes = frame[:payload] return unless payload_bytes && !payload_bytes.empty? event_json = payload_bytes.force_encoding("UTF-8") event_data = JSON.parse(event_json) # Send ACK response send_frame( seq_id: frame[:seq_id], log_id: frame[:log_id], service: frame[:service], method: 1, headers: frame[:headers], payload: JSON.generate({ code: 200 }) ) event_type = event_data.dig("header", "event_type") || event_data["type"] Clacky::Logger.info("[feishu-ws] Dispatching event: #{event_type}") @on_event&.call(event_data) rescue JSON::ParserError => e Clacky::Logger.warn("[feishu-ws] Failed to parse event payload: #{e.message}") end
def handle_frame(raw)
def handle_frame(raw) raw = raw.b if raw.respond_to?(:b) frame = ProtoFrame.decode(raw) method_type = frame[:method] headers = frame[:headers] || {} case method_type when 0 # control frame handle_control_frame(frame, headers["type"]) when 1 # data frame (event) Clacky::Logger.info("[feishu-ws] Received data frame (type=#{headers["type"]})") handle_data_frame(frame, headers) end rescue => e Clacky::Logger.warn("[feishu-ws] Failed to handle frame: #{e.message}") end
def initialize(app_id:, app_secret:, domain: DEFAULT_DOMAIN)
def initialize(app_id:, app_secret:, domain: DEFAULT_DOMAIN) @app_id = app_id @app_secret = app_secret @domain = domain @running = false @ws = nil @ping_thread = nil @ping_interval = 90 # overridden by server config @seq_id = 0 @service_id = 0 end
def send_frame(seq_id:, log_id:, service:, method:, headers:, payload: nil)
def send_frame(seq_id:, log_id:, service:, method:, headers:, payload: nil) frame = { seq_id: seq_id, log_id: log_id, service: service, method: method, headers: headers, payload: payload } encoded = ProtoFrame.encode(frame) send_raw_frame(:binary, encoded) rescue => e warn "[feishu-ws] failed to send frame: #{e.message}" end
def send_raw_frame(type, data)
def send_raw_frame(type, data) return unless @ws_socket && @ws_open outgoing = WebSocket::Frame::Outgoing::Client.new( version: @ws_version || 13, data: data, type: type ) @ws_socket.write(outgoing.to_s) end
def start(&on_event)
def start(&on_event) @running = true @on_event = on_event Clacky::Logger.info("[feishu-ws] Starting WebSocket client (app_id=#{@app_id})") while @running begin connect_and_listen rescue => e Clacky::Logger.warn("[feishu-ws] Connection error: #{e.message}") sleep RECONNECT_DELAY if @running end end end
def start_ping_thread
def start_ping_thread @ping_thread&.kill @ping_thread = Thread.new do loop do sleep @ping_interval break unless @running begin @seq_id += 1 send_frame( seq_id: @seq_id, log_id: 0, service: @service_id, method: 0, headers: { "type" => "ping" } ) rescue => e Clacky::Logger.warn("[feishu-ws] ping failed (#{e.class}: #{e.message}), forcing reconnect") # Close the socket so IO.select in the read loop immediately # returns nil / read_nonblock raises IOError, triggering reconnect. @ws_socket&.close rescue nil break end end end end
def stop
def stop @running = false @ping_thread&.kill send_raw_frame(:close, "") rescue nil @ws_socket&.close rescue nil end