# lib/autoload/kuroko2/workflow/engine.rb



module Kuroko2
  module Workflow
    class Engine
      # Fallback expected running time, expressed in minutes (60 * 24 = 24 hours).
      # Returned by #expected_time when the token context has no 'EXPECTED_TIME'.
      DEFAULT_EXPECTED_TIME = 60 * 24 # 24 hours
      # Minimum interval between two "long elapsed time" notifications for a token:
      # a new one is only allowed once the previous one is older than this term.
      EXPECTED_TIME_NOTIFY_REMIND_TERM = 1.hours

      # Runs #process once for every token returned by Token.processable.
      def process_all
        Token.processable.each { |processable_token| process(processable_token) }
      end

      # Processes a single token.
      #
      # A token that is working or waiting is handed to #process_with_lock inside
      # token.with_lock. Any other token is skipped: an info line is logged
      # (lazily, via a block) and nil is returned.
      def process(token)
        processable = token.working? || token.waiting?
        return token.with_lock { process_with_lock(token) } if processable

        Kuroko2.logger.info { "(token #{token.uuid}) Skip since current status marked as '#{token.status_name}'." }
        nil
      end

      # Puts +token+ back into the working state so its current node is run again.
      #
      # Everything happens inside token.with_lock: the job instance gets error_at
      # cleared and retrying set, the retry message is written to the job instance
      # log and the application log, the token is marked as working and saved, and
      # a :retrying notification is sent.
      def retry(token)
        token.with_lock do
          current = extract_node(token)
          job_instance = token.job_instance
          retry_message = "(token #{token.uuid}) Retry current node: '#{current.type}: #{current.option}'"

          job_instance.update_columns(error_at: nil, retrying: true)
          job_instance.logs.info(retry_message)

          token.mark_as_working
          token.save!

          Kuroko2.logger.info(retry_message)
          Notifier.notify(:retrying, job_instance)
        end
      end

      # Skips the token's current node; delegates to #skip_with_lock inside
      # token.with_lock.
      def skip(token)
        token.with_lock { skip_with_lock(token) }
      end

      # Marks +token+ as failed.
      #
      # Writes the failure message to the job instance log (error level) and the
      # application log (info level), touches error_at on the job instance, marks
      # the token as failure and sends a :failure notification. When the token
      # context has a truthy 'AUTO_SKIP_ERROR', the failed node is then skipped via
      # #skip_with_lock.
      def failure(token)
        job_instance = token.job_instance
        failure_message = "(token #{token.uuid}) Mark as failure."

        job_instance.logs.error(failure_message)
        job_instance.touch(:error_at)
        token.mark_as_failure

        Kuroko2.logger.info(failure_message)
        Notifier.notify(:failure, job_instance)

        skip_with_lock(token) if token.context['AUTO_SKIP_ERROR']
      end

      private

      # Executes +node+ for +token+ and acts on the outcome.
      #
      # While the token is sleeping (see #sleeping?) the node is not executed and
      # the outcome is :pass. Otherwise the outcome is whatever node.execute
      # returns:
      #   :next          - advance to node.next
      #   :next_sibling  - advance to node.next_sibling
      #   :pass          - nothing to do
      #   :failure       - auto-retry when allowed, otherwise mark as failure
      #
      # A KeyError raised along the way is re-raised as EngineError carrying the
      # same message.
      def execute_task(node, token)
        outcome = sleeping?(token) ? :pass : node.execute(token)

        case outcome
        when :next then process_next(node.next, token)
        when :next_sibling then process_next(node.next_sibling, token)
        when :pass then nil # Do nothing
        when :failure
          auto_retryable?(node, token) ? auto_retry(node, token) : failure(token)
        end
      rescue KeyError => e
        raise EngineError, e.message
      end

      # True while the token context holds a 'SLEEP' value that is still greater
      # than the current time in epoch seconds.
      def sleeping?(token)
        wake_at = token.context['SLEEP']
        wake_at.present? && wake_at > Time.current.to_i
      end

      # Advances +token+ to +node+, or finishes the token when there is no next node.
      #
      # With a node: the token path is set to the node's path and the new current
      # node is written to the job instance log and the application log.
      #
      # Without a node: the token is marked as finished. If it has no parent, the
      # job instance's finished_at is touched, a notification is sent
      # (:back_to_normal when the job instance is retrying, :finished otherwise)
      # and the token is destroyed.
      def process_next(node, token)
        if node
          # Move the token first so the message reports the node it now points
          # at rather than the one it just left.
          token.path = node.path
          message = "(token #{token.uuid}) Current node is '#{token.path}'."

          token.job_instance.logs.info(message)
          Kuroko2.logger.info(message)
        else
          message = "(token #{token.uuid}) Marked as 'finished'."

          token.job_instance.logs.info(message)
          Kuroko2.logger.info(message)
          token.mark_as_finished
          unless token.parent
            token.job_instance.touch(:finished_at)
            if token.job_instance.retrying?
              Notifier.notify(:back_to_normal, token.job_instance)
            else
              Notifier.notify(:finished, token.job_instance)
            end
            token.destroy!
          end
        end
      end

      # Runs one engine step for +token+; #process calls this inside token.with_lock.
      #
      # Resolves the token's current node, executes it, then checks whether a
      # "long elapsed time" notification is due. When an EngineError is raised the
      # token is marked as critical, the job instance's canceled_at is touched, all
      # tokens of the job instance are deleted and a :critical notification is sent.
      # In every case the token is saved afterwards unless it reports destroyed?.
      def process_with_lock(token)
        node = extract_node(token)

        execute_task(node, token)
        notify_long_elapsed_time_if_needed(token)
      rescue EngineError => e
        # Error message followed by the backtrace, one indented frame per line.
        message = "#{e.message}\n" + e.backtrace.map { |trace| "    #{trace}" }.join("\n")

        token.mark_as_critical(e)
        token.job_instance.logs.error("(token #{token.uuid}) #{message}")
        token.job_instance.touch(:canceled_at)

        # Remove every token that belongs to this job instance.
        # NOTE(review): this presumably deletes +token+'s own row as well, while
        # token.destroyed? would still be false, so the ensure clause below still
        # calls save! on it -- confirm that this is intended.
        Token.where(job_instance: token.job_instance).delete_all
        token.job_instance.logs.warn("(token #{token.uuid}) This job is canceled.")

        Kuroko2.logger.error(message)
        Notifier.notify(:critical, token.job_instance)
      ensure
        # Persist whatever state the step left on the token; a destroyed token
        # (see #process_next) cannot be saved.
        token.save! unless token.destroyed?
      end

      # Skips the token's current node and moves on to node.next.
      #
      # Clears error_at on the job instance, logs the skip to the job instance
      # log, marks the token as working and advances it with #process_next. The
      # token is saved unless it was destroyed while advancing. Finally the skip
      # is written to the application log and a :skipping notification is sent.
      # Callers in this class invoke it while holding token.with_lock (#skip) or
      # from #failure.
      def skip_with_lock(token)
        skipped = extract_node(token)
        job_instance = token.job_instance
        skip_message = "(token #{token.uuid}) Skip current node: '#{skipped.type}: #{skipped.option}'"

        job_instance.update_column(:error_at, nil)
        job_instance.logs.info(skip_message)

        token.mark_as_working
        process_next(skipped.next, token)
        token.save! unless token.destroyed?

        Kuroko2.logger.info(skip_message)
        Notifier.notify(:skipping, job_instance)
      end

      # Parses the token's script (without validation) and returns the node found
      # at the token's current path.
      def extract_node(token)
        ScriptParser.new(token.script).parse(validate: false).find(token.path)
      end

      # Expected running time for +token+: the context's 'EXPECTED_TIME' converted
      # with to_i when present, DEFAULT_EXPECTED_TIME otherwise. Callers treat the
      # value as minutes.
      def expected_time(token)
        configured = token.context['EXPECTED_TIME']
        return DEFAULT_EXPECTED_TIME unless configured.present?

        configured.to_i
      end

      # Whether a "long elapsed time" notification may be sent for +token+ now.
      #
      # Never for a child token whose expected time equals its parent's. Otherwise
      # allowed when no notification has been recorded in the context yet, or when
      # the recorded 'EXPECTED_TIME_NOTIFIED_AT' is older than
      # EXPECTED_TIME_NOTIFY_REMIND_TERM.
      def available_notify_long_elapsed_time?(token)
        return false if token.parent && expected_time(token) == expected_time(token.parent)

        last_notified_at = token.context['EXPECTED_TIME_NOTIFIED_AT']
        return true if last_notified_at.nil?

        Time.zone.parse(last_notified_at) < EXPECTED_TIME_NOTIFY_REMIND_TERM.ago
      end

      # True once the token's creation time plus its expected time (in minutes)
      # lies in the past.
      def elapsed_expected_time?(token)
        deadline = token.created_at + expected_time(token).minutes
        deadline.past?
      end

      # Sends a :long_elapsed_time notification when one is both allowed
      # (#available_notify_long_elapsed_time?) and due (#elapsed_expected_time?).
      #
      # The notification time is recorded in the token context under
      # 'EXPECTED_TIME_NOTIFIED_AT', and a message is written to the job instance
      # log and the application log. Does nothing otherwise.
      def notify_long_elapsed_time_if_needed(token)
        return unless available_notify_long_elapsed_time?(token) && elapsed_expected_time?(token)

        token.context['EXPECTED_TIME_NOTIFIED_AT'] = Time.current
        Notifier.notify(:long_elapsed_time, token.job_instance)

        warning = "(token #{token.uuid}) The running time is longer than #{expected_time(token)} minutes!"
        token.job_instance.logs.info(warning)
        Kuroko2.logger.info(warning)
      end

      # Schedules an automatic retry of +node+ for +token+.
      #
      # Increments 'retried_count' in the node's entry of context['RETRY'], logs
      # the retry and the retry counter, sets the sleep deadline through
      # #set_sleep_context_before_retrying and logs the sleep time. Every message
      # goes to both the job instance log and the application log.
      def auto_retry(node, token)
        retry_state = token.context['RETRY'][node.path]
        retry_state['retried_count'] += 1

        prefix = "(token #{token.uuid})"
        announce = lambda do |text|
          token.job_instance.logs.info(text)
          Kuroko2.logger.info(text)
        end

        announce.call("#{prefix} Retry current node: '#{node.type}: #{node.option}'")
        announce.call("#{prefix} The number of retries: #{retry_state['retried_count']} / #{retry_state['count']}")

        set_sleep_context_before_retrying(node, token)

        announce.call("#{prefix} Sleep for #{retry_state['sleep_time']} seconds")
      end

      # Stores in context['SLEEP'] the epoch second until which the token should
      # sleep: the current time plus the node's configured 'sleep_time'.
      def set_sleep_context_before_retrying(node, token)
        sleep_seconds = token.context['RETRY'][node.path]['sleep_time']
        token.context['SLEEP'] = Time.current.to_i + sleep_seconds
      end

      # Whether +node+ may still be retried automatically: context['RETRY'] must
      # have an entry for the node's path whose 'count' is greater than its
      # 'retried_count'.
      def auto_retryable?(node, token)
        retry_table = token.context['RETRY']
        return false unless retry_table.present?

        node_retry = retry_table[node.path]
        return false unless node_retry.present?

        node_retry['count'] > node_retry['retried_count']
      end
    end
  end
end