| Index: tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py
|
| diff --git a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py b/tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py
|
| deleted file mode 100644
|
| index 8b876a54eaa8616dfb1c59b165723940f0ab6ddb..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py
|
| +++ /dev/null
|
| @@ -1,173 +0,0 @@
|
| -"""
|
| -Tests for Layer1 of Simple Workflow
|
| -
|
| -"""
|
| -import time
|
| -import uuid
|
| -import json
|
| -import traceback
|
| -
|
| -from boto.swf.layer1_decisions import Layer1Decisions
|
| -
|
| -from tests.integration.swf.test_layer1 import SimpleWorkflowLayer1TestBase
|
| -
|
| -
|
| -
|
| -class SwfL1WorkflowExecutionTest(SimpleWorkflowLayer1TestBase):
|
| - """
|
| - test a simple workflow execution
|
| - """
|
| - swf = True
|
| -
|
| - def run_decider(self):
|
| - """
|
| - run one iteration of a simple decision engine
|
| - """
|
| - # Poll for a decision task.
|
| - tries = 0
|
| - while True:
|
| - dtask = self.conn.poll_for_decision_task(self._domain,
|
| - self._task_list, reverse_order=True)
|
| - if dtask.get('taskToken') is not None:
|
| - # This means a real decision task has arrived.
|
| - break
|
| - time.sleep(2)
|
| - tries += 1
|
| - if tries > 10:
|
| - # Give up if it's taking too long. Probably
|
| - # means something is broken somewhere else.
|
| - assert False, 'no decision task occurred'
|
| -
|
| - # Get the most recent interesting event.
|
| - ignorable = (
|
| - 'DecisionTaskScheduled',
|
| - 'DecisionTaskStarted',
|
| - 'DecisionTaskTimedOut',
|
| - )
|
| - event = None
|
| - for tevent in dtask['events']:
|
| - if tevent['eventType'] not in ignorable:
|
| - event = tevent
|
| - break
|
| -
|
| - # Construct the decision response.
|
| - decisions = Layer1Decisions()
|
| - if event['eventType'] == 'WorkflowExecutionStarted':
|
| - activity_id = str(uuid.uuid1())
|
| - decisions.schedule_activity_task(activity_id,
|
| - self._activity_type_name, self._activity_type_version,
|
| - task_list=self._task_list,
|
| - input=event['workflowExecutionStartedEventAttributes']['input'])
|
| - elif event['eventType'] == 'ActivityTaskCompleted':
|
| - decisions.complete_workflow_execution(
|
| - result=event['activityTaskCompletedEventAttributes']['result'])
|
| - elif event['eventType'] == 'ActivityTaskFailed':
|
| - decisions.fail_workflow_execution(
|
| - reason=event['activityTaskFailedEventAttributes']['reason'],
|
| - details=event['activityTaskFailedEventAttributes']['details'])
|
| - else:
|
| - decisions.fail_workflow_execution(
|
| - reason='unhandled decision task type; %r' % (event['eventType'],))
|
| -
|
| - # Send the decision response.
|
| - r = self.conn.respond_decision_task_completed(dtask['taskToken'],
|
| - decisions=decisions._data,
|
| - execution_context=None)
|
| - assert r is None
|
| -
|
| -
|
| - def run_worker(self):
|
| - """
|
| - run one iteration of a simple worker engine
|
| - """
|
| - # Poll for an activity task.
|
| - tries = 0
|
| - while True:
|
| - atask = self.conn.poll_for_activity_task(self._domain,
|
| - self._task_list, identity='test worker')
|
| - if atask.get('activityId') is not None:
|
| - # This means a real activity task has arrived.
|
| - break
|
| - time.sleep(2)
|
| - tries += 1
|
| - if tries > 10:
|
| - # Give up if it's taking too long. Probably
|
| - # means something is broken somewhere else.
|
| - assert False, 'no activity task occurred'
|
| - # Do the work or catch a "work exception."
|
| - reason = None
|
| - try:
|
| - result = json.dumps(sum(json.loads(atask['input'])))
|
| - except:
|
| - reason = 'an exception was raised'
|
| - details = traceback.format_exc()
|
| - if reason is None:
|
| - r = self.conn.respond_activity_task_completed(
|
| - atask['taskToken'], result)
|
| - else:
|
| - r = self.conn.respond_activity_task_failed(
|
| - atask['taskToken'], reason=reason, details=details)
|
| - assert r is None
|
| -
|
| -
|
| - def test_workflow_execution(self):
|
| - # Start a workflow execution whose activity task will succeed.
|
| - workflow_id = 'wfid-%.2f' % (time.time(),)
|
| - r = self.conn.start_workflow_execution(self._domain,
|
| - workflow_id,
|
| - self._workflow_type_name,
|
| - self._workflow_type_version,
|
| - execution_start_to_close_timeout='20',
|
| - input='[600, 15]')
|
| - # Need the run_id to lookup the execution history later.
|
| - run_id = r['runId']
|
| -
|
| - # Move the workflow execution forward by having the
|
| - # decider schedule an activity task.
|
| - self.run_decider()
|
| -
|
| - # Run the worker to handle the scheduled activity task.
|
| - self.run_worker()
|
| -
|
| - # Complete the workflow execution by having the
|
| - # decider close it down.
|
| - self.run_decider()
|
| -
|
| - # Check that the result was stored in the execution history.
|
| - r = self.conn.get_workflow_execution_history(self._domain,
|
| - run_id, workflow_id,
|
| - reverse_order=True)['events'][0]
|
| - result = r['workflowExecutionCompletedEventAttributes']['result']
|
| - assert json.loads(result) == 615
|
| -
|
| -
|
| - def test_failed_workflow_execution(self):
|
| - # Start a workflow execution whose activity task will fail.
|
| - workflow_id = 'wfid-%.2f' % (time.time(),)
|
| - r = self.conn.start_workflow_execution(self._domain,
|
| - workflow_id,
|
| - self._workflow_type_name,
|
| - self._workflow_type_version,
|
| - execution_start_to_close_timeout='20',
|
| - input='[600, "s"]')
|
| - # Need the run_id to lookup the execution history later.
|
| - run_id = r['runId']
|
| -
|
| - # Move the workflow execution forward by having the
|
| - # decider schedule an activity task.
|
| - self.run_decider()
|
| -
|
| - # Run the worker to handle the scheduled activity task.
|
| - self.run_worker()
|
| -
|
| - # Complete the workflow execution by having the
|
| - # decider close it down.
|
| - self.run_decider()
|
| -
|
| - # Check that the failure was stored in the execution history.
|
| - r = self.conn.get_workflow_execution_history(self._domain,
|
| - run_id, workflow_id,
|
| - reverse_order=True)['events'][0]
|
| - reason = r['workflowExecutionFailedEventAttributes']['reason']
|
| - assert reason == 'an exception was raised'
|
| -
|
|
|