lib/aws/simple_workflow/workflow_execution_collection.rb



# Copyright 2011-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

require 'time'

module AWS
  class SimpleWorkflow

    # A collection that enumerates workflow executions.
    #
    #     domain.workflow_executions.each do |execution|
    #       # ...
    #     end
    #
    # ## Filtering Executions
    #
    # By default, all open workflow executions are enumerated.
    #
    class WorkflowExecutionCollection

      # @api private
      FILTERS = [
        :status,
        :workflow_type,
        :workflow_id,
        :tagged,
        :started_before,
        :started_after,
        :closed_before,
        :closed_after,
      ]

      include Core::Collection::WithLimitAndNextToken
      include OptionFormatters

      # @api private
      def initialize domain, options = {}

        @domain = domain

        @reverse_order = !!options[:reverse_order]

        @defaults = FILTERS.inject({}) do |defaults,opt|
          defaults[opt] = options[opt] if options.has_key?(opt)
          defaults
        end

        super

      end

      # @return [Domain] Returns the domain this execution was started in.
      attr_reader :domain

      # Returns the workflow execution with the given `workflow_id` and
      # `run_id`.
      #
      #     # get a reference to a single workflow execution
      #     domain.workflow_executions['workflow-id', 'run-id']
      #     domain.workflow_executions.at('workflow-id', 'run-id')
      #
      # @param [String] workflow_id The workflow execution id.
      #
      # @param [String] run_id The workflow execution run id.
      #
      # @return [WorkflowExecution
      #
      def at workflow_id, run_id
        WorkflowExecution.new(domain, workflow_id, run_id)
      end
      alias_method :[], :at

      # Records a WorkflowExecutionSignaled event in the workflow execution
      # history and creates a decision task for the workflow execution.
      #
      #     domain.signal_workflow_execution('workflowid', 'newdata', :input => '...')
      #
      # @param [String] workflow_id The id of the workflow execution to signal.
      #
      # @param [String] signal_name The name of the signal. This name must be
      #   meaningful to the target workflow.
      #
      # @param [Hash] options
      #
      # @option options [String] :input (nil) Data to attach to the
      #   WorkflowExecutionSignaled event in the target workflow
      #   execution's history.
      #
      # @option options [String] :run_id (nil) The run id of the workflow
      #   execution to signal.
      #
      #   If `:run_id` is not specified, then the WorkflowExecutionSignaled
      #   event is recorded in the history of the current open workflow
      #   with the matching workflow_id in the domain.
      #
      # @return [nil]
      #
      def signal workflow_id, signal_name, options = {}
        options[:domain] = domain.name
        options[:workflow_id] = workflow_id
        options[:signal_name] = signal_name
        client.signal_workflow_execution(options)
        nil
      end

      # Records a WorkflowExecutionCancelRequested event in the currently
      # running workflow execution identified. This logically requests
      # the cancellation of the workflow execution as a whole.
      # It is up to the decider to take appropriate actions when it receives
      # an execution history with this event.
      #
      # @note If the `:run_id` is not specified, the
      #   WorkflowExecutionCancelRequested event is recorded in the history
      #   of the current open workflow execution with the specified
      #   `workflow_id` in the domain.
      #
      # @note Because this action allows the workflow to properly clean up
      #   and gracefully close, it should be used instead of {#terminate}
      #   when possible.
      #
      # @param [String] workflow_id The id of the workflow execution to cancel.
      #
      # @param [Hash] options
      #
      # @option options [String] :run_id (nil) The run id of the workflow
      #   execution to cancel.
      #
      # @return [nil]
      #
      def request_cancel workflow_id, options = {}
        options[:domain] = domain.name
        options[:workflow_id] = workflow_id
        client.request_cancel_workflow_execution(options)
        nil
      end

      # Records a WorkflowExecutionTerminated event and forces closure of
      # the workflow execution identified. The child policy, registered
      # with the workflow type or specified when starting this execution,
      # is applied to any open child workflow executions of this workflow
      # execution.
      #
      # @note If the workflow execution was in progress, it is terminated
      #   immediately.
      #
      # @note If a `:run_id` is not specified, then the
      #   WorkflowExecutionTerminated event is recorded in the history of
      #   the current open workflow with the matching workflowId in the
      #   domain.
      #
      # @note You should consider canceling the workflow execution
      #   instead because it allows the workflow to gracefully close
      #   while terminate does not.
      #
      # @param [String] workflow_id
      #
      # @param [Hash] options
      #
      # @option options [Symbol] :child_policy (nil)
      #   If set, specifies the policy to use for the child workflow
      #   executions of the workflow execution being terminated. This
      #   policy overrides the default child policy.  Valid policies include:
      #
      #   * `:terminate` - the child executions will be terminated.
      #
      #   * `:request_cancel` - a request to cancel will be attempted for each
      #     child execution by recording a WorkflowExecutionCancelRequested
      #     event in its history. It is up to the decider to take appropriate
      #     actions when it receives an execution history with this event.
      #
      #   * `:abandon` - no action will be taken. The child executions will
      #     continue to run.
      #
      # @option options [String] :details Optional details for
      #   terminating the workflow execution.
      #
      # @option options [String] :reason An optional descriptive
      #   reason for terminating the workflow execution.
      #
      # @option options [String] :run_id The run id of the workflow
      #   execution to terminate. If a `:run_id` is not provided, then a
      #   WorkflowExecutionTerminated event is recorded in the history of
      #   the current open workflow with the matching workflow id in the
      #   domain.
      #
      # @return [nil]
      #
      def terminate workflow_id, options = {}
        options[:domain] = domain.name
        options[:workflow_id] = workflow_id
        upcase_opts(options, :child_policy)
        client.terminate_workflow_execution(options)
        nil
      end

      # @param [Symbol] status Causes the returned collection to filter
      #   executions by the given status. Accepted statuses include:
      #
      #   * `:open`
      #   * `:closed`
      #   * `:completed`
      #   * `:failed`
      #   * `:canceled`
      #   * `:terminated`
      #   * `:continued`
      #   * `:timed_out`
      #
      #   If `:status` is anything besides `:open` or `:closed` then
      #   it may not be used in combination with `workflow_id`,
      #   `workflow_type` or `tagged`.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions of the given
      #   status.
      #
      def with_status status
        collection_with(:status => status)
      end

      # @param [String] workflow_id
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that have
      #   the given `workflow_id`.
      #
      def with_workflow_id workflow_id
        collection_with(:workflow_id => workflow_id)
      end

      # @param [WorkflowType,Hash] workflow_type Should be a {WorkflowType}
      #   object or a hash with `:name` and `:version` keys.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that have
      #   the given `workflow_type`.
      #
      def with_workflow_type workflow_type
        collection_with(:workflow_type => workflow_type)
      end

      # @param [String] tag A tag to filter workflow executions with.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that have
      #   the given `tag`.
      #
      def tagged tag
        collection_with(:tagged => tag)
      end

      # Filters workflow executions by their start date.
      #
      # @note It is not possible to filter by both start time and close time.
      #
      # @param [Time,DateTime,Date,Integer,String] oldest_time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @param [Time,DateTime,Date,Integer,String] latest_time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that have start
      #   times that fall within the given range.
      #
      def started_between oldest_time, latest_time
        started_after(oldest_time).started_before(latest_time)
      end

      # Filters workflow executions by their start date.
      #
      #     # executions that started at least an hour ago
      #     domain.workflow_executions.started_before(Time.now - 3600)
      #
      # @note It is not possible to filter by both start time and close time.
      #
      # @param [Time,DateTime,Date,Integer,String] time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that started
      #   before the given time.
      #
      def started_before time
        collection_with(:started_before => time)
      end

      # Filters workflow executions by their start date.
      #
      #     # executions that started within the last hour
      #     domain.workflow_executions.started_after(Time.now - 3600)
      #
      # @note It is not possible to filter by both start time and close time.
      #
      # @param [Time,DateTime,Date,Integer,String] time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that started
      #   after the given time.
      #
      def started_after time
        collection_with(:started_after => time)
      end

      # Filters workflow executions by their close date.
      #
      # @note It is not possible to filter by both start time and close time.
      #
      # @param [Time,DateTime,Date,Integer,String] oldest_time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @param [Time,DateTime,Date,Integer,String] latest_time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that closed
      #   between the given times.
      #
      def closed_between oldest_time, latest_time
        closed_after(oldest_time).closed_before(latest_time)
      end

      # Filters workflow executions by their close date.
      #
      #     # executions that closed more than an hour ago
      #     domain.workflow_executions.closed_before(Time.now - 3600)
      #
      # @note It is not possible to filter by both start time and close time.
      #
      # @param [Time,DateTime,Date,Integer,String] time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that closed
      #   before the given time.
      #
      def closed_before time
        collection_with(:closed_before => time)
      end

      # Filters workflow executions by their close date.
      #
      #     # executions that closed within the last hour
      #     domain.workflow_executions.closed_after(Time.now - 3600)
      #
      # @note It is not possible to filter by both start time and close time.
      #
      # @param [Time,DateTime,Date,Integer,String] time Should
      #   be one of the listed types.  Integers are treated as timestamps
      #   and strings are parsed by DateTime.
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that will only enumerate or count executions that closed
      #   after the given time.
      #
      def closed_after time
        collection_with(:closed_after => time)
      end

      # Returns a collection that enumerates workflow executions in reverse
      # chronological order.  By default executions are enumerated in
      # ascending order of their start or close time (ordered by
      # close time when filtered by #closed_between).
      #
      #     # get the latest execution
      #     execution = domain.workflow_executions.reverse_order.first
      #
      # @return [WorkflowExecutionCollection] Returns a collection
      #   that enumerates workflow executions in reverse order.
      #
      def reverse_order
        collection_with(:reverse_order => true)
      end

      # Returns the number of workflow executions within the domain that
      # meet the specified filtering criteria.  Counts can be truncated
      # so you should check the return value.
      #
      #     count = domain.workflow_executions.count
      #     puts(count.truncated? ? "#{count.to_i}+" : count.to_i)
      #
      # @note You may only pass one of the following options:
      #   `:workflow_id`, `:workflow_type`, `:tagged` or
      #   `:status` with a "closed" value (`:status` with `:open` is okay).
      #
      # @note This operation is eventually consistent. The results are best
      #   effort and may not exactly reflect recent updates and changes.
      #
      # @param [Hash] options
      #
      # @option options [Symbol] :status Filters workflow executions by the
      #   given status.  If status is not provided then it defaults to
      #   `:open` unless you pass `:closed_between` (then it defaults to
      #   `:closed`).
      #
      #   If `:status` is anything besides `:open` or `:closed` then
      #   it may not be passed with `:workflow_id`, `:workflow_type` or
      #   `:tagged`.
      #
      #   Accepted values for `:status` include:
      #
      #   * `:open`
      #   * `:closed`
      #   * `:completed`
      #   * `:failed`
      #   * `:canceled`
      #   * `:terminated`
      #   * `:continued`
      #   * `:timed_out`
      #
      # @option options [Time] :started_after Filters workflow executions
      #   down to those started after the given time.
      #
      #   You may pass `:started_after` with `:started_before`, but not with
      #   `:closed_after` or `:closed_before`.
      #
      # @option options [Time] :started_before Filters workflow executions
      #   down to those started before the given time.
      #
      #   You may pass `:started_after` with `:started_before`, but not with
      #   `:closed_after` or `:closed_before`.
      #
      # @option options [Time] :closed_after Filters workflow executions
      #   to those closed after the given time.
      #
      #   * You may pass `:closed_after` with `:closed_before`, but not with
      #     `:started_after` or `:started_before`.
      #
      #   * This option is invalid when counting or listing open executions.
      #
      # @option options [Time] :closed_before Filters workflow executions
      #   to those closed before the given time.
      #
      #   * You may pass `:closed_after` with `:closed_before`, but not with
      #     `:started_after` or `:started_before`.
      #
      #   * This option is invalid when counting or listing open executions.
      #
      # @option options [String] :workflow_id (nil) If specified, workflow
      #   executions are filtered by the provided workflow id.
      #
      # @option options [String] :tagged (nil) Filters workflow executions
      #   by the given tag.
      #
      # @option options [WorkflowType,Hash] :workflow_type (nil)
      #   Filters workflow executions with the given workflow type.
      #   `:workflow_type` can be a {WorkflowType} object or a hash with
      #   a workflow type `:name` and `:version`.
      #
      # @return [Count] Returns a possibly truncated count of
      #   workflow executions.
      #
      def count options = {}

        open_or_closed, client_opts = handle_options(options)

        client_method = :"count_#{open_or_closed}_workflow_executions"
        response = client.send(client_method, client_opts)
        Count.new(response.data['count'], response.data['truncated'])
      end

      # Enumerates workflow executions.
      # @note (see #count)
      # @param (see #count)
      # @option (see #count)
      # @option (see Core::Collection#each)
      # @option options [Boolean] :reverse_order Enumerates the workflow
      #   execution in reverse chronological order if `true`.  The date
      #   used will be the execution start time unless filtering by
      #   closed before/after (then it will sort by the closed time).
      # @return (see Core::Collection#each)
      def each options = {}
        super
      end

      protected
      def collection_with options = {}
        defaults = @defaults.merge(:reverse_order => @reverse_order)
        self.class.new(domain, defaults.merge(options))
      end

      protected
      def _each_item next_token, limit, options = {}, &block

        open_or_closed, client_opts = handle_options(options)

        client_method = :"list_#{open_or_closed}_workflow_executions"

        client_opts[:maximum_page_size] = limit if limit
        client_opts[:next_page_token] = next_token if next_token
        client_opts[:reverse_order] = @reverse_order unless
          client_opts.key?(:reverse_order)

        response = client.send(client_method, client_opts)
        response.data['executionInfos'].each do |desc|

          workflow_id = desc['execution']['workflowId']
          run_id = desc['execution']['runId']

          workflow_execution = WorkflowExecution.new_from(
            client_method, desc, domain, workflow_id, run_id)

          yield(workflow_execution)

        end

        response.data['nextPageToken']

      end

      protected
      def handle_options options

        options = @defaults.merge(options)

        options[:domain] = domain.name

        status = options.delete(:status)
        status ||= (options[:closed_after] or options[:closed_before]) ?
          :closed : :open

        case status
        when :open   then open_or_closed = :open
        when :closed then open_or_closed = :closed
        else
          open_or_closed = :closed
          options[:close_status_filter] = { :status => status.to_s.upcase }
        end

        time_filter(open_or_closed, options)

        if workflow_id = options.delete(:workflow_id)
          options[:execution_filter] = {}
          options[:execution_filter][:workflow_id] = workflow_id
        end

        if tag = options.delete(:tagged)
          options[:tag_filter] = {}
          options[:tag_filter][:tag] = tag
        end

        if type = options.delete(:workflow_type)
          if type.is_a?(WorkflowType)
            type = { :name => type.name, :version => type.version }
          end
          options[:type_filter] = type
        end

        [open_or_closed, options]

      end

      protected
      def time_filter open_or_closed, options

        early_2010 = Time.parse('2010-01-01').to_i

        [%w(start started), %w(close closed)].each do |mode, suffixed|

          after = options.delete(:"#{suffixed}_after")
          before = options.delete(:"#{suffixed}_before")

          next unless after or before

          time_filter = {}
          time_filter[:oldest_date] = to_timestamp(after || early_2010)
          time_filter[:latest_date] = to_timestamp(before) if before

          options[:"#{mode}_time_filter"] = time_filter

        end

        if options.key?(:start_time_filter) and options.key?(:close_time_filter)
          raise 'You may filter by execution start or close time but not both.'
        end

        if options.key?(:close_time_filter) and open_or_closed == :open
          raise 'Unable to filter by closed time for open workflow executions.'
        end

        # if the client does not filter by start or close time, then add
        # a default filter that should return "everything"
        unless options[:start_time_filter] or options[:close_time_filter]
          options[:start_time_filter] = { :oldest_date => early_2010 }
        end

      end

      protected
      def to_timestamp time
        case time
        when Integer then time
        when Time then time.to_i
        else Time.parse(time.to_s).to_i
        end
      end

    end
  end
end