# frozen_string_literal: true
module Clacky
module Server
# SessionRegistry is the single authoritative source for session state.
#
# It owns two concerns:
# 1. Runtime state — agent instance, thread, status, pending_task, idle_timer.
# 2. Session list — reads from disk (via session_manager) and enriches with
# live runtime status. `list` is the only place the session
# list is assembled; no callers should build it elsewhere.
#
# Lazy restore: `ensure(session_id)` loads a disk session into the registry on
# demand. All session-specific APIs call this before touching the registry so
# disk-only sessions (e.g. loaded via loadMore) just work transparently.
#
# Thread safety: all public methods are protected by a Mutex.
class SessionRegistry
  SESSION_TIMEOUT = 24 * 60 * 60 # 24 hours of inactivity before cleanup

  # session_manager:  disk store; this class calls `load(id)`, `all_sessions`
  #                   and `save(data)` on it. Optional — without it `list`
  #                   returns [] and `snapshot` returns nil.
  # session_restorer: callable invoked with a disk session hash; it is
  #                   expected to register the session (e.g. via `create`).
  # agent_config:     must respond to `max_running_agents` / `max_idle_agents`.
  def initialize(session_manager: nil, session_restorer: nil, agent_config:)
    @sessions = {}
    @mutex = Mutex.new
    @session_manager = session_manager
    @session_restorer = session_restorer
    @agent_config = agent_config
    # Tracks sessions currently being restored from disk.
    # Other threads calling ensure() for the same id will wait via @restore_cond
    # instead of seeing a half-built session (agent=nil).
    @restoring = {}
    @restore_cond = ConditionVariable.new
  end

  # Create a new (empty) session entry and return its id.
  # agent/ui/thread are set later via with_session once they are constructed.
  # An existing entry with the same id is replaced.
  #
  # Raises ArgumentError when session_id is nil or empty.
  def create(session_id:)
    raise ArgumentError, "session_id is required" if session_id.nil? || session_id.empty?

    session = {
      id: session_id,
      status: :idle,
      error: nil,
      updated_at: Time.now,
      agent: nil,
      ui: nil,
      thread: nil,
      idle_timer: nil,
      pending_task: nil,
      pending_working_dir: nil
    }
    @mutex.synchronize { @sessions[session_id] = session }
    session_id
  end

  # Ensure a session is in the registry, loading from disk if necessary.
  # Returns true if the session is now available, false if not found anywhere.
  #
  # Thread-safe: if two threads race on the same session_id, the second one
  # waits for the first to finish restoring (including agent construction) rather
  # than seeing a half-built entry with agent=nil.
  def ensure(session_id)
    session_data = nil
    @mutex.synchronize do
      # Another thread is currently restoring this session — wait for it to
      # finish so callers never see agent=nil.
      if @restoring[session_id]
        @restore_cond.wait(@mutex) while @restoring[session_id]
        return @sessions.key?(session_id)
      end

      # Already fully ready (not being restored) — fast path.
      return true if @sessions.key?(session_id)
      return false unless @session_manager && @session_restorer

      session_data = @session_manager.load(session_id)
      return false unless session_data

      # Mark as "restore in progress" so concurrent callers wait.
      @restoring[session_id] = true
    end

    # Run the (potentially slow) restore outside the mutex so other sessions
    # are not blocked during agent construction.
    begin
      @session_restorer.call(session_data)
    ensure
      # Always clear the marker (even if the restorer raised) so waiters wake up.
      @mutex.synchronize do
        @restoring.delete(session_id)
        @restore_cond.broadcast
      end
    end

    # Read the result under the mutex like every other access to @sessions.
    exist?(session_id)
  end

  # Restore sessions from disk (up to n per source type) into the registry.
  # Used at startup. Already-registered sessions are skipped and do not count
  # towards n. Newest sessions (by created_at) are considered first.
  def restore_from_disk(n: 5)
    return unless @session_manager && @session_restorer

    all = @session_manager.all_sessions
                          .sort_by { |s| s[:created_at] || "" }
                          .reverse
    # Take up to n per source type
    counts = Hash.new(0)
    all.each do |session_data|
      src = (session_data[:source] || "manual").to_s
      next if counts[src] >= n
      next if exist?(session_data[:session_id])

      @session_restorer.call(session_data)
      counts[src] += 1
    end
  end

  # Retrieve a shallow copy of a session hash by id (returns nil if not found).
  def get(session_id)
    @mutex.synchronize { @sessions[session_id]&.dup }
  end

  # Update arbitrary runtime fields of a session (status, error, pending_*, etc.).
  # `updated_at` is always stamped with the current time.
  # Returns true on success, false when the session is not registered.
  def update(session_id, **fields)
    @mutex.synchronize do
      session = @sessions[session_id]
      return false unless session

      fields[:updated_at] = Time.now
      session.merge!(fields)
      true
    end
  end

  # Return a session list from disk enriched with live registry status.
  # Relies on session_manager.all_sessions being sorted newest-first.
  #
  # Parameters (all optional, independent):
  #   type:   "coding"  → sessions whose agent_profile is "coding"
  #           any other value ("manual"|"cron"|"channel"|"setup") → sessions
  #           whose normalized source equals it and whose profile is not "coding"
  #           nil       → no type filter
  #   date:   "YYYY-MM-DD" — keeps sessions whose created_at starts with it
  #   q:      case-insensitive substring matched against name and session id
  #   limit:  max sessions to return (applies to NON-PINNED only; see below)
  #   before: cursor — only sessions with created_at < before
  #           (also applies to NON-PINNED only; pinned items are a separate
  #           logical section, they should never be paginated away)
  #   include_pinned: when true (default), all matching pinned sessions are
  #           always returned on the FIRST page (before == nil) regardless
  #           of limit. Subsequent pages (before set) contain only
  #           non-pinned sessions.
  #
  # Ordering of the returned array:
  #   [ ...all_pinned_matching, ...non_pinned (limited) ]
  #
  # Returns [] when no session_manager is configured.
  def list(limit: nil, before: nil, q: nil, date: nil, type: nil, include_pinned: true)
    return [] unless @session_manager

    live = @mutex.synchronize { @sessions.transform_values { |s| live_fields(s) } }
    all = @session_manager.all_sessions # already sorted newest-first

    # ── type filter ──────────────────────────────────────────────────────
    if type
      all = if type == "coding"
              all.select { |s| (s[:agent_profile] || "general").to_s == "coding" }
            else
              all.select { |s| s_source(s) == type && (s[:agent_profile] || "general").to_s != "coding" }
            end
    end

    # ── date filter (YYYY-MM-DD, matches created_at prefix) ──────────────
    all = all.select { |s| s[:created_at].to_s.start_with?(date) } if date

    # ── name / id search ─────────────────────────────────────────────────
    if q && !q.empty?
      q_down = q.downcase
      all = all.select do |s|
        (s[:name] || "").downcase.include?(q_down) ||
          (s[:session_id] || "").downcase.include?(q_down)
      end
    end

    # Split pinned vs non-pinned BEFORE applying `before`/`limit`, so an old
    # pinned session never falls off the first page just because newer
    # sessions exist.
    pinned, non_pinned = all.partition { |s| s[:pinned] }

    # `before` cursor and `limit` ONLY apply to non-pinned (paginated) sessions.
    non_pinned = non_pinned.select { |s| (s[:created_at] || "") < before } if before
    non_pinned = non_pinned.first(limit) if limit

    # Pinned section: only included on the first page (before == nil) so
    # "load more" responses don't re-send them.
    pinned_section = include_pinned && before.nil? ? pinned : []

    (pinned_section + non_pinned).map { |s| build_enriched_row(s, live[s[:session_id]]) }
  end

  # Return the same enriched hash that a `list` row would produce, for a
  # single session — merging on-disk fields with in-memory live fields.
  # Returns nil if there is no session_manager or the session is unknown on disk.
  def snapshot(session_id)
    return nil unless @session_manager

    disk = @session_manager.load(session_id)
    return nil unless disk

    live = @mutex.synchronize do
      entry = @sessions[session_id]
      entry && live_fields(entry)
    end
    build_enriched_row(disk, live)
  end

  # Normalize source field from a disk session hash.
  # Missing source defaults to "manual"; "system" is a legacy value renamed
  # to "setup" — treat them as equivalent.
  def s_source(s)
    src = (s[:source] || "manual").to_s
    src == "system" ? "setup" : src
  end

  # Delete a session from the registry, cancel its idle timer and interrupt
  # its thread (by raising Clacky::AgentInterrupted in it).
  # Returns true if a session was removed, false if the id was unknown.
  def delete(session_id)
    @mutex.synchronize do
      session = @sessions.delete(session_id)
      return false unless session

      session[:idle_timer]&.cancel
      session[:thread]&.raise(Clacky::AgentInterrupted, "Session deleted")
      true
    end
  end

  # True if the session exists in registry (runtime).
  def exist?(session_id)
    @mutex.synchronize { @sessions.key?(session_id) }
  end

  # Execute a block with exclusive access to the raw session hash.
  # Returns the block's value, or nil when the session is not registered.
  # The block runs while the registry mutex is held, so it must not call
  # back into other SessionRegistry methods.
  def with_session(session_id)
    @mutex.synchronize do
      session = @sessions[session_id]
      return nil unless session

      yield session
    end
  end

  # Remove sessions idle longer than SESSION_TIMEOUT.
  # The idle timer of each removed session is cancelled, matching `delete`,
  # so no timer outlives its registry entry.
  def cleanup_stale!
    cutoff = Time.now - SESSION_TIMEOUT
    @mutex.synchronize do
      @sessions.delete_if do |_id, session|
        stale = session[:status] == :idle && session[:updated_at] < cutoff
        session[:idle_timer]&.cancel if stale
        stale
      end
    end
  end

  # Number of registered sessions whose status equals `status`.
  def count_by_status(status)
    @mutex.synchronize do
      @sessions.count { |_, s| s[:status] == status }
    end
  end

  def max_running_agents
    @agent_config.max_running_agents
  end

  def max_idle_agents
    @agent_config.max_idle_agents
  end

  # True when the number of :running sessions has reached max_running_agents.
  def running_full?
    count_by_status(:running) >= max_running_agents
  end

  # Evict the least recently updated idle agents beyond max_idle_agents.
  # Persists session data to disk before releasing the agent from memory.
  # Candidates are picked under the mutex; persisting happens outside it.
  def evict_excess_idle!
    to_evict = @mutex.synchronize do
      idle = @sessions.select { |_, s| s[:status] == :idle && s[:agent] }
                      .sort_by { |_, s| s[:updated_at] || Time.at(0) }
      # Computing the excess once (instead of shifting in a loop) also
      # terminates when the configured limit is negative.
      excess = idle.size - max_idle_agents
      excess.positive? ? idle.first(excess) : []
    end
    to_evict.each { |id, session| persist_and_release(id, session) }
  end

  # Build a summary hash for API responses (for in-registry sessions).
  # Used when we need live agent fields (name, cost, etc.) after ensure().
  # Returns nil when the session is not registered or has no agent.
  def session_summary(session_id)
    # Work on a copy taken under the mutex so the runtime fields read below
    # come from one consistent state.
    session = get(session_id)
    return nil unless session

    agent = session[:agent]
    return nil unless agent

    model_info = agent.current_model_info
    {
      id: session[:id],
      name: agent.name,
      working_dir: agent.working_dir,
      status: session[:status],
      created_at: agent.created_at,
      updated_at: session[:updated_at].iso8601,
      total_tasks: agent.total_tasks || 0,
      total_cost: agent.total_cost || 0.0,
      cost_source: agent.cost_source.to_s,
      error: session[:error],
      model: model_info&.dig(:model),
      permission_mode: agent.permission_mode,
      source: agent.source.to_s,
      agent_profile: agent.agent_profile.name,
      pinned: agent.pinned || false,
      latest_latency: agent.latest_latency
    }
  end

  private

  # Extract the live (in-memory) fields of a registry entry that are merged
  # into list/snapshot rows. Tolerates entries without an agent. An empty
  # agent name is reported as nil so the disk name is used instead.
  # Must be called with @mutex held.
  def live_fields(entry)
    agent = entry[:agent]
    live_name = agent&.name
    live_name = nil if live_name&.empty?
    {
      status: entry[:status],
      error: entry[:error],
      model: agent&.current_model_info&.dig(:model),
      name: live_name,
      total_tasks: agent&.total_tasks,
      total_cost: agent&.total_cost,
      cost_source: agent&.cost_source,
      reasoning_effort: agent&.reasoning_effort,
      latest_latency: agent&.latest_latency
    }
  end

  # Merge a single disk-side session hash with the corresponding live
  # in-memory agent fields (may be nil) into the row shape the frontend
  # consumes. Live values win; disk values and then defaults are fallbacks.
  def build_enriched_row(s, ls)
    {
      id: s[:session_id],
      name: ls&.dig(:name) || s[:name] || "",
      status: ls ? ls[:status].to_s : "idle",
      error: ls ? ls[:error] : nil,
      model: ls&.dig(:model),
      source: s_source(s),
      agent_profile: (s[:agent_profile] || "general").to_s,
      working_dir: s[:working_dir],
      created_at: s[:created_at],
      updated_at: s[:updated_at],
      total_tasks: ls&.dig(:total_tasks) || s.dig(:stats, :total_tasks) || 0,
      total_cost: ls&.dig(:total_cost) || s.dig(:stats, :total_cost_usd) || 0.0,
      cost_source: (ls&.dig(:cost_source) || s.dig(:stats, :cost_source) || "estimated").to_s,
      # latest_latency is in-memory only (live sessions) — it is taken from
      # the live fields and has no disk fallback here.
      latest_latency: ls&.dig(:latest_latency),
      reasoning_effort: ls&.dig(:reasoning_effort) || s.dig(:config, :reasoning_effort),
      pinned: s[:pinned] || false
    }
  end

  # Persist the agent of an eviction candidate, then drop the session from
  # the registry. `session` is the entry captured by evict_excess_idle!.
  def persist_and_release(id, session)
    agent = session[:agent]
    @session_manager&.save(agent.to_session_data(status: :success)) if agent

    @mutex.synchronize do
      current = @sessions[id]
      # The save above ran without the mutex. Only release when the very same
      # entry is still registered and still idle; otherwise it was deleted,
      # recreated, or picked up work in the meantime and must be left alone.
      next unless current.equal?(session) && current[:status] == :idle

      current[:idle_timer]&.cancel
      current[:agent] = nil
      current[:ui] = nil
      current[:idle_timer] = nil
      current[:thread] = nil
      @sessions.delete(id)
    end
  end
end
end
end