Index: tools/telemetry/third_party/gsutil/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py |
diff --git a/tools/telemetry/third_party/gsutil/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py b/tools/telemetry/third_party/gsutil/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py |
deleted file mode 100644 |
index 8b876a54eaa8616dfb1c59b165723940f0ab6ddb..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutil/third_party/boto/tests/integration/swf/test_layer1_workflow_execution.py |
+++ /dev/null |
@@ -1,173 +0,0 @@ |
-""" |
-Tests for Layer1 of Simple Workflow |
- |
-""" |
-import time |
-import uuid |
-import json |
-import traceback |
- |
-from boto.swf.layer1_decisions import Layer1Decisions |
- |
-from tests.integration.swf.test_layer1 import SimpleWorkflowLayer1TestBase |
- |
- |
- |
-class SwfL1WorkflowExecutionTest(SimpleWorkflowLayer1TestBase): |
- """ |
- test a simple workflow execution |
- """ |
- swf = True |
- |
- def run_decider(self): |
- """ |
- run one iteration of a simple decision engine |
- """ |
- # Poll for a decision task. |
- tries = 0 |
- while True: |
- dtask = self.conn.poll_for_decision_task(self._domain, |
- self._task_list, reverse_order=True) |
- if dtask.get('taskToken') is not None: |
- # This means a real decision task has arrived. |
- break |
- time.sleep(2) |
- tries += 1 |
- if tries > 10: |
- # Give up if it's taking too long. Probably |
- # means something is broken somewhere else. |
- assert False, 'no decision task occurred' |
- |
- # Get the most recent interesting event. |
- ignorable = ( |
- 'DecisionTaskScheduled', |
- 'DecisionTaskStarted', |
- 'DecisionTaskTimedOut', |
- ) |
- event = None |
- for tevent in dtask['events']: |
- if tevent['eventType'] not in ignorable: |
- event = tevent |
- break |
- |
- # Construct the decision response. |
- decisions = Layer1Decisions() |
- if event['eventType'] == 'WorkflowExecutionStarted': |
- activity_id = str(uuid.uuid1()) |
- decisions.schedule_activity_task(activity_id, |
- self._activity_type_name, self._activity_type_version, |
- task_list=self._task_list, |
- input=event['workflowExecutionStartedEventAttributes']['input']) |
- elif event['eventType'] == 'ActivityTaskCompleted': |
- decisions.complete_workflow_execution( |
- result=event['activityTaskCompletedEventAttributes']['result']) |
- elif event['eventType'] == 'ActivityTaskFailed': |
- decisions.fail_workflow_execution( |
- reason=event['activityTaskFailedEventAttributes']['reason'], |
- details=event['activityTaskFailedEventAttributes']['details']) |
- else: |
- decisions.fail_workflow_execution( |
- reason='unhandled decision task type; %r' % (event['eventType'],)) |
- |
- # Send the decision response. |
- r = self.conn.respond_decision_task_completed(dtask['taskToken'], |
- decisions=decisions._data, |
- execution_context=None) |
- assert r is None |
- |
- |
- def run_worker(self): |
- """ |
- run one iteration of a simple worker engine |
- """ |
- # Poll for an activity task. |
- tries = 0 |
- while True: |
- atask = self.conn.poll_for_activity_task(self._domain, |
- self._task_list, identity='test worker') |
- if atask.get('activityId') is not None: |
- # This means a real activity task has arrived. |
- break |
- time.sleep(2) |
- tries += 1 |
- if tries > 10: |
- # Give up if it's taking too long. Probably |
- # means something is broken somewhere else. |
- assert False, 'no activity task occurred' |
- # Do the work or catch a "work exception." |
- reason = None |
- try: |
- result = json.dumps(sum(json.loads(atask['input']))) |
- except: |
- reason = 'an exception was raised' |
- details = traceback.format_exc() |
- if reason is None: |
- r = self.conn.respond_activity_task_completed( |
- atask['taskToken'], result) |
- else: |
- r = self.conn.respond_activity_task_failed( |
- atask['taskToken'], reason=reason, details=details) |
- assert r is None |
- |
- |
- def test_workflow_execution(self): |
- # Start a workflow execution whose activity task will succeed. |
- workflow_id = 'wfid-%.2f' % (time.time(),) |
- r = self.conn.start_workflow_execution(self._domain, |
- workflow_id, |
- self._workflow_type_name, |
- self._workflow_type_version, |
- execution_start_to_close_timeout='20', |
- input='[600, 15]') |
- # Need the run_id to lookup the execution history later. |
- run_id = r['runId'] |
- |
- # Move the workflow execution forward by having the |
- # decider schedule an activity task. |
- self.run_decider() |
- |
- # Run the worker to handle the scheduled activity task. |
- self.run_worker() |
- |
- # Complete the workflow execution by having the |
- # decider close it down. |
- self.run_decider() |
- |
- # Check that the result was stored in the execution history. |
- r = self.conn.get_workflow_execution_history(self._domain, |
- run_id, workflow_id, |
- reverse_order=True)['events'][0] |
- result = r['workflowExecutionCompletedEventAttributes']['result'] |
- assert json.loads(result) == 615 |
- |
- |
- def test_failed_workflow_execution(self): |
- # Start a workflow execution whose activity task will fail. |
- workflow_id = 'wfid-%.2f' % (time.time(),) |
- r = self.conn.start_workflow_execution(self._domain, |
- workflow_id, |
- self._workflow_type_name, |
- self._workflow_type_version, |
- execution_start_to_close_timeout='20', |
- input='[600, "s"]') |
- # Need the run_id to lookup the execution history later. |
- run_id = r['runId'] |
- |
- # Move the workflow execution forward by having the |
- # decider schedule an activity task. |
- self.run_decider() |
- |
- # Run the worker to handle the scheduled activity task. |
- self.run_worker() |
- |
- # Complete the workflow execution by having the |
- # decider close it down. |
- self.run_decider() |
- |
- # Check that the failure was stored in the execution history. |
- r = self.conn.get_workflow_execution_history(self._domain, |
- run_id, workflow_id, |
- reverse_order=True)['events'][0] |
- reason = r['workflowExecutionFailedEventAttributes']['reason'] |
- assert reason == 'an exception was raised' |
- |