Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2488)

Unified Diff: appengine/swarming/server/task_scheduler_test.py

Issue 2914803004: Fix non-idempotent task that /poll reaped but returned HTTP 500. (Closed)
Patch Set: Add test case Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/swarming/server/task_scheduler.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/swarming/server/task_scheduler_test.py
diff --git a/appengine/swarming/server/task_scheduler_test.py b/appengine/swarming/server/task_scheduler_test.py
index c9884714a5e4d4ccc2901f2696017db5a3ccb449..93e8e804458142b4746efde8be21e13d0158bba3 100755
--- a/appengine/swarming/server/task_scheduler_test.py
+++ b/appengine/swarming/server/task_scheduler_test.py
@@ -205,12 +205,12 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
expected.update(**kwargs)
return expected
- def _quick_schedule(self, dims, nb_task=1):
+ def _quick_schedule(self, nb_task=1, **kwargs):
"""Schedules a task.
nb_task is 1 if a GAE task queue rebuild-task-cache was enqueued.
"""
- request = self._gen_request(properties={'dimensions': dims})
+ request = self._gen_request(**kwargs)
result_summary = task_scheduler.schedule_request(request, None)
self.assertEqual(nb_task, self.execute_tasks())
return result_summary
@@ -225,9 +225,9 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
task_queues.assert_bot(bot_dimensions)
self.assertEqual(nb_task, self.execute_tasks())
- def _quick_reap(self, nb_task=1):
+ def _quick_reap(self, nb_task=1, **kwargs):
"""Reaps a task."""
- self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'})
+ self._quick_schedule(**kwargs)
self._register_bot(self.bot_dimensions, nb_task=nb_task)
reaped_request, _, run_result = task_scheduler.bot_reap_task(
self.bot_dimensions, 'abc', None)
@@ -685,8 +685,7 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
def test_schedule_request(self):
# It is tested indirectly in the other functions.
- self.assertTrue(
- self._quick_schedule({u'os': u'Windows-3.1.1', u'pool': u'default'}))
+ self.assertTrue(self._quick_schedule())
def mock_dim_acls(self, mapping):
self.mock(config, 'settings', lambda: config_pb2.SettingsCfg(
@@ -698,22 +697,23 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
def test_schedule_request_forbidden_dim(self):
self.mock_dim_acls({u'pool:bad': u'noone'})
- self._quick_schedule({u'pool': u'good'})
+ self._quick_schedule(properties={'dimensions': {u'pool': u'good'}})
with self.assertRaises(auth.AuthorizationError):
- self._quick_schedule({u'pool': u'bad'})
+ self._quick_schedule(properties={'dimensions': {u'pool': u'bad'}})
def test_schedule_request_forbidden_dim_via_star(self):
self.mock_dim_acls({u'abc:*': u'noone'})
- self._quick_schedule({u'pool': u'default'})
+ self._quick_schedule(properties={'dimensions': {u'pool': u'default'}})
with self.assertRaises(auth.AuthorizationError):
- self._quick_schedule({u'pool': u'default', u'abc': u'blah'})
+ self._quick_schedule(
+ properties={'dimensions': {u'pool': u'default', u'abc': u'blah'}})
def test_schedule_request_id_without_pool(self):
self.mock_dim_acls({u'pool:good': u'mocked'})
with self.assertRaises(auth.AuthorizationError):
- self._quick_schedule({u'id': u'abc'})
+ self._quick_schedule(properties={'dimensions': {u'id': u'abc'}})
auth_testing.mock_is_admin(self)
- self._quick_schedule({u'id': u'abc'}, nb_task=0)
+ self._quick_schedule(properties={'dimensions': {u'id': u'abc'}}, nb_task=0)
def test_schedule_request_id_and_pool(self):
self.mock_dim_acls({u'pool:good': u'mocked'})
@@ -725,10 +725,14 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
return False
self.mock(auth, 'is_group_member', mocked_is_group_member)
- self._quick_schedule({u'id': u'abc', u'pool': u'unknown'}, nb_task=0)
- self._quick_schedule({u'id': u'abc', u'pool': u'good'}, nb_task=0)
+ self._quick_schedule(
+ properties={'dimensions': {u'id': u'abc', u'pool': u'unknown'}},
+ nb_task=0)
+ self._quick_schedule(
+ properties={'dimensions': {u'id': u'abc', u'pool': u'good'}}, nb_task=0)
with self.assertRaises(auth.AuthorizationError):
- self._quick_schedule({u'id': u'abc', u'pool': u'bad'})
+ self._quick_schedule(
+ properties={'dimensions': {u'id': u'abc', u'pool': u'bad'}})
def test_bot_update_task(self):
run_result = self._quick_reap(nb_task=0)
@@ -1145,6 +1149,137 @@ class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
self.assertEqual(0, self.execute_tasks())
self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED
+ def test_cron_handle_bot_died_no_update_not_idempotent(self):
+ # A bot reaped a task but the handler returned HTTP 500, leaving the task in
+ # a lingering state.
+ pub_sub_calls = self.mock_pub_sub()
+
+ # Test first retry, then success.
+ now = utils.utcnow()
+ request = self._gen_request(
+ created_ts=now,
+ expiration_ts=now+datetime.timedelta(seconds=600),
+ pubsub_topic='projects/abc/topics/def')
+ task_request.init_new_request(request, True, None)
+ _result_summary = task_scheduler.schedule_request(request, None)
+ self.assertEqual(1, self.execute_tasks())
+ self.assertEqual(0, len(pub_sub_calls))
+ self._register_bot(self.bot_dimensions, nb_task=0)
+ request, _, run_result = task_scheduler.bot_reap_task(
+ self.bot_dimensions, 'abc', None)
+ self.assertEqual(
+ task_result.State.RUNNING, run_result.result_summary_key.get().state)
+ self.assertEqual(1, self.execute_tasks())
+ self.assertEqual(1, len(pub_sub_calls)) # PENDING -> RUNNING
+ self.assertEqual(1, run_result.try_number)
+ self.assertEqual(task_result.State.RUNNING, run_result.state)
+ now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1)
+ self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local'))
+ self.assertEqual(1, self.execute_tasks())
+ self.assertEqual(2, len(pub_sub_calls)) # RUNNING -> PENDING
+
+ # Refresh and compare:
+ expected = self._gen_result_summary_reaped(
+ costs_usd=[0.],
+ id='1d69b9f088008910',
+ modified_ts=now_1,
+ state=task_result.State.PENDING,
+ try_number=1)
+ self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
+ expected = self._gen_run_result(
+ abandoned_ts=now_1,
+ id='1d69b9f088008911',
+ internal_failure=True,
+ modified_ts=now_1,
+ state=task_result.State.BOT_DIED)
+ self.assertEqual(expected, run_result.key.get().to_dict())
+
+ # Task was retried.
+ now_2 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 2)
+ bot_dimensions_second = self.bot_dimensions.copy()
+ bot_dimensions_second[u'id'] = [u'localhost-second']
+ self._register_bot(bot_dimensions_second, nb_task=0)
+ _request, _, run_result = task_scheduler.bot_reap_task(
+ bot_dimensions_second, 'abc', None)
+ self.assertEqual(1, self.execute_tasks())
+ self.assertEqual(3, len(pub_sub_calls)) # PENDING -> RUNNING
+ logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()])
+ self.assertEqual(2, run_result.try_number)
+ self.assertEqual(
+ task_result.State.COMPLETED,
+ task_scheduler.bot_update_task(
+ run_result_key=run_result.key,
+ bot_id='localhost-second',
+ cipd_pins=None,
+ output='Foo1',
+ output_chunk_start=0,
+ exit_code=0,
+ duration=0.1,
+ hard_timeout=False,
+ io_timeout=False,
+ cost_usd=0.1,
+ outputs_ref=None,
+ performance_stats=None))
+ expected = self._gen_result_summary_reaped(
+ bot_dimensions=bot_dimensions_second,
+ bot_id=u'localhost-second',
+ completed_ts=now_2,
+ costs_usd=[0., 0.1],
+ duration=0.1,
+ exit_code=0,
+ id='1d69b9f088008910',
+ modified_ts=now_2,
+ started_ts=now_2,
+ state=task_result.State.COMPLETED,
+ try_number=2)
+ self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
+ self.assertEqual(0.1, run_result.key.get().cost_usd)
+
+ self.assertEqual(0, self.execute_tasks())
+ self.assertEqual(4, len(pub_sub_calls)) # RUNNING -> COMPLETED
+
+ def test_bot_poll_http_500_but_bot_reapears_after_BOT_PING_TOLERANCE(self):
+ # A bot reaps a task, sleeps for over BOT_PING_TOLERANCE (2 minutes), then
+ # sends a ping.
+ # In the meantime the cron job ran, saw the task idle with no update for
+ # more than BOT_PING_TOLERANCE, and re-enqueued it.
+ run_result = self._quick_reap(
+ expiration_ts=self.now+(3*task_result.BOT_PING_TOLERANCE),
+ nb_task=0)
+ to_run_key = task_to_run.request_to_task_to_run_key(
+ run_result.request_key.get())
+ self.assertEqual(None, to_run_key.get().queue_number)
+
+ # See the special case in _handle_dead_bot() for non-idempotent tasks that
+ # were never updated.
+ now_1 = self.mock_now(self.now + task_result.BOT_PING_TOLERANCE, 1)
+ #logging.info('%s', [t.to_dict() for t in task_to_run.TaskToRun.query()])
+ self.assertEqual(([], 1, 0), task_scheduler.cron_handle_bot_died('f.local'))
+
+ # Now the task is available. The bot magically wakes up (say, a laptop that
+ # went to sleep). The update is denied.
+ self.assertEqual(
+ None,
+ task_scheduler.bot_update_task(
+ run_result_key=run_result.key,
+ bot_id='localhost-second',
+ cipd_pins=None,
+ output='Foo1',
+ output_chunk_start=0,
+ exit_code=0,
+ duration=0.1,
+ hard_timeout=False,
+ io_timeout=False,
+ cost_usd=0.1,
+ outputs_ref=None,
+ performance_stats=None))
+ # Confirm it is denied.
+ run_result = run_result.key.get()
+ self.assertEqual(State.BOT_DIED, run_result.state)
+ result_summary = run_result.result_summary_key.get()
+ self.assertEqual(State.PENDING, result_summary.state)
+ self.assertTrue(to_run_key.get().queue_number)
+
def test_cron_handle_bot_died_same_bot_denied(self):
# Test first retry, then success.
now = utils.utcnow()
« no previous file with comments | « appengine/swarming/server/task_scheduler.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698