# lib/aws/simple_workflow/decision_task_collection.rb



require 'uuidtools'

# Copyright 2011-2012 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

require 'timeout'

module AWS
  class SimpleWorkflow

    # Provides an interface to count, and receive decision tasks
    # ({DecisionTask} objects) from the service.
    #
    # == Counting
    #
    # To get a count of decision tasks needing attention, call {#count}
    # with a task list name:
    #
    #   domain.decision_tasks.count('my-task-list').to_i #=> 7
    #
    # == Getting a single decision task
    #
    # To process a single task use {#poll_for_single_task}:
    #
    #   domain.decision_tasks.poll_for_single_task('my-task-list') do |task|
    #     # this block is yielded to only if a task is found
    #     # within 60 seconds.
    #   end
    #
    # At the end of the block, the decision task is auto-completed.
    # If you prefer you can omit the block and +nil+ or a {DecisionTask}
    # will be returned.
    #
    #   if task = domain.decision_tasks.poll_for_single_task('my-task-list')
    #     # you need to call complete! or the decision task will time out
    #     task.complete!
    #   end
    #
    # == Polling for Tasks in a Loop
    #
    # You can poll indefinitely for tasks in a loop with {#poll}:
    #
    #   domain.decision_tasks.poll('my-task-list') do |task|
    #     # yields once for every decision task found 
    #   end
    #
    # Just like the block form above, the decision task is auto completed at
    # the end of the block.  Please note, if you call +break+ or +return+ 
    # from inside the block, you *MUST* call {DecisionTask#complete!} or
    # the task will timeout.
    #
    # == Events and Decisions
    #
    # Each decision task provides an enumerable collection of both
    # new events ({DecisionTask#new_events}) and all events 
    # ({DecisionTask#events}).  
    # 
    # Based on the events in the workflow execution history, you should
    # call methods on the decision task.  See {DecisionTask} for
    # a complete list of decision methods.
    #
    class DecisionTaskCollection

      include Core::Model
      include OptionFormatters

      # @private
      def initialize domain, options = {}
        # Keep the owning domain; it is exposed through the +domain+ reader.
        @domain = domain
        # Bare +super+ forwards both +domain+ and +options+ unchanged to the
        # inherited initializer (presumably Core::Model's -- TODO confirm).
        super
      end

      # @return [Domain] The domain this collection was constructed with;
      #   its name is sent with every count and poll request.
      attr_reader :domain

      # Returns the number of decision tasks in the specified +task_list+. 
      #
      #   count = decision_tasks.count('task-list-name')
      #   count.truncated? #=> false
      #   count.to_i #=> 7
      #
      # @note This operation is eventually consistent. The results are best 
      #   effort and may not exactly reflect recent updates and changes.
      #
      # @param [String] task_list Name of the task list to count 
      #   decision tasks for.
      #
      # @return [Count] Returns a {Count} object that specifies the number
      #   of decision tasks for the given task list.  This count will
      #   also indicate if the count was truncated.
      #
      def count(task_list)
        # Ask the service how many decision tasks are pending on the
        # given task list of this collection's domain.
        request = {
          :domain => domain.name,
          :task_list => { :name => task_list }
        }
        data = client.count_pending_decision_tasks(request).data

        # Wrap the raw tally together with its truncation flag.
        Count.new(data['count'], data['truncated'])
      end

      # Polls the service for a single decision task.  The service may
      # hold the request for up to 60 seconds before responding.  
      #
      #   # try to get a single task, may return nil when no tasks available
      #   task = domain.decision_tasks.poll_for_single_task('task-list')
      #   if task
      #     # make decisions ...
      #     task.complete!
      #   end
      #
      # You can optionally pass a block and that will only be yielded
      # to when a decision task is available within the 60 seconds.
      #
      #   domain.decision_tasks.poll_for_single_task('task-list') do |task|
      #      # make decisions
      #      # task.complete! is called for you at the end of the block
      #   end
      #
      # With the block form you do not need to call #complete! on the 
      # decision task.  It will be called when the block exits.
      #
      # @note If you are not using the block form you must call 
      #   {DecisionTask#complete!} yourself or your decision task will
      #   timeout.
      #
      # @param [String] task_list Specifies the task list to poll for 
      #   decision tasks.
      #
      # @param [Hash] options
      #
      # @option options [String] :identity The identity of the decider 
      #   requesting a decision task.  This will be recorded in the
      #   DecisionTaskStarted event in the workflow history. 
      #   If +:identity+ is not passed then the hostname and 
      #   process id will be sent (e.g. "hostname:pid").
      #
      # @option options [Boolean] :reverse_event_order (false)  When true, 
      #   the history events on the decision task will enumerate in 
      #   reverse chronological order (newest events first).  By default 
      #   the events are enumerated in chronological order (oldest first).
      #
      # @option options [Integer] :event_batch_size (1000) When enumerating
      #   events on the decision task, multiple requests may be required
      #   to fetch all of the events.  You can specify the maximum number
      #   of events to request each time (must not be greater than 1000).
      #
      # @yieldparam [DecisionTask] decision_task
      # 
      # @return [DecisionTask,nil] Returns a decision task or +nil+.  If
      #   a block was passed then +nil+ is always returned.  If a block
      #   is not passed, then +nil+ or a {DecisionTask} will be returned.
      #
      def poll_for_single_task task_list, options = {}, &block

        # Request parameters; this same hash is later handed to the
        # DecisionTask that wraps the response.
        request = {
          :domain => domain.name,
          :identity => identity_opt(options),
          :task_list => { :name => task_list },
          :maximum_page_size => options[:event_batch_size] || 1000,
          # an explicitly supplied value (even nil) wins over the default
          :reverse_order => options.fetch(:reverse_event_order, false)
        }

        payload = client.poll_for_decision_task(request).data

        # No task token in the response means there is nothing to hand out.
        return nil unless payload['taskToken']

        task = DecisionTask.new(domain, request, payload)
        return task unless block_given?

        # Block form: yield the task, complete it unless the block already
        # responded, and always return nil.
        yield(task)
        task.complete! unless task.responded?
        nil

      end

      # Polls indefinitely for decision tasks.  Each decision task found is
      # yielded to the block.  At the end of the block the decision task
      # is auto-completed ({DecisionTask#complete!} is called).
      #
      #   # yields once for each decision task found, indefinitely
      #   domain.decision_tasks.poll('task-list') do |decision_task|
      #     # make decisions here
      #   end
      #
      # A +Timeout::Error+ raised during a poll (including from the block)
      # is rescued and polling resumes.
      #
      # @note If you terminate the block (by calling +break+ or +return+)
      #   then it is your responsibility to call #complete! on the decision 
      #   task.
      #
      # @param (see #poll_for_single_task)
      #
      # @option (see #poll_for_single_task)
      # 
      # @yieldparam [DecisionTask] decision_task
      #
      # @raise [ArgumentError] if no block is given.
      #
      # @return [nil]
      #
      def poll task_list, options = {}, &block
        # Fail fast: without a block a task would first be fetched from the
        # service and then abandoned when +yield+ raised LocalJumpError.
        unless block
          raise ArgumentError, 'a block is required to poll for decision tasks'
        end

        loop do
          begin
            poll_for_single_task(task_list, options, &block)
          rescue Timeout::Error
            # The poll timed out; fall through so the loop polls again.
          end
        end
        nil
      end

    end
  end
end