lib/autoload/kuroko2/workflow/task/execute.rb



module Kuroko2
  module Workflow
    module Task
      class Execute < Base
        def execute
          if (execution = Execution.of(token).take)
            update_execution(execution)
          else
            validate

            token.context['CHDIR'] = chdir
            before_execute

            Execution.create!(token:          token,
              job_definition: token.job_definition,
              job_instance:   token.job_instance,
              shell:          shell,
              queue:          token.context['QUEUE'] || Execution::DEFAULT_QUEUE,
              context:        token.context)
            :pass
          end
        end

        def before_execute
        end

        def chdir
          nil
        end

        def shell
          option
        end

        def validate
          if option.blank?
            raise Workflow::AssertionError, "Option is required for execute"
          end
        end

        private

        def update_execution(execution)
          if execution.completed?
            Kuroko2.logger.info("(token #{token.uuid}) `#{execution.shell}` returns #{execution.exit_status}.")

            instance = token.job_instance
            message  = "(token #{token.uuid}) [#{execution.success? ? 'SUCCESS' : 'FAILURE'}] `#{execution.shell}` returns #{execution.exit_status}."
            if execution.output?
              message += <<-MESSAGE

```
#{execution.output.chomp}
```
            MESSAGE
            end

            if execution.success?
              instance.logs.info(message)
            else
              instance.logs.error(message)
            end

            Kuroko2::ExecutionHistory.create(
              hostname: execution.hostname,
              worker_id: execution.worker_id,
              queue: execution.queue,
              job_definition: execution.job_definition,
              job_instance: execution.job_instance,
              shell: execution.shell,
              started_at: execution.started_at,
              finished_at: execution.finished_at,
            )

            execution.with_lock do
              execution.destroy
              execution.success? ? :next : :failure
            end
          else
            process_timeout_if_needed(execution)
            :pass
          end
        end

        def process_timeout_if_needed(execution)
          timeout = token.context['TIMEOUT'].to_i

          if timeout > 0 && ((execution.created_at + timeout.minutes) < Time.current) && execution.pid
            hostname = Worker.executing(execution.id).try!(:hostname)
            # XXX: Store pid and hostname for compatibility
            ProcessSignal.create!(pid: execution.pid, hostname: hostname, execution_id: execution.id)
            message = "(token #{token.uuid}) Timeout occurred after #{timeout} minutes."
            token.job_instance.logs.info(message)
            Kuroko2.logger.info(message)
            token.context.delete('TIMEOUT')
          end
        end
      end
    end
  end
end