lib/clacky/server/session_registry.rb



# frozen_string_literal: true

module Clacky
  module Server
    # SessionRegistry is the single authoritative source for session state.
    #
    # It owns two concerns:
    #   1. Runtime state  — agent instance, thread, status, pending_task, idle_timer.
    #   2. Session list   — reads from disk (via session_manager) and enriches with
    #                       live runtime status. `list` is the only place the session
    #                       list is assembled; no callers should build it elsewhere.
    #
    # Lazy restore: `ensure(session_id)` loads a disk session into the registry on
    # demand. All session-specific APIs call this before touching the registry so
    # disk-only sessions (e.g. loaded via loadMore) just work transparently.
    #
    # Thread safety: all public methods are protected by a Mutex.
    class SessionRegistry
      SESSION_TIMEOUT = 24 * 60 * 60 # 24 hours of inactivity before cleanup

      # Build an empty registry.
      #
      # session_manager:  optional disk store; this class calls `load`,
      #                   `all_sessions` and `save` on it.
      # session_restorer: optional callable; invoked with a disk session hash
      #                   when a session has to be brought back into memory.
      # agent_config:     required; read for max_running_agents / max_idle_agents.
      def initialize(session_manager: nil, session_restorer: nil, agent_config:)
        @agent_config     = agent_config
        @session_manager  = session_manager
        @session_restorer = session_restorer

        # Live session entries keyed by session id; every access goes through @mutex.
        @mutex    = Mutex.new
        @sessions = {}

        # Ids whose disk restore is currently in flight. A thread that calls
        # ensure() for one of these ids sleeps on @restore_cond until the restore
        # completes, so it never observes a half-built session (agent=nil).
        @restore_cond = ConditionVariable.new
        @restoring    = {}
      end

      # Register a blank, idle session entry under +session_id+ and return the id.
      #
      # The agent/ui/thread slots start out nil; they are filled in later via
      # with_session once those objects have been constructed. An entry already
      # stored under the same id is replaced.
      #
      # Raises ArgumentError when session_id is nil or empty.
      def create(session_id:)
        if session_id.nil? || session_id.empty?
          raise ArgumentError, "session_id is required"
        end

        blank_entry = {
          id: session_id, status: :idle, error: nil, updated_at: Time.now,
          agent: nil, ui: nil, thread: nil, idle_timer: nil,
          pending_task: nil, pending_working_dir: nil
        }

        @mutex.synchronize do
          @sessions[session_id] = blank_entry
        end

        session_id
      end

      # Make sure +session_id+ is present in the registry, lazily restoring it
      # from disk when it is not loaded yet.
      #
      # Returns true when the session is registered once the call finishes, and
      # false when it is not in memory and could not be restored (no session
      # manager or restorer configured, nothing on disk under that id, or the
      # restorer did not register it).
      #
      # Concurrency: only one thread restores a given id at a time. Any other
      # thread asking for the same id blocks on @restore_cond until that restore
      # has finished, so it never observes a half-built entry (agent=nil).
      # An exception raised by the restorer propagates to the caller; the
      # in-progress marker is cleared regardless, so waiters are always woken.
      def ensure(session_id)
        session_data = nil

        @mutex.synchronize do
          # Another thread is restoring this id right now (this includes the window
          # where the entry already exists but its agent has not been attached yet):
          # sleep until it is done, then report what it left behind.
          if @restoring[session_id]
            @restore_cond.wait(@mutex) while @restoring[session_id]
            return @sessions.key?(session_id)
          end

          # Fast path: already registered and not mid-restore.
          return true if @sessions.key?(session_id)

          return false unless @session_manager && @session_restorer

          session_data = @session_manager.load(session_id)
          return false unless session_data

          # Mark as "restore in progress" so concurrent callers wait.
          @restoring[session_id] = true
        end

        # Run the (potentially slow) restore outside the mutex so other sessions
        # are not blocked during agent construction.
        begin
          @session_restorer.call(session_data)
        ensure
          @mutex.synchronize do
            @restoring.delete(session_id)
            @restore_cond.broadcast
          end
        end

        # Read the outcome under the mutex, like every other access to @sessions.
        @mutex.synchronize { @sessions.key?(session_id) }
      end

      # Warm the registry from disk at startup: walk the stored sessions newest
      # first (by created_at) and hand up to +n+ of them per source type to the
      # session restorer.
      #
      # Sessions that are already registered are skipped and do not count
      # towards the quota. Does nothing (returns nil) when no session manager
      # or restorer is configured.
      def restore_from_disk(n: 5)
        return unless @session_manager && @session_restorer

        newest_first = @session_manager.all_sessions
          .sort_by { |s| s[:created_at] || "" }
          .reverse

        restored_per_source = Hash.new(0)
        newest_first.each do |session_data|
          # Bucket by the normalized source so legacy "system" rows share the
          # "setup" quota, the same way the list type filter treats them.
          src = s_source(session_data)
          next if restored_per_source[src] >= n
          next if exist?(session_data[:session_id])

          @session_restorer.call(session_data)
          restored_per_source[src] += 1
        end
      end

      # Look up a session by id and return a shallow copy of its entry (nested
      # objects such as the agent are shared with the registry), or nil when
      # the id is not registered.
      def get(session_id)
        @mutex.synchronize do
          entry = @sessions[session_id]
          entry.nil? ? nil : entry.dup
        end
      end

      # Merge +fields+ (status, error, pending_*, ...) into the registered
      # session entry. updated_at is always stamped with the current time, even
      # when the caller supplies its own value.
      #
      # Returns true when the session exists, false otherwise.
      def update(session_id, **fields)
        @mutex.synchronize do
          target = @sessions[session_id]
          if target
            target.merge!(fields.merge(updated_at: Time.now))
            true
          else
            false
          end
        end
      end

      # Return the on-disk session list enriched with live registry status.
      # Returns [] when no session manager is configured.
      #
      # Filters (all optional and independent; nil or an empty string means
      # "no filter"):
      #   type:  "coding" keeps sessions whose agent_profile is "coding".
      #          Any other value ("manual"|"cron"|"channel"|"setup") keeps
      #          non-coding sessions whose normalized source equals it.
      #   date:  "YYYY-MM-DD" (or any prefix) matched against the start of created_at.
      #   q:     case-insensitive substring matched against the stored name or
      #          the session id.
      #
      # Pagination:
      #   limit:   max sessions to return (applies to NON-PINNED only; see below)
      #   before:  cursor — only sessions with created_at < before, compared as
      #            strings (also applies to NON-PINNED only; pinned items are a
      #            separate logical section and are never paginated away)
      #   include_pinned: when true (default), all matching pinned sessions are
      #            always returned on the FIRST page (before not set) regardless
      #            of limit. Subsequent pages (before set) contain only
      #            non-pinned sessions. This guarantees that users who pinned
      #            an old session always see it at the top of the sidebar,
      #            even if many newer sessions exist.
      #
      # Ordering of the returned array:
      #   [ ...all_pinned_matching, ...non_pinned (limited) ]
      # Each section keeps the order delivered by session_manager.all_sessions,
      # which is expected to be newest-first; no re-sorting happens here.
      def list(limit: nil, before: nil, q: nil, date: nil, type: nil, include_pinned: true)
        return [] unless @session_manager

        # A blank value means "filter not set" (`q` was always treated this way).
        # Without this, type: "" or before: "" would silently return nothing.
        before = nil if before.to_s.empty?
        date   = nil if date.to_s.empty?
        type   = nil if type.to_s.empty?
        q      = nil if q.to_s.empty?

        # Snapshot the live fields under the lock so the (slower) disk read and
        # filtering below run without holding it.
        live = @mutex.synchronize do
          @sessions.transform_values do |s|
            agent      = s[:agent]
            model_info = agent&.current_model_info
            live_name  = agent&.name
            live_name  = nil if live_name&.empty?
            {
              status:           s[:status],
              error:            s[:error],
              model:            model_info&.dig(:model),
              name:             live_name,
              total_tasks:      agent&.total_tasks,
              total_cost:       agent&.total_cost,
              cost_source:      agent&.cost_source,
              reasoning_effort: agent&.reasoning_effort,
              latest_latency:   agent&.latest_latency
            }
          end
        end

        all = @session_manager.all_sessions # expected newest-first

        # ── type filter ──────────────────────────────────────────────────────
        # type=coding  → agent_profile == "coding"
        # type=manual/cron/channel/setup → source match, coding sessions excluded
        if type
          coding = ->(s) { (s[:agent_profile] || "general").to_s == "coding" }
          all = if type == "coding"
                  all.select(&coding)
                else
                  all.select { |s| s_source(s) == type && !coding.call(s) }
                end
        end

        # ── date filter (YYYY-MM-DD, matches created_at prefix) ──────────────
        all = all.select { |s| s[:created_at].to_s.start_with?(date) } if date

        # ── name / id search ─────────────────────────────────────────────────
        if q
          needle = q.downcase
          all = all.select do |s|
            (s[:name] || "").downcase.include?(needle) ||
              (s[:session_id] || "").downcase.include?(needle)
          end
        end

        # ── Split pinned vs non-pinned BEFORE applying `before`/`limit`.
        # Pinned sessions bypass pagination entirely so an old pinned session
        # never falls off the first page just because newer sessions exist.
        # (Regression fix for 0.9.37: previously `all_sessions` was only
        # sorted by created_at and `limit` cut off old pinned rows, making
        # them invisible until the user clicked "load more".)
        pinned, non_pinned = all.partition { |s| s[:pinned] }

        # `before` cursor ONLY applies to non-pinned (paginated) sessions.
        non_pinned = non_pinned.select { |s| (s[:created_at] || "") < before } if before
        non_pinned = non_pinned.first(limit) if limit

        # Pinned section: only included on the first page (no cursor) so
        # "load more" responses don't re-send them. On the first page, return
        # ALL matching pinned sessions regardless of limit.
        pinned_section = include_pinned && before.nil? ? pinned : []

        (pinned_section + non_pinned).map { |s| build_enriched_row(s, live[s[:session_id]]) }
      end

      # Single-session counterpart of `list`: load one session from disk, overlay
      # the live in-memory fields when the session is registered, and return the
      # row produced by build_enriched_row.
      #
      # Returns nil when no session manager is configured or the id is unknown
      # on disk. Intended for callers that need just one row (e.g. pushing a
      # fresh snapshot to a client that (re)subscribed, or a status-change
      # broadcast) without assembling the whole list.
      def snapshot(session_id)
        return nil unless @session_manager

        disk_row = @session_manager.load(session_id)
        return nil unless disk_row

        live_fields = @mutex.synchronize do
          entry = @sessions[session_id]
          if entry
            agent      = entry[:agent]
            model_info = agent&.current_model_info
            shown_name = agent&.name
            shown_name = nil if shown_name&.empty?
            {
              status:           entry[:status],
              error:            entry[:error],
              model:            model_info&.dig(:model),
              name:             shown_name,
              total_tasks:      agent&.total_tasks,
              total_cost:       agent&.total_cost,
              cost_source:      agent&.cost_source,
              reasoning_effort: agent&.reasoning_effort,
              latest_latency:   agent&.latest_latency
            }
          end
        end

        build_enriched_row(disk_row, live_fields)
      end

      # Combine one on-disk session hash (+s+) with its live in-memory fields
      # (+ls+, nil when the session is not registered) into the row shape the
      # frontend consumes. Live values win; disk values and fixed defaults fill
      # the gaps.
      private def build_enriched_row(s, ls)
        live = ->(key) { ls&.dig(key) }
        stat = ->(key) { s.dig(:stats, key) }

        {
          id:               s[:session_id],
          name:             live.call(:name) || s[:name] || "",
          status:           ls ? ls[:status].to_s : "idle",
          error:            ls ? ls[:error] : nil,
          model:            live.call(:model),
          source:           s_source(s),
          agent_profile:    (s[:agent_profile] || "general").to_s,
          working_dir:      s[:working_dir],
          created_at:       s[:created_at],
          updated_at:       s[:updated_at],
          total_tasks:      live.call(:total_tasks) || stat.call(:total_tasks) || 0,
          total_cost:       live.call(:total_cost) || stat.call(:total_cost_usd) || 0.0,
          cost_source:      (live.call(:cost_source) || stat.call(:cost_source) || "estimated").to_s,
          # latest_latency exists only in memory (live sessions); it is not
          # stored at session level on disk. The persisted source of truth is
          # the per-assistant-message `latency` field in messages[], so a
          # reloaded session reports nil until its next LLM call.
          latest_latency:   live.call(:latest_latency),
          reasoning_effort: live.call(:reasoning_effort) || s.dig(:config, :reasoning_effort),
          pinned:           s[:pinned] || false
        }
      end


      # Canonical source name for a disk session hash: a missing source counts
      # as "manual", and the legacy value "system" is reported as "setup" (it
      # was renamed; the two are equivalent). Always returns a String.
      def s_source(s)
        raw    = s[:source]
        source = raw ? raw.to_s : "manual"
        return "setup" if source == "system"

        source
      end

      public

      # Drop a session from the registry. Its idle timer (if any) is cancelled
      # and Clacky::AgentInterrupted ("Session deleted") is raised inside its
      # thread (if any).
      #
      # Returns true when a session was removed, false when the id was unknown.
      def delete(session_id)
        @mutex.synchronize do
          removed = @sessions.delete(session_id)
          if removed
            timer = removed[:idle_timer]
            timer.cancel unless timer.nil?

            worker = removed[:thread]
            worker.raise(Clacky::AgentInterrupted, "Session deleted") unless worker.nil?

            true
          else
            false
          end
        end
      end

      # Whether +session_id+ is currently held in the in-memory registry.
      # Sessions that only exist on disk are not counted.
      def exist?(session_id)
        @mutex.synchronize do
          @sessions.include?(session_id)
        end
      end

      # Yield the raw (not copied) session hash to the block while the registry
      # mutex is held, and return the block's result. Returns nil without
      # yielding when the session is unknown.
      #
      # Because the lock is held for the whole block, the block must not call
      # back into other registry methods: Ruby's Mutex is not reentrant.
      def with_session(session_id)
        @mutex.synchronize do
          entry = @sessions[session_id]
          entry ? yield(entry) : nil
        end
      end

      # Remove sessions that are idle and have not been updated for longer than
      # SESSION_TIMEOUT.
      #
      # The idle timer of every removed session is cancelled so it cannot fire
      # later for an entry that no longer exists (delete and persist_and_release
      # do the same). An entry without an updated_at timestamp is treated as
      # oldest, the way evict_excess_idle! treats it, instead of raising.
      def cleanup_stale!
        cutoff = Time.now - SESSION_TIMEOUT
        @mutex.synchronize do
          @sessions.delete_if do |_id, session|
            stale = session[:status] == :idle &&
                    (session[:updated_at] || Time.at(0)) < cutoff
            session[:idle_timer]&.cancel if stale
            stale
          end
        end
      end

      # Number of registered sessions whose :status equals +status+.
      def count_by_status(status)
        @mutex.synchronize do
          @sessions.each_value.count { |session| session[:status] == status }
        end
      end

      # Limit on concurrently running agents; delegates to the agent config
      # passed to the constructor. Compared against the :running count in
      # running_full?.
      def max_running_agents
        @agent_config.max_running_agents
      end

      # Number of idle sessions allowed to keep an agent in memory; delegates to
      # the agent config passed to the constructor. evict_excess_idle! releases
      # idle agents beyond this count.
      def max_idle_agents
        @agent_config.max_idle_agents
      end

      # True when the number of :running sessions has reached (or exceeded)
      # max_running_agents.
      def running_full?
        running = count_by_status(:running)
        running >= max_running_agents
      end

      # Keep at most max_idle_agents idle sessions with an agent in memory.
      #
      # The surplus is picked under the lock, least recently updated first (a
      # missing updated_at counts as oldest). Each picked session is then handed
      # to persist_and_release outside the lock, which saves its data to disk
      # before letting go of the agent.
      def evict_excess_idle!
        victims = @mutex.synchronize do
          idle_by_age = @sessions
            .select { |_, s| s[:status] == :idle && s[:agent] }
            .sort_by { |_, s| s[:updated_at] || Time.at(0) }

          cap = max_idle_agents
          idle_by_age.size > cap ? idle_by_age.first(idle_by_age.size - cap) : []
        end

        victims.each { |id, session| persist_and_release(id, session) }
      end

      # Save an idle session's agent state to disk, then drop the session from
      # the registry so the agent can be garbage-collected.
      #
      # +session+ is the raw registry entry chosen by evict_excess_idle!. The
      # save runs without the lock (it may be slow), so the entry is re-checked
      # under the lock before anything is released: if it is gone, has been
      # replaced, has left :idle, or was touched in the meantime, it is left
      # alone. Without that check a session that started running after being
      # picked would lose its agent and vanish from the registry mid-task.
      private def persist_and_release(id, session)
        seen_at = session[:updated_at]
        agent   = session[:agent]
        @session_manager&.save(agent.to_session_data(status: :success)) if agent

        @mutex.synchronize do
          current = @sessions[id]
          next unless current.equal?(session)
          next unless current[:status] == :idle && current[:updated_at] == seen_at

          current[:idle_timer]&.cancel
          # Clear the heavy references on the entry itself too, in case another
          # holder of this hash outlives its removal from @sessions.
          current[:agent] = nil
          current[:ui] = nil
          current[:idle_timer] = nil
          current[:thread] = nil
          @sessions.delete(id)
        end
      end

      # Summary hash for API responses, built from the live agent of a session
      # that is in the registry (typically right after ensure()).
      #
      # Returns nil when the session is not registered or has no agent attached.
      # Unlike `list` rows, status is returned as stored (not stringified) and
      # updated_at is rendered with iso8601.
      def session_summary(session_id)
        entry = @mutex.synchronize { @sessions[session_id] }
        agent = entry && entry[:agent]
        return nil unless agent

        model_name = agent.current_model_info&.dig(:model)

        {
          id:              entry[:id],
          name:            agent.name,
          working_dir:     agent.working_dir,
          status:          entry[:status],
          created_at:      agent.created_at,
          updated_at:      entry[:updated_at].iso8601,
          total_tasks:     agent.total_tasks || 0,
          total_cost:      agent.total_cost || 0.0,
          cost_source:     agent.cost_source.to_s,
          error:           entry[:error],
          model:           model_name,
          permission_mode: agent.permission_mode,
          source:          agent.source.to_s,
          agent_profile:   agent.agent_profile.name,
          pinned:          agent.pinned || false,
          latest_latency:  agent.latest_latency
        }
      end
    end
  end
end